fix: fix pool ready event emission
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 22 Nov 2023 18:47:27 +0000 (19:47 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 22 Nov 2023 18:47:27 +0000 (19:47 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
src/pools/abstract-pool.ts
tests/pools/abstract-pool.test.mjs
tests/pools/cluster/dynamic.test.mjs
tests/pools/cluster/fixed.test.mjs
tests/pools/thread/dynamic.test.mjs
tests/pools/thread/fixed.test.mjs
tests/utils.test.mjs

index 60a03ae2aaaa532c9bd14b363a10b139858c5e72..13defe7d0c3f5a34d5d96ea6cd3818ac55aaccf8 100644 (file)
@@ -10,12 +10,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Fixed
 
 - Ensure pool statuses are checked at initialization, `start()` or `destroy()`.
+- Ensure pool `ready` event can be emitted after several `start()/destroy()` cycles.
 
 ## [3.0.5] - 2023-10-27
 
 ### Fixed
 
-- Ensure pool ready event can be emitted only once.
+- Ensure pool `ready` event can be emitted only once.
 
 ## [3.0.4] - 2023-10-20
 
@@ -272,7 +273,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Fixed
 
-- Fix race condition between ready and task functions worker message handling at startup.
+- Fix race condition between readiness and task functions worker message handling at startup.
 - Fix duplicate task function worker usage statistics computation per task function.
 - Update task function worker usage statistics if and only if there's at least two different task functions.
 - Fix race condition at task function worker usage executing task computation leading to negative value.
index de37c0f9266cc6d6b6ca37fdd7c174ccd94afaec..b14801b69269b43cfe878dee88ef4987151dfb33 100644 (file)
@@ -17,7 +17,6 @@ import {
   max,
   median,
   min,
-  once,
   round
 } from '../utils'
 import { KillBehaviors } from '../worker/worker-options'
@@ -117,6 +116,10 @@ export abstract class AbstractPool<
    * Whether the pool is destroying or not.
    */
   private destroying: boolean
+  /**
+   * Whether the pool ready event has been emitted or not.
+   */
+  private readyEventEmitted: boolean
   /**
    * The start timestamp of the pool.
    */
@@ -167,6 +170,7 @@ export abstract class AbstractPool<
     this.started = false
     this.starting = false
     this.destroying = false
+    this.readyEventEmitted = false
     if (this.opts.startWorkers === true) {
       this.start()
     }
@@ -982,6 +986,7 @@ export abstract class AbstractPool<
     )
     this.emitter?.emit(PoolEvents.destroy, this.info)
     this.emitter?.emitDestroy()
+    this.readyEventEmitted = false
     this.destroying = false
     this.started = false
   }
@@ -1572,12 +1577,9 @@ export abstract class AbstractPool<
     )
     workerInfo.ready = message.ready as boolean
     workerInfo.taskFunctionNames = message.taskFunctionNames
-    if (this.ready) {
-      const emitPoolReadyEventOnce = once(
-        () => this.emitter?.emit(PoolEvents.ready, this.info),
-        this
-      )
-      emitPoolReadyEventOnce()
+    if (!this.readyEventEmitted && this.ready) {
+      this.readyEventEmitted = true
+      this.emitter?.emit(PoolEvents.ready, this.info)
     }
   }
 
