fix: properly handle dynamic pool with zero minimum size
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 31 Dec 2023 20:19:24 +0000 (21:19 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 31 Dec 2023 20:19:24 +0000 (21:19 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
15 files changed:
.c8rc.json
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/least-busy-worker-choice-strategy.ts
src/pools/selection-strategies/least-elu-worker-choice-strategy.ts
src/pools/selection-strategies/least-used-worker-choice-strategy.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
src/pools/thread/dynamic.ts
tests/pools/abstract-pool.test.mjs
tests/pools/cluster/dynamic.test.mjs
tests/pools/selection-strategies/selection-strategies.test.mjs
tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs
tests/pools/thread/dynamic.test.mjs

index 4ef7222d3d02d0f684f37818b8742e196a1cf025..b09dc7b8af3a9a2faf9288721eff958fcc5c3ca4 100644 (file)
@@ -2,6 +2,6 @@
   "check-coverage": true,
   "lines": 90,
   "statements": 90,
-  "functions": 92,
-  "branches": 92
+  "functions": 90,
+  "branches": 90
 }
index ac5116c42ad44bc65f7724250f5b268965de85ca..ca04a2e128ef8ed5ecb8edda94277bc40a34e38a 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Properly handle dynamic pool with zero minimum size.
+
 ## [3.1.13] - 2023-12-30
 
 ### Changed
index b6f3a5d08bd838426828ecd282e73f175b7e274a..536f4a03d8a0b2e052bc515b972d1de8098a94ae 100644 (file)
@@ -607,18 +607,15 @@ export abstract class AbstractPool<
 
   private setTaskStealing (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.workerNodes[workerNodeKey].on(
-        'idleWorkerNode',
-        this.handleIdleWorkerNodeEvent
-      )
+      this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
     }
   }
 
   private unsetTaskStealing (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.workerNodes[workerNodeKey].off(
-        'idleWorkerNode',
-        this.handleIdleWorkerNodeEvent
+        'idle',
+        this.handleWorkerNodeIdleEvent
       )
     }
   }
@@ -627,7 +624,7 @@ export abstract class AbstractPool<
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.workerNodes[workerNodeKey].on(
         'backPressure',
-        this.handleBackPressureEvent
+        this.handleWorkerNodeBackPressureEvent
       )
     }
   }
@@ -636,7 +633,7 @@ export abstract class AbstractPool<
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.workerNodes[workerNodeKey].off(
         'backPressure',
-        this.handleBackPressureEvent
+        this.handleWorkerNodeBackPressureEvent
       )
     }
   }
@@ -1298,7 +1295,6 @@ export abstract class AbstractPool<
         })
       }
     })
-    const workerInfo = this.getWorkerInfo(workerNodeKey)
     this.sendToWorker(workerNodeKey, {
       checkActive: true
     })
@@ -1313,12 +1309,13 @@ export abstract class AbstractPool<
         })
       }
     }
-    workerInfo.dynamic = true
+    const workerNode = this.workerNodes[workerNodeKey]
+    workerNode.info.dynamic = true
     if (
       this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
       this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
     ) {
-      workerInfo.ready = true
+      workerNode.info.ready = true
     }
     this.checkAndEmitDynamicWorkerCreationEvents()
     return workerNodeKey
@@ -1382,14 +1379,14 @@ export abstract class AbstractPool<
     if (this.opts.enableTasksQueue === true) {
       if (this.opts.tasksQueueOptions?.taskStealing === true) {
         this.workerNodes[workerNodeKey].on(
-          'idleWorkerNode',
-          this.handleIdleWorkerNodeEvent
+          'idle',
+          this.handleWorkerNodeIdleEvent
         )
       }
       if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
         this.workerNodes[workerNodeKey].on(
           'backPressure',
-          this.handleBackPressureEvent
+          this.handleWorkerNodeBackPressureEvent
         )
       }
     }
@@ -1523,7 +1520,7 @@ export abstract class AbstractPool<
     }
   }
 
-  private readonly handleIdleWorkerNodeEvent = (
+  private readonly handleWorkerNodeIdleEvent = (
     eventDetail: WorkerNodeEventDetail,
     previousStolenTask?: Task<Data>
   ): void => {
@@ -1594,7 +1591,7 @@ export abstract class AbstractPool<
     }
     sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
       .then(() => {
-        this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
+        this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
         return undefined
       })
       .catch(EMPTY_FUNCTION)
@@ -1627,7 +1624,7 @@ export abstract class AbstractPool<
     }
   }
 
