fix: fix dynamic pool with minimum # of workers set to zero
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 5 Jan 2024 22:33:09 +0000 (23:33 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 5 Jan 2024 22:33:09 +0000 (23:33 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
13 files changed:
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
src/pools/thread/dynamic.ts
tests/pools/abstract-pool.test.mjs
tests/pools/cluster/dynamic.test.mjs
tests/pools/thread/dynamic.test.mjs
tests/pools/thread/fixed.test.mjs

index f98d5cdeabc725ed1bfce1f89450ad922705738d..ab114e1a3fbdf6a5ae1575d7ba6dbf43615ef0b9 100644 (file)
@@ -443,6 +443,9 @@ export abstract class AbstractPool<
    * The pool readiness boolean status.
    */
   private get ready (): boolean {
+    if (this.empty) {
+      return false
+    }
     return (
       this.workerNodes.reduce(
         (accumulator, workerNode) =>
@@ -454,6 +457,16 @@ export abstract class AbstractPool<
     )
   }
 
+  /**
+   * The pool emptiness boolean status.
+   */
+  protected get empty (): boolean {
+    if (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0) {
+      return true
+    }
+    return false
+  }
+
   /**
    * The approximate pool utilization.
    *
@@ -1711,6 +1724,13 @@ export abstract class AbstractPool<
     }
   }
 
+  private checkAndEmitReadyEvent (): void {
+    if (!this.readyEventEmitted && this.ready) {
+      this.emitter?.emit(PoolEvents.ready, this.info)
+      this.readyEventEmitted = true
+    }
+  }
+
   private handleWorkerReadyResponse (message: MessageValue<Response>): void {
     const { workerId, ready, taskFunctionNames } = message
     if (ready == null || !ready) {
@@ -1720,10 +1740,7 @@ export abstract class AbstractPool<
       this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
     workerNode.info.ready = ready
     workerNode.info.taskFunctionNames = taskFunctionNames
-    if (!this.readyEventEmitted && this.ready) {
-      this.emitter?.emit(PoolEvents.ready, this.info)
-      this.readyEventEmitted = true
-    }
+    this.checkAndEmitReadyEvent()
   }
 
   private handleTaskExecutionResponse (message: MessageValue<Response>): void {
@@ -1847,6 +1864,13 @@ export abstract class AbstractPool<
     return workerNodeKey
   }
 
+  private checkAndEmitEmptyEvent (): void {
+    if (this.empty) {
+      this.emitter?.emit(PoolEvents.empty, this.info)
+      this.readyEventEmitted = false
+    }
+  }
+
   /**
    * Removes the worker node from the pool worker nodes.
    *
@@ -1858,6 +1882,7 @@ export abstract class AbstractPool<
       this.workerNodes.splice(workerNodeKey, 1)
       this.workerChoiceStrategyContext?.remove(workerNodeKey)
     }
+    this.checkAndEmitEmptyEvent()
   }
 
   protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
index 7b2c37984ece0bff91021777653d90e6c3e5ccab..e8e51947246602194951bb4366c68c9cf9d63ad4 100644 (file)
@@ -40,10 +40,7 @@ export class DynamicClusterPool<
 
   /** @inheritDoc */
   protected shallCreateDynamicWorker (): boolean {
-    return (
-      (!this.full && this.internalBusy()) ||
-      (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0)
-    )
+    return (!this.full && this.internalBusy()) || this.empty
   }
 
   /** @inheritDoc */
index 3aa9feb70b04d8ea3b321777dd053d9823d994b0..cda63d881f0bab742cf0412cfa266714fdefeb27 100644 (file)
@@ -42,6 +42,7 @@ export const PoolEvents = Object.freeze({
   ready: 'ready',
   busy: 'busy',
   full: 'full',
+  empty: 'empty',
   destroy: 'destroy',
   error: 'error',
   taskError: 'taskError',
@@ -246,9 +247,10 @@ export interface IPool<
    *
    * Events that can currently be listened to:
    *
-   * - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready.
+   * - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready. If the pool is dynamic with a minimum number of workers is set to zero, this event is emitted when at least one dynamic worker is ready.
    * - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing concurrently their tasks quota.
    * - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected.
+   * - `'empty'`: Emitted when the pool is dynamic with a minimum number of workers set to zero and the number of workers has reached the minimum size expected.
    * - `'destroy'`: Emitted when the pool is destroyed.
    * - `'error'`: Emitted when an uncaught error occurs.
    * - `'taskError'`: Emitted when an error occurs while executing a task.
index 55f43092d17760653168b59bee184fc508db1d54..f244374291da55b9173129b6384b00f49d952e65 100644 (file)
@@ -132,11 +132,14 @@ export abstract class AbstractWorkerChoiceStrategy<
   }
 
   /**
-   * Check the next worker node readiness.
+   * Check the next worker node key.
    */
-  protected checkNextWorkerNodeReadiness (): void {
-    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    if (!this.isWorkerNodeReady(this.nextWorkerNodeKey!)) {
+  protected checkNextWorkerNodeKey (): void {
+    if (
+      this.nextWorkerNodeKey != null &&
+      (this.nextWorkerNodeKey < 0 ||
+        !this.isWorkerNodeReady(this.nextWorkerNodeKey))
+    ) {
       delete this.nextWorkerNodeKey
     }
   }
@@ -189,6 +192,9 @@ export abstract class AbstractWorkerChoiceStrategy<
    * @param workerNodeKey - The worker node key.
    */
   protected setPreviousWorkerNodeKey (workerNodeKey: number | undefined): void {
-    this.previousWorkerNodeKey = workerNodeKey ?? this.previousWorkerNodeKey
+    this.previousWorkerNodeKey =
+      workerNodeKey != null && workerNodeKey >= 0
+        ? workerNodeKey
+        : this.previousWorkerNodeKey
   }
 }
index 2d604e71a97002ad0980da85f2799f232608b6d3..97a3aa04e5b941a31a053c53bf3f4416d15855f0 100644 (file)
@@ -114,7 +114,9 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
   }
 
   private interleavedWeightedRoundRobinNextWorkerNodeId (): void {
-    if (
+    if (this.pool.workerNodes.length === 0) {
+      this.workerNodeId = 0
+    } else if (
       this.roundId === this.roundWeights.length - 1 &&
       this.workerNodeId === this.pool.workerNodes.length - 1
     ) {
index 41326a78ea0ec3ea2b198de4255c827774422f1b..53f3ba4fd36c0ee70f0918889e5a5edd9708645b 100644 (file)
@@ -44,7 +44,7 @@ export class RoundRobinWorkerChoiceStrategy<
     const chosenWorkerNodeKey = this.nextWorkerNodeKey
     this.setPreviousWorkerNodeKey(chosenWorkerNodeKey)
     this.roundRobinNextWorkerNodeKey()
-    this.checkNextWorkerNodeReadiness()
+    this.checkNextWorkerNodeKey()
     return chosenWorkerNodeKey
   }
 
index 94a832f626f90e03d71151be86d947a55c937b4d..38b6dd7509a0c292a4ab1cb82efdad6109d68b1f 100644 (file)
@@ -64,7 +64,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   public choose (): number | undefined {
     this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
     this.weightedRoundRobinNextWorkerNodeKey()
-    this.checkNextWorkerNodeReadiness()
+    this.checkNextWorkerNodeKey()
     return this.nextWorkerNodeKey
   }
 
index 40e2e18f303122685c70eb09e824175ab5309cef..b7e0a4a7d8d80bc98d55a859cc241f1600bfd41f 100644 (file)
@@ -188,6 +188,9 @@ export class WorkerChoiceStrategyContext<
     let retriesCount = 0
     do {
       workerNodeKey = workerChoiceStrategy.choose()
+      if (workerNodeKey != null && workerNodeKey < 0) {
+        workerNodeKey = undefined
+      }
       if (workerNodeKey == null && chooseCount > 0) {
         retriesCount++
       }
index fde371048c5755c77736c6a25545fe19aa90df8d..8e795512694aa41a2ea82b9c727e3ebd348b01c9 100644 (file)
@@ -40,10 +40,7 @@ export class DynamicThreadPool<
 
   /** @inheritDoc */
   protected shallCreateDynamicWorker (): boolean {
-    return (
-      (!this.full && this.internalBusy()) ||
-      (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0)
-    )
+    return (!this.full && this.internalBusy()) || this.empty
   }
 
   /** @inheritDoc */
index 3fa8e2d6c29ab8626969d0a86f62648ca35990f7..84a0195e0cfd3b1aaef9b818476891b11a61cbc5 100644 (file)
@@ -862,8 +862,8 @@ describe('Abstract pool test suite', () => {
     )
     expect(pool.info.started).toBe(false)
     expect(pool.info.ready).toBe(false)
-    expect(pool.readyEventEmitted).toBe(false)
     expect(pool.workerNodes).toStrictEqual([])
+    expect(pool.readyEventEmitted).toBe(false)
     await expect(pool.execute()).rejects.toThrow(
       new Error('Cannot execute a task on not started pool')
     )
index fa318f4bc62d451f2f08661ff8bd291e65f48b2f..d2a250c15b6b1032a6687634caa6cc905e115e00 100644 (file)
@@ -1,7 +1,11 @@
 import { expect } from 'expect'
-import { DynamicClusterPool, PoolEvents } from '../../../lib/index.cjs'
+import {
+  DynamicClusterPool,
+  PoolEvents,
+  WorkerChoiceStrategies
+} from '../../../lib/index.cjs'
 import { TaskFunctions } from '../../test-types.cjs'
-import { sleep, waitWorkerEvents } from '../../test-utils.cjs'
+import { sleep, waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
 
 describe('Dynamic cluster pool test suite', () => {
   const min = 1
@@ -160,18 +164,25 @@ describe('Dynamic cluster pool test suite', () => {
       './tests/worker-files/thread/testWorker.mjs'
     )
     expect(pool.starting).toBe(false)
-    expect(pool.workerNodes.length).toBe(pool.info.minSize)
-    for (let run = 0; run < 4; run++) {
-      // pool.enableTasksQueue(true, { concurrency: 2 })
-      const maxMultiplier = 10000
-      const promises = new Set()
-      for (let i = 0; i < max * maxMultiplier; i++) {
-        promises.add(pool.execute())
+    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
+      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
+      expect(pool.readyEventEmitted).toBe(false)
+      for (let run = 0; run < 2; run++) {
+        run % 2 !== 0 && pool.enableTasksQueue(true)
+        const maxMultiplier = 4
+        const promises = new Set()
+        expect(pool.workerNodes.length).toBe(pool.info.minSize)
+        for (let i = 0; i < max * maxMultiplier; i++) {
+          promises.add(pool.execute())
+        }
+        await Promise.all(promises)
+        expect(pool.readyEventEmitted).toBe(true)
+        expect(pool.workerNodes.length).toBeGreaterThan(pool.info.minSize)
+        expect(pool.workerNodes.length).toBeLessThanOrEqual(pool.info.maxSize)
+        await waitPoolEvents(pool, PoolEvents.empty, 1)
+        expect(pool.readyEventEmitted).toBe(false)
+        expect(pool.workerNodes.length).toBe(pool.info.minSize)
       }
-      await Promise.all(promises)
-      expect(pool.workerNodes.length).toBe(max)
-      await waitWorkerEvents(pool, 'exit', max)
-      expect(pool.workerNodes.length).toBe(pool.info.minSize)
     }
     // We need to clean up the resources after our test
     await pool.destroy()
index e0e05c4e0b284592da86ca329d67b496f22383c0..2b5dd6112fac7c9664ec6a0ad7a0ce027aa4f371 100644 (file)
@@ -1,7 +1,11 @@
 import { expect } from 'expect'
-import { DynamicThreadPool, PoolEvents } from '../../../lib/index.cjs'
+import {
+  DynamicThreadPool,
+  PoolEvents,
+  WorkerChoiceStrategies
+} from '../../../lib/index.cjs'
 import { TaskFunctions } from '../../test-types.cjs'
-import { sleep, waitWorkerEvents } from '../../test-utils.cjs'
+import { sleep, waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
 
 describe('Dynamic thread pool test suite', () => {
   const min = 1
@@ -160,18 +164,25 @@ describe('Dynamic thread pool test suite', () => {
       './tests/worker-files/thread/testWorker.mjs'
     )
     expect(pool.starting).toBe(false)
-    expect(pool.workerNodes.length).toBe(pool.info.minSize)
-    for (let run = 0; run < 4; run++) {
-      // pool.enableTasksQueue(true, { concurrency: 2 })
-      const maxMultiplier = 10000
-      const promises = new Set()
-      for (let i = 0; i < max * maxMultiplier; i++) {
-        promises.add(pool.execute())
+    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
+      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
+      expect(pool.readyEventEmitted).toBe(false)
+      for (let run = 0; run < 2; run++) {
+        run % 2 !== 0 && pool.enableTasksQueue(true)
+        const maxMultiplier = 4
+        const promises = new Set()
+        expect(pool.workerNodes.length).toBe(pool.info.minSize)
+        for (let i = 0; i < max * maxMultiplier; i++) {
+          promises.add(pool.execute())
+        }
+        await Promise.all(promises)
+        expect(pool.readyEventEmitted).toBe(true)
+        expect(pool.workerNodes.length).toBeGreaterThan(pool.info.minSize)
+        expect(pool.workerNodes.length).toBeLessThanOrEqual(pool.info.maxSize)
+        await waitPoolEvents(pool, PoolEvents.empty, 1)
+        expect(pool.readyEventEmitted).toBe(false)
+        expect(pool.workerNodes.length).toBe(pool.info.minSize)
       }
-      await Promise.all(promises)
-      expect(pool.workerNodes.length).toBe(max)
-      await waitWorkerEvents(pool, 'exit', max)
-      expect(pool.workerNodes.length).toBe(pool.info.minSize)
     }
     // We need to clean up the resources after our test
     await pool.destroy()
index 85415d63d2b9cdb3835c9e1fd5354c59211bb74b..44e7d80ba7c7f4049ec2305ab6b039fb38039124 100644 (file)
@@ -300,8 +300,8 @@ describe('Fixed thread pool test suite', () => {
     await pool.destroy()
     const numberOfExitEvents = await exitPromise
     expect(pool.started).toBe(false)
-    expect(pool.readyEventEmitted).toBe(false)
     expect(pool.emitter.eventNames()).toStrictEqual([])
+    expect(pool.readyEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)
     expect(numberOfExitEvents).toBe(numberOfThreads)
     expect(poolDestroy).toBe(1)