index 535eb7e94416b41e12d4f26af9ed02e9d923262c..1246a7d05bcc3cb06b81443fb50b9929216c9a0d 100644 (file)
@@ -915,6 +915,7 @@ describe('Abstract pool test suite', () => {
     )
     expect(pool.info.started).toBe(false)
     expect(pool.info.ready).toBe(false)
+    expect(pool.readyEventEmitted).toBe(false)
     expect(pool.workerNodes).toStrictEqual([])
     await expect(pool.execute()).rejects.toThrow(
       new Error('Cannot execute a task on not started pool')
@@ -922,6 +923,8 @@ describe('Abstract pool test suite', () => {
     pool.start()
     expect(pool.info.started).toBe(true)
     expect(pool.info.ready).toBe(true)
+    await waitPoolEvents(pool, PoolEvents.ready, 1)
+    expect(pool.readyEventEmitted).toBe(true)
     expect(pool.workerNodes.length).toBe(numberOfWorkers)
     for (const workerNode of pool.workerNodes) {
       expect(workerNode).toBeInstanceOf(WorkerNode)
index afc013f165a7d2416b39c1061d329710cd183304..2c542375e5f5fda1b83daea9fe29c3d205150f49 100644 (file)
@@ -67,6 +67,7 @@ describe('Dynamic cluster pool test suite', () => {
     await pool.destroy()
     const numberOfExitEvents = await exitPromise
     expect(pool.started).toBe(false)
+    expect(pool.readyEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)
     expect(numberOfExitEvents).toBe(min)
     expect(poolDestroy).toBe(1)
index 426a64f00d29e051985c93371f34c9fe0ea4c86e..c9b528b9c2e5b57fedec5cdfc81f71e6f50b7ece 100644 (file)
@@ -262,6 +262,7 @@ describe('Fixed cluster pool test suite', () => {
     await pool.destroy()
     const numberOfExitEvents = await exitPromise
     expect(pool.started).toBe(false)
+    expect(pool.readyEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)
     expect(numberOfExitEvents).toBe(numberOfWorkers)
     expect(poolDestroy).toBe(1)
index 4fcd2d1b039c3c27e3c3308399c25e2c94a1867c..ec9d51defd5ad1cc4094a02abf8aeabc4dbeeeba 100644 (file)
@@ -67,6 +67,7 @@ describe('Dynamic thread pool test suite', () => {
     await pool.destroy()
     const numberOfExitEvents = await exitPromise
     expect(pool.started).toBe(false)
+    expect(pool.readyEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)
     expect(numberOfExitEvents).toBe(min)
     expect(poolDestroy).toBe(1)
index fc68fdb23a01a163e33d532d0dd8a1a27b1781ea..6b46512ccd4184404d41087e60884fbf814364aa 100644 (file)
@@ -293,6 +293,7 @@ describe('Fixed thread pool test suite', () => {
     await pool.destroy()
     const numberOfExitEvents = await exitPromise
     expect(pool.started).toBe(false)
+    expect(pool.readyEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)
     expect(numberOfExitEvents).toBe(numberOfThreads)
     expect(poolDestroy).toBe(1)
index ac544f89c5a2f504a6c768c212461e30a8467ee9..be764cd217a540cac154e8b5ab686ae4bb1070e5 100644 (file)
@@ -19,7 +19,7 @@ import {
   max,
   median,
   min,
-  once,
+  // once,
   round,
   secureRandom,
   sleep
@@ -239,18 +239,18 @@ describe('Utils test suite', () => {
     expect(max(1, 1)).toBe(1)
   })
 
-  it('Verify once()', () => {
-    let called = 0
-    const fn = () => ++called
-    const onceFn = once(fn, this)
-    const result1 = onceFn()
-    expect(called).toBe(1)
-    expect(result1).toBe(1)
-    const result2 = onceFn()
-    expect(called).toBe(1)
-    expect(result2).toBe(1)
-    const result3 = onceFn()
-    expect(called).toBe(1)
-    expect(result3).toBe(1)
-  })
+  // it('Verify once()', () => {
+  //   let called = 0
+  //   const fn = () => ++called
+  //   const onceFn = once(fn, this)
+  //   const result1 = onceFn()
+  //   expect(called).toBe(1)
+  //   expect(result1).toBe(1)
+  //   const result2 = onceFn()
+  //   expect(called).toBe(1)
+  //   expect(result2).toBe(1)
+  //   const result3 = onceFn()
+  //   expect(called).toBe(1)
+  //   expect(result3).toBe(1)
+  // })
 })