]> Piment Noir Git Repositories - poolifier.git/commitdiff
fix: close potential event listeners leak
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 7 Jul 2025 10:20:22 +0000 (12:20 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 7 Jul 2025 10:20:22 +0000 (12:20 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
benchmarks/benchmarks-utils.cjs
src/pools/utils.ts
tests/test-utils.cjs

index fa266a7538af96a0726b03124abebfc870aa14db..222134c25cb240eaf985ead82b0d2b204eddc50f 100644 (file)
@@ -24,10 +24,16 @@ const jsonIntegerSerialization = n => {
  * @returns - The nth fibonacci number.
  */
 const fibonacci = n => {
+  if (n === 0) {
+    return 0n
+  }
+  if (n === 1) {
+    return 1n
+  }
   n = BigInt(n)
   let current = 1n
   let previous = 0n
-  while (--n) {
+  while (n-- > 1n) {
     const tmp = current
     current += previous
     previous = tmp
index 025402749e3715756fd9e8327609bbddb2ffc1e6..630ec8e212107e9f75b1df5138b29f01a789ac18 100644 (file)
@@ -461,30 +461,36 @@ export const waitWorkerNodeEvents = async <
   numberOfEventsToWait: number,
   timeout: number
 ): Promise<number> => {
-  return await new Promise<number>(resolve => {
+  return await new Promise<number>((resolve, reject) => {
     let events = 0
     if (numberOfEventsToWait === 0) {
       resolve(events)
       return
     }
+    const listener = () => {
+      ++events
+      if (events === numberOfEventsToWait) {
+        if (timeoutHandle != null) clearTimeout(timeoutHandle)
+        workerNode.off(workerNodeEvent, listener)
+        resolve(events)
+      }
+    }
+    const timeoutHandle =
+      timeout >= 0
+        ? setTimeout(() => {
+          workerNode.off(workerNodeEvent, listener)
+          resolve(events)
+        }, timeout)
+        : undefined
     switch (workerNodeEvent) {
       case 'backPressure':
       case 'idle':
       case 'taskFinished':
-        workerNode.on(workerNodeEvent, () => {
-          ++events
-          if (events === numberOfEventsToWait) {
-            resolve(events)
-          }
-        })
+        workerNode.on(workerNodeEvent, listener)
         break
       default:
-        throw new Error('Invalid worker node event')
-    }
-    if (timeout >= 0) {
-      setTimeout(() => {
-        resolve(events)
-      }, timeout)
+        if (timeoutHandle != null) clearTimeout(timeoutHandle)
+        reject(new Error('Invalid worker node event'))
     }
   })
 }
index 5c6f394d76fd9ee9c16e4ab4f924229de0c02620..a2d29cc06cf6ca98a7d78d8abc93608a21860db0 100644 (file)
@@ -1,37 +1,72 @@
 const { TaskFunctions } = require('./test-types.cjs')
 
-const waitWorkerEvents = async (pool, workerEvent, numberOfEventsToWait) => {
-  return await new Promise(resolve => {
+const waitWorkerEvents = async (
+  pool,
+  workerEvent,
+  numberOfEventsToWait,
+  timeoutMs = 5000
+) => {
+  return await new Promise((resolve, reject) => {
     let events = 0
     if (numberOfEventsToWait === 0) {
       resolve(events)
       return
     }
-    for (const workerNode of pool.workerNodes) {
-      workerNode.worker.on(workerEvent, () => {
-        ++events
-        if (events === numberOfEventsToWait) {
-          resolve(events)
-        }
+    const listeners = []
+    const timeout = setTimeout(() => {
+      listeners.forEach(({ listener, workerNode }) => {
+        workerNode.worker.off(workerEvent, listener)
       })
+      reject(
+        new Error(
+          `Timed out after ${timeoutMs}ms waiting for ${numberOfEventsToWait} '${workerEvent}' events. Received ${events}.`
+        )
+      )
+    }, timeoutMs)
+    const listener = () => {
+      events++
+      if (events === numberOfEventsToWait) {
+        clearTimeout(timeout)
+        listeners.forEach(({ listener, workerNode }) => {
+          workerNode.worker.off(workerEvent, listener)
+        })
+        resolve(events)
+      }
+    }
+    for (const workerNode of pool.workerNodes) {
+      listeners.push({ listener, workerNode })
+      workerNode.worker.on(workerEvent, listener)
     }
   })
 }
 
-const waitPoolEvents = async (pool, poolEvent, numberOfEventsToWait) => {
-  return await new Promise(resolve => {
-    let events = 0
-    if (numberOfEventsToWait === 0) {
-      resolve(events)
-      return
-    }
-    pool.emitter?.on(poolEvent, () => {
-      ++events
-      if (events === numberOfEventsToWait) {
-        resolve(events)
+const waitPoolEvents = async (
+  pool,
+  poolEvent,
+  numberOfEventsToWait,
+  timeoutMs = 5000
+) => {
+  const eventPromises = []
+  const eventPromise = (eventEmitter, event, timeoutMs = 5000) => {
+    return new Promise((resolve, reject) => {
+      const timeout = setTimeout(() => {
+        eventEmitter.off(event, listener)
+        reject(new Error(`Event '${event}' timed out after ${timeoutMs}ms`))
+      }, timeoutMs)
+
+      const listener = evt => {
+        clearTimeout(timeout)
+        eventEmitter.off(event, listener)
+        resolve(evt)
       }
+
+      eventEmitter.on(event, listener)
     })
-  })
+  }
+  for (let i = 0; i < numberOfEventsToWait; i++) {
+    eventPromises.push(eventPromise(pool.emitter, poolEvent, timeoutMs))
+  }
+  return await Promise.all(eventPromises)
 }
 
 const sleep = async ms => {
@@ -70,9 +105,15 @@ const jsonIntegerSerialization = n => {
  * @returns - The nth fibonacci number.
  */
 const fibonacci = n => {
+  if (n === 0) {
+    return 0
+  }
+  if (n === 1) {
+    return 1
+  }
   let current = 1
   let previous = 0
-  while (--n) {
+  while (n-- > 1) {
     const tmp = current
     current += previous
     previous = tmp