-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathpromisebuffer.ts
108 lines (96 loc) · 3.85 KB
/
promisebuffer.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import { SyncPromise, rejectedSyncPromise, resolvedSyncPromise } from './syncpromise';
export interface PromiseBuffer<T> {
  /**
   * The backing array of queued promises, exposed so tests can assert on its state.
   *
   * XXX: this really should not be public api.
   */
  $: PromiseLike<T>[];

  /**
   * Runs `taskProducer` and tracks the promise it returns.
   *
   * @param taskProducer A function producing the promise to track.
   * @returns A promise for the task's outcome.
   */
  add(taskProducer: () => PromiseLike<T>): PromiseLike<T>;

  /**
   * Waits for the tracked promises to settle.
   *
   * @param timeout Optional time limit in ms.
   * @returns A promise for a boolean indicating whether the buffer drained.
   */
  drain(timeout?: number): PromiseLike<boolean>;
}
/**
 * Rejection reason used by `add()` in this file when the buffer has already reached its limit.
 *
 * Created with `Symbol.for`, so the symbol is looked up in the global symbol registry under the key
 * `'SentryBufferFullError'` rather than being unique to this module instance.
 */
export const SENTRY_BUFFER_FULL_ERROR = Symbol.for('SentryBufferFullError');
/**
 * Creates a new PromiseBuffer object with the specified limit.
 *
 * @param limit Max number of promises that can be stored in the buffer. When omitted, the buffer is unbounded.
 * @returns An object exposing `add`, `drain` and the backing array `$`.
 */
export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
  const buffer: Array<PromiseLike<T>> = [];

  /** Whether another task may be started: always when no limit was given, otherwise while below the limit. */
  function isReady(): boolean {
    return limit === undefined || buffer.length < limit;
  }

  /**
   * Remove a promise from the queue.
   *
   * @param task Can be any PromiseLike<T>
   * @returns Removed promise, or a promise resolved with `undefined` when `task` is not (or no longer) queued.
   */
  function remove(task: PromiseLike<T>): PromiseLike<T | void> {
    const index = buffer.indexOf(task);
    // A missing task must be skipped explicitly: `splice(-1, 1)` would silently drop the *last* queued promise,
    // i.e. an unrelated in-flight task.
    if (index === -1) {
      return Promise.resolve(undefined);
    }
    return buffer.splice(index, 1)[0] || Promise.resolve(undefined);
  }

  /**
   * Add a promise (representing an in-flight action) to the queue, and set it to remove itself on fulfillment.
   *
   * @param taskProducer A function producing any PromiseLike<T>; In previous versions this used to be `task:
   *        PromiseLike<T>`, but under that model, Promises were instantly created on the call-site and their executor
   *        functions therefore ran immediately. Thus, even if the buffer was full, the action still happened. By
   *        requiring the promise to be wrapped in a function, we can defer promise creation until after the buffer
   *        limit check.
   * @returns The original promise, or a promise rejected with `SENTRY_BUFFER_FULL_ERROR` if the buffer is full (in
   *          which case `taskProducer` is never called).
   */
  function add(taskProducer: () => PromiseLike<T>): PromiseLike<T> {
    if (!isReady()) {
      return rejectedSyncPromise(SENTRY_BUFFER_FULL_ERROR);
    }

    // start the task and add its promise to the queue (a promise that is already queued is not added twice)
    const task = taskProducer();
    if (buffer.indexOf(task) === -1) {
      buffer.push(task);
    }

    void task
      .then(() => remove(task))
      // Use `then(null, rejectionHandler)` rather than `catch(rejectionHandler)` so that we can use `PromiseLike`
      // rather than `Promise`. `PromiseLike` doesn't have a `.catch` method, making its polyfill smaller. (ES5 didn't
      // have promises, so TS has to polyfill when down-compiling.)
      .then(null, () =>
        remove(task).then(null, () => {
          // We have to add another catch here because `remove()` starts a new promise chain.
        }),
      );

    return task;
  }

  /**
   * Wait for all promises in the queue to resolve or for timeout to expire, whichever comes first.
   *
   * Only the promises queued at the moment `drain` is called are awaited. If one of them rejects, the returned
   * promise rejects with that reason.
   *
   * @param timeout The time, in ms, after which to resolve to `false` if the queue is still non-empty. Passing `0` (or
   *        not passing anything) will make the promise wait as long as it takes for the queue to drain before
   *        resolving to `true`.
   * @returns A promise which will resolve to `true` if the queue is already empty or drains before the timeout, and
   *          `false` otherwise
   */
  function drain(timeout?: number): PromiseLike<boolean> {
    return new SyncPromise<boolean>((resolve, reject) => {
      let counter = buffer.length;

      if (!counter) {
        return resolve(true);
      }

      // Only arm a timer when a positive timeout was requested; without one we wait indefinitely and a timer
      // would be a pointless no-op.
      const timer = timeout && timeout > 0 ? setTimeout(() => resolve(false), timeout) : undefined;

      // if all promises resolve in time, cancel the timer and resolve to `true`
      buffer.forEach(item => {
        void resolvedSyncPromise(item).then(
          () => {
            if (!--counter) {
              clearTimeout(timer);
              resolve(true);
            }
          },
          reason => {
            // The outcome is decided, so don't leave the timer pending.
            clearTimeout(timer);
            reject(reason);
          },
        );
      });
    });
  }

  return {
    $: buffer,
    add,
    drain,
  };
}