-  private readonly handleBackPressureEvent = (
+  private readonly handleWorkerNodeBackPressureEvent = (
     eventDetail: WorkerNodeEventDetail
   ): void => {
     if (
@@ -1696,15 +1693,14 @@ export abstract class AbstractPool<
 
   private handleWorkerReadyResponse (message: MessageValue<Response>): void {
     const { workerId, ready, taskFunctionNames } = message
-    if (ready === false) {
+    if (ready == null || !ready) {
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
       throw new Error(`Worker ${workerId!} failed to initialize`)
     }
-    const workerInfo = this.getWorkerInfo(
-      this.getWorkerNodeKeyByWorkerId(workerId)
-    )
-    workerInfo.ready = ready as boolean
-    workerInfo.taskFunctionNames = taskFunctionNames
+    const workerNode =
+      this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+    workerNode.info.ready = ready
+    workerNode.info.taskFunctionNames = taskFunctionNames
     if (!this.readyEventEmitted && this.ready) {
       this.readyEventEmitted = true
       this.emitter?.emit(PoolEvents.ready, this.info)
@@ -1753,7 +1749,7 @@ export abstract class AbstractPool<
           this.tasksQueueSize(workerNodeKey) === 0 &&
           workerNodeTasksUsage.sequentiallyStolen === 0
         ) {
-          workerNode.emit('idleWorkerNode', {
+          workerNode.emit('idle', {
             // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
             workerId: workerId!,
             workerNodeKey
index 8ef11c00d8975f38cded441e0ac4a42a639e9116..564dbf54a91adec20804bc5598e800e8301e930a 100644 (file)
@@ -41,7 +41,10 @@ export class DynamicClusterPool<
 
   /** @inheritDoc */
   protected shallCreateDynamicWorker (): boolean {
-    return !this.full && this.internalBusy()
+    return (
+      (!this.full && this.internalBusy()) ||
+      (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0)
+    )
   }
 
   /** @inheritDoc */
index 58982f192ee457865e46115203b9509bdc24e29d..f87465582b2070ee748bcd650806f29b8aeebeb3 100644 (file)
@@ -78,6 +78,9 @@ export class FairShareWorkerChoiceStrategy<
   }
 
   private fairShareNextWorkerNodeKey (): number | undefined {
+    if (this.pool.workerNodes.length === 0) {
+      return undefined
+    }
     return this.pool.workerNodes.reduce(
       (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
         if (workerNode.strategyData?.virtualTaskEndTimestamp == null) {
index 0f2e123979561ce811fc7d81f9334b9d5d837e83..5ef438f6084a1d3fb0d26326870bf351d5a5972d 100644 (file)
@@ -69,6 +69,9 @@ export class LeastBusyWorkerChoiceStrategy<
   }
 
   private leastBusyNextWorkerNodeKey (): number | undefined {
+    if (this.pool.workerNodes.length === 0) {
+      return undefined
+    }
     return this.pool.workerNodes.reduce(
       (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
         return this.isWorkerNodeReady(workerNodeKey) &&
index 588c75d9df7dd07b8ab961d9f908d6f58f4955f2..c3fe32d144176d88e848a8923fa1eb4b63126f88 100644 (file)
@@ -65,6 +65,9 @@ export class LeastEluWorkerChoiceStrategy<
   }
 
   private leastEluNextWorkerNodeKey (): number | undefined {
+    if (this.pool.workerNodes.length === 0) {
+      return undefined
+    }
     return this.pool.workerNodes.reduce(
       (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
         return this.isWorkerNodeReady(workerNodeKey) &&
index 5e8a09b42b86aae9e6b59e8ac77322b8705d890f..e45d57549580b883c1f6a185dc7701b861004336 100644 (file)
@@ -51,6 +51,9 @@ export class LeastUsedWorkerChoiceStrategy<
   }
 
   private leastUsedNextWorkerNodeKey (): number | undefined {
+    if (this.pool.workerNodes.length === 0) {
+      return undefined
+    }
     return this.pool.workerNodes.reduce(
       (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
         return this.isWorkerNodeReady(workerNodeKey) &&
index 9e18fdc4315fc322f7aabc5e49c92bdaa3a46767..46e7ac64221d500f87f7d344c4687849bb0361df 100644 (file)
@@ -169,14 +169,10 @@ export class WorkerChoiceStrategyContext<
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined.
    */
   public execute (): number {
-    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    const workerChoiceStrategy = this.workerChoiceStrategies.get(
-      this.workerChoiceStrategy
-    )!
-    if (!workerChoiceStrategy.hasPoolWorkerNodesReady()) {
-      return this.execute()
-    }
-    return this.executeStrategy(workerChoiceStrategy)
+    return this.executeStrategy(
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      this.workerChoiceStrategies.get(this.workerChoiceStrategy)!
+    )
   }
 
   /**
@@ -184,7 +180,7 @@ export class WorkerChoiceStrategyContext<
    *
    * @param workerChoiceStrategy - The worker choice strategy.
    * @returns The key of the worker node.
-   * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined.
+   * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined.
    */
   private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number {
     let workerNodeKey: number | undefined
index 2a7d0149392a4295ec8e32717c68a84a140cef53..836cae0b0615e1f74733e09f9f21b450bf15506b 100644 (file)
@@ -41,7 +41,10 @@ export class DynamicThreadPool<
 
   /** @inheritDoc */
   protected shallCreateDynamicWorker (): boolean {
-    return !this.full && this.internalBusy()
+    return (
+      (!this.full && this.internalBusy()) ||
+      (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0)
+    )
   }
 
   /** @inheritDoc */
index b4b9d706d0a10c9fa3fbb5ff1a24c1af81f46624..3fa8e2d6c29ab8626969d0a86f62648ca35990f7 100644 (file)
@@ -221,6 +221,7 @@ describe('Abstract pool test suite', () => {
       './tests/worker-files/thread/testWorker.mjs'
     )
     expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
+    expect(pool.emitter.eventNames()).toStrictEqual([])
     expect(pool.opts).toStrictEqual({
       startWorkers: true,
       enableEvents: true,
index 8c9196d1e9976db770a21865853187a862d34a00..870a5f4df9aa0f3bc997b4463f67746d3e7d8c42 100644 (file)
@@ -152,4 +152,23 @@ describe('Dynamic cluster pool test suite', () => {
     // We need to clean up the resources after our test
     await pool.destroy()
   })
+
+  it.skip('Verify that a pool with zero worker works', async () => {
+    const pool = new DynamicClusterPool(
+      0,
+      max,
+      './tests/worker-files/thread/testWorker.mjs'
+    )
+    expect(pool.starting).toBe(false)
+    expect(pool.workerNodes.length).toBe(pool.info.minSize)
+    const maxMultiplier = 10000
+    const promises = new Set()
+    for (let i = 0; i < max * maxMultiplier; i++) {
+      promises.add(pool.execute())
+    }
+    await Promise.all(promises)
+    expect(pool.workerNodes.length).toBe(max)
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
 })
index 8dc06262d2d35418b1ff050bff843ce03b1efe97..bca3c82b1647e22146b6094321d7c9ac60485403 100644 (file)
@@ -143,25 +143,6 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify strategies wait for worker node readiness in dynamic pool', async () => {
-    const pool = new DynamicThreadPool(
-      min,
-      max,
-      './tests/worker-files/thread/testWorker.mjs'
-    )
-    expect(pool.starting).toBe(false)
-    expect(pool.workerNodes.length).toBe(min)
-    const maxMultiplier = 10000
-    const promises = new Set()
-    for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.add(pool.execute())
-    }
-    await Promise.all(promises)
-    expect(pool.workerNodes.length).toBe(max)
-    // We need to clean up the resources after our test
-    await pool.destroy()
-  })
-
   it('Verify ROUND_ROBIN strategy default policy', async () => {
     const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
     let pool = new FixedThreadPool(
index 85ec426359c3fdbc0e0830404fe02d56886d7d99..77f1d4a1cfa6e63ae382df30d506e17701266576 100644 (file)
@@ -158,11 +158,6 @@ describe('Worker choice strategy context test suite', () => {
       workerChoiceStrategyStub
     )
     const chosenWorkerKey = workerChoiceStrategyContext.execute()
-    expect(
-      workerChoiceStrategyContext.workerChoiceStrategies.get(
-        workerChoiceStrategyContext.workerChoiceStrategy
-      ).hasPoolWorkerNodesReady.callCount
-    ).toBe(6)
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
         workerChoiceStrategyContext.workerChoiceStrategy
@@ -171,29 +166,6 @@ describe('Worker choice strategy context test suite', () => {
     expect(chosenWorkerKey).toBe(1)
   })
 
-  it('Verify that execute() throws error if worker choice strategy recursion reach the maximum depth', () => {
-    const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
-      fixedPool
-    )
-    const workerChoiceStrategyStub = createStubInstance(
-      RoundRobinWorkerChoiceStrategy,
-      {
-        hasPoolWorkerNodesReady: stub().returns(false),
-        choose: stub().returns(0)
-      }
-    )
-    expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.ROUND_ROBIN
-    )
-    workerChoiceStrategyContext.workerChoiceStrategies.set(
-      workerChoiceStrategyContext.workerChoiceStrategy,
-      workerChoiceStrategyStub
-    )
-    expect(() => workerChoiceStrategyContext.execute()).toThrow(
-      new RangeError('Maximum call stack size exceeded')
-    )
-  })
-
   it('Verify that execute() return the worker node key chosen by the strategy with dynamic pool', () => {
     const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
       dynamicPool
index 06cf64c64081353b9b5db74aed8b2dc39088ffa7..9d35ce8aa8088b58809b4c54d67ec2712e06d974 100644 (file)
@@ -152,4 +152,23 @@ describe('Dynamic thread pool test suite', () => {
     // We need to clean up the resources after our test
     await pool.destroy()
   })
+
+  it('Verify that a pool with zero worker works', async () => {
+    const pool = new DynamicThreadPool(
+      0,
+      max,
+      './tests/worker-files/thread/testWorker.mjs'
+    )
+    expect(pool.starting).toBe(false)
+    expect(pool.workerNodes.length).toBe(pool.info.minSize)
+    const maxMultiplier = 10000
+    const promises = new Set()
+    for (let i = 0; i < max * maxMultiplier; i++) {
+      promises.add(pool.execute())
+    }
+    await Promise.all(promises)
+    expect(pool.workerNodes.length).toBe(max)
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
 })