From e1e4d404cecf16046b7a7e570cd115107c7c2070 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 7 Jul 2025 12:20:22 +0200 Subject: [PATCH] fix: close potential event listeners leak MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- benchmarks/benchmarks-utils.cjs | 8 +++- src/pools/utils.ts | 32 +++++++------ tests/test-utils.cjs | 83 ++++++++++++++++++++++++--------- 3 files changed, 88 insertions(+), 35 deletions(-) diff --git a/benchmarks/benchmarks-utils.cjs b/benchmarks/benchmarks-utils.cjs index fa266a753..222134c25 100644 --- a/benchmarks/benchmarks-utils.cjs +++ b/benchmarks/benchmarks-utils.cjs @@ -24,10 +24,16 @@ const jsonIntegerSerialization = n => { * @returns - The nth fibonacci number. */ const fibonacci = n => { + if (n === 0) { + return 0n + } + if (n === 1) { + return 1n + } n = BigInt(n) let current = 1n let previous = 0n - while (--n) { + while (n-- > 1n) { const tmp = current current += previous previous = tmp diff --git a/src/pools/utils.ts b/src/pools/utils.ts index 025402749..630ec8e21 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -461,30 +461,36 @@ export const waitWorkerNodeEvents = async < numberOfEventsToWait: number, timeout: number ): Promise => { - return await new Promise(resolve => { + return await new Promise((resolve, reject) => { let events = 0 if (numberOfEventsToWait === 0) { resolve(events) return } + const listener = () => { + ++events + if (events === numberOfEventsToWait) { + if (timeoutHandle != null) clearTimeout(timeoutHandle) + workerNode.off(workerNodeEvent, listener) + resolve(events) + } + } + const timeoutHandle = + timeout >= 0 + ? setTimeout(() => { + workerNode.off(workerNodeEvent, listener) + resolve(events) + }, timeout) + : undefined switch (workerNodeEvent) { case 'backPressure': case 'idle': case 'taskFinished': - workerNode.on(workerNodeEvent, () => { - ++events - if (events === numberOfEventsToWait) { - resolve(events) - } - }) + workerNode.on(workerNodeEvent, listener) break default: - throw new Error('Invalid worker node event') - } - if (timeout >= 0) { - setTimeout(() => { - resolve(events) - }, timeout) + if (timeoutHandle != null) clearTimeout(timeoutHandle) + reject(new Error('Invalid worker node event')) } }) } diff --git a/tests/test-utils.cjs b/tests/test-utils.cjs index 5c6f394d7..a2d29cc06 100644 --- a/tests/test-utils.cjs +++ b/tests/test-utils.cjs @@ -1,37 +1,72 @@ const { TaskFunctions } = require('./test-types.cjs') -const waitWorkerEvents = async (pool, workerEvent, numberOfEventsToWait) => { - return await new Promise(resolve => { +const waitWorkerEvents = async ( + pool, + workerEvent, + numberOfEventsToWait, + timeoutMs = 5000 +) => { + return await new Promise((resolve, reject) => { let events = 0 if (numberOfEventsToWait === 0) { resolve(events) return } - for (const workerNode of pool.workerNodes) { - workerNode.worker.on(workerEvent, () => { - ++events - if (events === numberOfEventsToWait) { - resolve(events) - } + const listeners = [] + const timeout = setTimeout(() => { + listeners.forEach(({ listener, workerNode }) => { + workerNode.worker.off(workerEvent, listener) }) + reject( + new Error( + `Timed out after ${timeoutMs}ms waiting for ${numberOfEventsToWait} '${workerEvent}' events. Received ${events}.` + ) + ) + }, timeoutMs) + const listener = () => { + events++ + if (events === numberOfEventsToWait) { + clearTimeout(timeout) + listeners.forEach(({ listener, workerNode }) => { + workerNode.worker.off(workerEvent, listener) + }) + resolve(events) + } + } + for (const workerNode of pool.workerNodes) { + listeners.push({ listener, workerNode }) + workerNode.worker.on(workerEvent, listener) } }) } -const waitPoolEvents = async (pool, poolEvent, numberOfEventsToWait) => { - return await new Promise(resolve => { - let events = 0 - if (numberOfEventsToWait === 0) { - resolve(events) - return - } - pool.emitter?.on(poolEvent, () => { - ++events - if (events === numberOfEventsToWait) { - resolve(events) +const waitPoolEvents = async ( + pool, + poolEvent, + numberOfEventsToWait, + timeoutMs = 5000 +) => { + const eventPromises = [] + const eventPromise = (eventEmitter, event, timeoutMs = 5000) => { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + eventEmitter.off(event, listener) + reject(new Error(`Event '${event}' timed out after ${timeoutMs}ms`)) + }, timeoutMs) + + const listener = evt => { + clearTimeout(timeout) + eventEmitter.off(event, listener) + resolve(evt) } + + eventEmitter.on(event, listener) }) - }) + } + for (let i = 0; i < numberOfEventsToWait; i++) { + eventPromises.push(eventPromise(pool.emitter, poolEvent, timeoutMs)) + } + return await Promise.all(eventPromises) } const sleep = async ms => { @@ -70,9 +105,15 @@ const jsonIntegerSerialization = n => { * @returns - The nth fibonacci number. */ const fibonacci = n => { + if (n === 0) { + return 0 + } + if (n === 1) { + return 1 + } let current = 1 let previous = 0 - while (--n) { + while (n-- > 1) { const tmp = current current += previous previous = tmp -- 2.43.0