numberOfEventsToWait: number,
timeout: number
): Promise<number> => {
- return await new Promise<number>(resolve => {
+ return await new Promise<number>((resolve, reject) => {
let events = 0
if (numberOfEventsToWait === 0) {
resolve(events)
return
}
+ const listener = () => {
+ ++events
+ if (events === numberOfEventsToWait) {
+ if (timeoutHandle != null) clearTimeout(timeoutHandle)
+ workerNode.off(workerNodeEvent, listener)
+ resolve(events)
+ }
+ }
+ const timeoutHandle =
+ timeout >= 0
+ ? setTimeout(() => {
+ workerNode.off(workerNodeEvent, listener)
+ resolve(events)
+ }, timeout)
+ : undefined
switch (workerNodeEvent) {
case 'backPressure':
case 'idle':
case 'taskFinished':
- workerNode.on(workerNodeEvent, () => {
- ++events
- if (events === numberOfEventsToWait) {
- resolve(events)
- }
- })
+ workerNode.on(workerNodeEvent, listener)
break
default:
- throw new Error('Invalid worker node event')
- }
- if (timeout >= 0) {
- setTimeout(() => {
- resolve(events)
- }, timeout)
+ if (timeoutHandle != null) clearTimeout(timeoutHandle)
+ reject(new Error('Invalid worker node event'))
}
})
}
const { TaskFunctions } = require('./test-types.cjs')
-const waitWorkerEvents = async (pool, workerEvent, numberOfEventsToWait) => {
- return await new Promise(resolve => {
+const waitWorkerEvents = async (
+ pool,
+ workerEvent,
+ numberOfEventsToWait,
+ timeoutMs = 5000
+) => {
+ return await new Promise((resolve, reject) => {
let events = 0
if (numberOfEventsToWait === 0) {
resolve(events)
return
}
- for (const workerNode of pool.workerNodes) {
- workerNode.worker.on(workerEvent, () => {
- ++events
- if (events === numberOfEventsToWait) {
- resolve(events)
- }
+ const listeners = []
+ const timeout = setTimeout(() => {
+ listeners.forEach(({ listener, workerNode }) => {
+ workerNode.worker.off(workerEvent, listener)
})
+ reject(
+ new Error(
+ `Timed out after ${timeoutMs}ms waiting for ${numberOfEventsToWait} '${workerEvent}' events. Received ${events}.`
+ )
+ )
+ }, timeoutMs)
+ const listener = () => {
+ events++
+ if (events === numberOfEventsToWait) {
+ clearTimeout(timeout)
+ listeners.forEach(({ listener, workerNode }) => {
+ workerNode.worker.off(workerEvent, listener)
+ })
+ resolve(events)
+ }
+ }
+ for (const workerNode of pool.workerNodes) {
+ listeners.push({ listener, workerNode })
+ workerNode.worker.on(workerEvent, listener)
}
})
}
-const waitPoolEvents = async (pool, poolEvent, numberOfEventsToWait) => {
- return await new Promise(resolve => {
- let events = 0
- if (numberOfEventsToWait === 0) {
- resolve(events)
- return
- }
- pool.emitter?.on(poolEvent, () => {
- ++events
- if (events === numberOfEventsToWait) {
- resolve(events)
+const waitPoolEvents = async (
+ pool,
+ poolEvent,
+ numberOfEventsToWait,
+ timeoutMs = 5000
+) => {
+ const eventPromises = []
+ const eventPromise = (eventEmitter, event, timeoutMs = 5000) => {
+ return new Promise((resolve, reject) => {
+ const timeout = setTimeout(() => {
+ eventEmitter.off(event, listener)
+ reject(new Error(`Event '${event}' timed out after ${timeoutMs}ms`))
+ }, timeoutMs)
+
+ const listener = evt => {
+ clearTimeout(timeout)
+ eventEmitter.off(event, listener)
+ resolve(evt)
}
+
+ eventEmitter.on(event, listener)
})
- })
+ }
+ for (let i = 0; i < numberOfEventsToWait; i++) {
+ eventPromises.push(eventPromise(pool.emitter, poolEvent, timeoutMs))
+ }
+ return await Promise.all(eventPromises)
}
const sleep = async ms => {
* @returns - The nth fibonacci number.
*/
const fibonacci = n => {
+ if (n === 0) {
+ return 0
+ }
+ if (n === 1) {
+ return 1
+ }
let current = 1
let previous = 0
- while (--n) {
+ while (n-- > 1) {
const tmp = current
current += previous
previous = tmp