feat: add tasks queue to pool data structure
authorJérôme Benoit <jerome.benoit@sap.com>
Sat, 8 Apr 2023 19:26:07 +0000 (21:26 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sat, 8 Apr 2023 19:26:07 +0000 (21:26 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
25 files changed:
CHANGELOG.md
src/index.ts
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/cluster/fixed.ts
src/pools/pool-internal.ts
src/pools/pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/less-busy-worker-choice-strategy.ts
src/pools/selection-strategies/less-used-worker-choice-strategy.ts
src/pools/selection-strategies/round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
src/pools/thread/dynamic.ts
src/pools/thread/fixed.ts
src/pools/worker.ts [moved from src/pools/pool-worker.ts with 64% similarity]
src/utility-types.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/dynamic.test.js
tests/pools/selection-strategies/selection-strategies.test.js
tests/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.test.js
tests/pools/thread/dynamic.test.js
tests/test-utils.js

index b2e14ab3b276f115407a0265d0405bda5bcf4b5c..9b515d216a332b27e833546551bcfccd9fef7c79 100644 (file)
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - Use monotonic high resolution timer for worker tasks run time.
 - Add worker tasks median run time to statistics.
+- Add worker tasks queue.
 
 ## [2.4.4] - 2023-04-07
 
index 349af12c369e207713af37bfac8f18efb3c929ef..ebab64522029ddcdcac620fb510a22c409361b2a 100644 (file)
@@ -8,7 +8,7 @@ export type {
   ExitHandler,
   MessageHandler,
   OnlineHandler
-} from './pools/pool-worker'
+} from './pools/worker'
 export { WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types'
 export type { WorkerChoiceStrategy } from './pools/selection-strategies/selection-strategies-types'
 export { DynamicThreadPool } from './pools/thread/dynamic'
index 75b5cf796c8a6c198c48bb8ad843bd6eae33ba29..0fb695fbc5d72dfbaf268a219d32983597c80028 100644 (file)
@@ -4,9 +4,9 @@ import { EMPTY_FUNCTION, median } from '../utils'
 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
 import { PoolEvents, type PoolOptions } from './pool'
 import { PoolEmitter } from './pool'
-import type { IPoolInternal, TasksUsage, WorkerType } from './pool-internal'
+import type { IPoolInternal } from './pool-internal'
 import { PoolType } from './pool-internal'
-import type { IPoolWorker } from './pool-worker'
+import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
 import {
   WorkerChoiceStrategies,
   type WorkerChoiceStrategy
@@ -22,12 +22,12 @@ import { CircularArray } from '../circular-array'
  * @typeParam Response - Type of response of execution. This can only be serializable data.
  */
 export abstract class AbstractPool<
-  Worker extends IPoolWorker,
+  Worker extends IWorker,
   Data = unknown,
   Response = unknown
 > implements IPoolInternal<Worker, Data, Response> {
   /** @inheritDoc */
-  public readonly workers: Array<WorkerType<Worker>> = []
+  public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
 
   /** @inheritDoc */
   public readonly emitter?: PoolEmitter
@@ -152,13 +152,15 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Gets the given worker key.
+   * Gets the given worker its worker node key.
    *
    * @param worker - The worker.
-   * @returns The worker key if the worker is found in the pool, `-1` otherwise.
+   * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
    */
-  private getWorkerKey (worker: Worker): number {
-    return this.workers.findIndex(workerItem => workerItem.worker === worker)
+  private getWorkerNodeKey (worker: Worker): number {
+    return this.workerNodes.findIndex(
+      workerNode => workerNode.worker === worker
+    )
   }
 
   /** @inheritDoc */
@@ -167,16 +169,21 @@ export abstract class AbstractPool<
   ): void {
     this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
     this.opts.workerChoiceStrategy = workerChoiceStrategy
-    for (const [index, workerItem] of this.workers.entries()) {
-      this.setWorker(index, workerItem.worker, {
-        run: 0,
-        running: 0,
-        runTime: 0,
-        runTimeHistory: new CircularArray(),
-        avgRunTime: 0,
-        medRunTime: 0,
-        error: 0
-      })
+    for (const [index, workerNode] of this.workerNodes.entries()) {
+      this.setWorkerNode(
+        index,
+        workerNode.worker,
+        {
+          run: 0,
+          running: 0,
+          runTime: 0,
+          runTimeHistory: new CircularArray(),
+          avgRunTime: 0,
+          medRunTime: 0,
+          error: 0
+        },
+        workerNode.tasksQueue
+      )
     }
     this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
       workerChoiceStrategy
@@ -192,22 +199,22 @@ export abstract class AbstractPool<
   protected internalBusy (): boolean {
     return (
       this.numberOfRunningTasks >= this.numberOfWorkers &&
-      this.findFreeWorkerKey() === -1
+      this.findFreeWorkerNodeKey() === -1
     )
   }
 
   /** @inheritDoc */
-  public findFreeWorkerKey (): number {
-    return this.workers.findIndex(workerItem => {
-      return workerItem.tasksUsage.running === 0
+  public findFreeWorkerNodeKey (): number {
+    return this.workerNodes.findIndex(workerNode => {
+      return workerNode.tasksUsage?.running === 0
     })
   }
 
   /** @inheritDoc */
   public async execute (data: Data): Promise<Response> {
-    const [workerKey, worker] = this.chooseWorker()
+    const [workerNodeKey, worker] = this.chooseWorker()
     const messageId = crypto.randomUUID()
-    const res = this.internalExecute(workerKey, worker, messageId)
+    const res = this.internalExecute(workerNodeKey, worker, messageId)
     this.checkAndEmitFull()
     this.checkAndEmitBusy()
     this.sendToWorker(worker, {
@@ -222,22 +229,21 @@ export abstract class AbstractPool<
   /** @inheritDoc */
   public async destroy (): Promise<void> {
     await Promise.all(
-      this.workers.map(async workerItem => {
-        await this.destroyWorker(workerItem.worker)
+      this.workerNodes.map(async workerNode => {
+        await this.destroyWorker(workerNode.worker)
       })
     )
   }
 
   /**
-   * Shutdowns given worker in the pool.
+   * Shutdowns the given worker.
    *
-   * @param worker - A worker within `workers`.
+   * @param worker - A worker within `workerNodes`.
    */
   protected abstract destroyWorker (worker: Worker): void | Promise<void>
 
   /**
-   * Setup hook that can be overridden by a Poolifier pool implementation
-   * to run code before workers are created in the abstract constructor.
+   * Setup hook to run code before worker node are created in the abstract constructor.
    * Can be overridden
    *
    * @virtual
@@ -255,10 +261,10 @@ export abstract class AbstractPool<
    * Hook executed before the worker task promise resolution.
    * Can be overridden.
    *
-   * @param workerKey - The worker key.
+   * @param workerNodeKey - The worker node key.
    */
-  protected beforePromiseResponseHook (workerKey: number): void {
-    ++this.workers[workerKey].tasksUsage.running
+  protected beforePromiseResponseHook (workerNodeKey: number): void {
+    ++this.workerNodes[workerNodeKey].tasksUsage.running
   }
 
   /**
@@ -295,18 +301,18 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Chooses a worker for the next task.
+   * Chooses a worker node for the next task.
    *
    * The default uses a round robin algorithm to distribute the load.
    *
-   * @returns [worker key, worker].
+   * @returns [worker node key, worker].
    */
   protected chooseWorker (): [number, Worker] {
-    let workerKey: number
+    let workerNodeKey: number
     if (
       this.type === PoolType.DYNAMIC &&
       !this.full &&
-      this.findFreeWorkerKey() === -1
+      this.findFreeWorkerNodeKey() === -1
     ) {
       const createdWorker = this.createAndSetupWorker()
       this.registerWorkerMessageListener(createdWorker, message => {
@@ -319,11 +325,11 @@ export abstract class AbstractPool<
           void this.destroyWorker(createdWorker)
         }
       })
-      workerKey = this.getWorkerKey(createdWorker)
+      workerNodeKey = this.getWorkerNodeKey(createdWorker)
     } else {
-      workerKey = this.workerChoiceStrategyContext.execute()
+      workerNodeKey = this.workerChoiceStrategyContext.execute()
     }
-    return [workerKey, this.workers[workerKey].worker]
+    return [workerNodeKey, this.workerNodes[workerNodeKey].worker]
   }
 
   /**
@@ -338,7 +344,7 @@ export abstract class AbstractPool<
   ): void
 
   /**
-   * Registers a listener callback on a given worker.
+   * Registers a listener callback on the given worker.
    *
    * @param worker - The worker which should register a listener.
    * @param listener - The message listener callback.
@@ -353,17 +359,16 @@ export abstract class AbstractPool<
   protected abstract createWorker (): Worker
 
   /**
-   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
+   * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
    *
    * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
    *
    * @param worker - The newly created worker.
-   * @virtual
    */
   protected abstract afterWorkerSetup (worker: Worker): void
 
   /**
-   * Creates a new worker for this pool and sets it up completely.
+   * Creates a new worker and sets it up completely in the pool worker nodes.
    *
    * @returns New, completely set up worker.
    */
@@ -375,18 +380,10 @@ export abstract class AbstractPool<
     worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
     worker.once('exit', () => {
-      this.removeWorker(worker)
+      this.removeWorkerNode(worker)
     })
 
-    this.pushWorker(worker, {
-      run: 0,
-      running: 0,
-      runTime: 0,
-      runTimeHistory: new CircularArray(),
-      avgRunTime: 0,
-      medRunTime: 0,
-      error: 0
-    })
+    this.pushWorkerNode(worker)
 
     this.afterWorkerSetup(worker)
 
@@ -417,11 +414,11 @@ export abstract class AbstractPool<
   }
 
   private async internalExecute (
-    workerKey: number,
+    workerNodeKey: number,
     worker: Worker,
     messageId: string
   ): Promise<Response> {
-    this.beforePromiseResponseHook(workerKey)
+    this.beforePromiseResponseHook(workerNodeKey)
     return await new Promise<Response>((resolve, reject) => {
       this.promiseResponseMap.set(messageId, { resolve, reject, worker })
     })
@@ -444,58 +441,70 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Gets the given worker tasks usage in the pool.
+   * Gets the given worker its tasks usage in the pool.
    *
    * @param worker - The worker.
    * @returns The worker tasks usage.
    */
   private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
-    const workerKey = this.getWorkerKey(worker)
-    if (workerKey !== -1) {
-      return this.workers[workerKey].tasksUsage
+    const workerNodeKey = this.getWorkerNodeKey(worker)
+    if (workerNodeKey !== -1) {
+      return this.workerNodes[workerNodeKey].tasksUsage
     }
-    throw new Error('Worker could not be found in the pool')
+    throw new Error('Worker could not be found in the pool worker nodes')
   }
 
   /**
-   * Pushes the given worker in the pool.
+   * Pushes the given worker in the pool worker nodes.
    *
    * @param worker - The worker.
-   * @param tasksUsage - The worker tasks usage.
+   * @returns The worker nodes length.
    */
-  private pushWorker (worker: Worker, tasksUsage: TasksUsage): void {
-    this.workers.push({
+  private pushWorkerNode (worker: Worker): number {
+    return this.workerNodes.push({
       worker,
-      tasksUsage
+      tasksUsage: {
+        run: 0,
+        running: 0,
+        runTime: 0,
+        runTimeHistory: new CircularArray(),
+        avgRunTime: 0,
+        medRunTime: 0,
+        error: 0
+      },
+      tasksQueue: []
     })
   }
 
   /**
-   * Sets the given worker in the pool.
+   * Sets the given worker in the pool worker nodes.
    *
-   * @param workerKey - The worker key.
+   * @param workerNodeKey - The worker node key.
    * @param worker - The worker.
    * @param tasksUsage - The worker tasks usage.
+   * @param tasksQueue - The worker task queue.
    */
-  private setWorker (
-    workerKey: number,
+  private setWorkerNode (
+    workerNodeKey: number,
     worker: Worker,
-    tasksUsage: TasksUsage
+    tasksUsage: TasksUsage,
+    tasksQueue: Array<Task<Data>>
   ): void {
-    this.workers[workerKey] = {
+    this.workerNodes[workerNodeKey] = {
       worker,
-      tasksUsage
+      tasksUsage,
+      tasksQueue
     }
   }
 
   /**
-   * Removes the given worker from the pool.
+   * Removes the given worker from the pool worker nodes.
    *
-   * @param worker - The worker that will be removed.
+   * @param worker - The worker.
    */
-  protected removeWorker (worker: Worker): void {
-    const workerKey = this.getWorkerKey(worker)
-    this.workers.splice(workerKey, 1)
-    this.workerChoiceStrategyContext.remove(workerKey)
+  protected removeWorkerNode (worker: Worker): void {
+    const workerNodeKey = this.getWorkerNodeKey(worker)
+    this.workerNodes.splice(workerNodeKey, 1)
+    this.workerChoiceStrategyContext.remove(workerNodeKey)
   }
 }
index b84a80ce2b31b7fc807bceae6b29e1a7eb2af5bd..9f9abb01ddaa5f3015ef4a08bd2ee046dc9312c9 100644 (file)
@@ -41,11 +41,11 @@ export class DynamicClusterPool<
 
   /** @inheritDoc */
   public get full (): boolean {
-    return this.workers.length === this.max
+    return this.workerNodes.length === this.max
   }
 
   /** @inheritDoc */
   public get busy (): boolean {
-    return this.full && this.findFreeWorkerKey() === -1
+    return this.full && this.findFreeWorkerNodeKey() === -1
   }
 }
index 78de7073b615fc10936091aca0cd203165efaac8..eef394ece07192bcb7b4bad60a6d1c534d0b2e96 100644 (file)
@@ -102,7 +102,7 @@ export class FixedClusterPool<
 
   /** @inheritDoc */
   public get full (): boolean {
-    return this.workers.length === this.numberOfWorkers
+    return this.workerNodes.length === this.numberOfWorkers
   }
 
   /** @inheritDoc */
index bf799581f138f62c4ef055e6dc40a6ee998358ea..8912dd2c18f8a99a6791b006f672442e5be5ecdf 100644 (file)
@@ -1,6 +1,5 @@
-import type { CircularArray } from '../circular-array'
 import type { IPool } from './pool'
-import type { IPoolWorker } from './pool-worker'
+import type { IWorker, WorkerNode } from './worker'
 
 /**
  * Internal pool types.
@@ -12,29 +11,6 @@ export enum PoolType {
   DYNAMIC = 'dynamic'
 }
 
-/**
- * Internal tasks usage statistics.
- */
-export interface TasksUsage {
-  run: number
-  running: number
-  runTime: number
-  runTimeHistory: CircularArray<number>
-  avgRunTime: number
-  medRunTime: number
-  error: number
-}
-
-/**
- * Internal worker type.
- *
- * @typeParam Worker - Type of worker type items which manages this pool.
- */
-export interface WorkerType<Worker extends IPoolWorker> {
-  worker: Worker
-  tasksUsage: TasksUsage
-}
-
 /**
  * Internal contract definition for a poolifier pool.
  *
@@ -43,14 +19,14 @@ export interface WorkerType<Worker extends IPoolWorker> {
  * @typeParam Response - Type of response of execution. This can only be serializable data.
  */
 export interface IPoolInternal<
-  Worker extends IPoolWorker,
+  Worker extends IWorker,
   Data = unknown,
   Response = unknown
 > extends IPool<Data, Response> {
   /**
-   * Pool worker type items array.
+   * Pool worker nodes.
    */
-  readonly workers: Array<WorkerType<Worker>>
+  readonly workerNodes: Array<WorkerNode<Worker, Data>>
 
   /**
    * Pool type.
@@ -74,13 +50,13 @@ export interface IPoolInternal<
   readonly busy: boolean
 
   /**
-   * Finds a free worker key based on the number of tasks the worker has applied.
+   * Finds a free worker node key based on the number of tasks the worker has applied.
    *
-   * If a worker is found with `0` running tasks, it is detected as free and its key is returned.
+   * If a worker is found with `0` running tasks, it is detected as free and its worker node key is returned.
    *
    * If no free worker is found, `-1` is returned.
    *
-   * @returns A worker key if there is one, `-1` otherwise.
+   * @returns A worker node key if there is one, `-1` otherwise.
    */
-  findFreeWorkerKey: () => number
+  findFreeWorkerNodeKey: () => number
 }
index f1da5356a2e5310d799b6e595a898a3aad783600..60aceb786549ac0daabee8253f6c1fad890918fe 100644 (file)
@@ -4,7 +4,7 @@ import type {
   ExitHandler,
   MessageHandler,
   OnlineHandler
-} from './pool-worker'
+} from './worker'
 import type { WorkerChoiceStrategy } from './selection-strategies/selection-strategies-types'
 
 /**
index 8ab3d360ca67f210db221fcdd424039799cf6668..337b50b1f07eead51a7e6f1dd962bac4654fcc93 100644 (file)
@@ -1,6 +1,6 @@
 import type { IPoolInternal } from '../pool-internal'
 import { PoolType } from '../pool-internal'
-import type { IPoolWorker } from '../pool-worker'
+import type { IWorker } from '../worker'
 import type {
   IWorkerChoiceStrategy,
   RequiredStatistics
@@ -14,7 +14,7 @@ import type {
  * @typeParam Response - Type of response of execution. This can only be serializable data.
  */
 export abstract class AbstractWorkerChoiceStrategy<
-  Worker extends IPoolWorker,
+  Worker extends IWorker,
   Data = unknown,
   Response = unknown
 > implements IWorkerChoiceStrategy {
@@ -46,5 +46,5 @@ export abstract class AbstractWorkerChoiceStrategy<
   public abstract choose (): number
 
   /** @inheritDoc */
-  public abstract remove (workerKey: number): boolean
+  public abstract remove (workerNodeKey: number): boolean
 }
index e16c1ea213d4de664273bdd1a3c2c05063537cba..c2f85f18ae911f0c8e35bbca6ff11712b360dff8 100644 (file)
@@ -1,4 +1,4 @@
-import type { IPoolWorker } from '../pool-worker'
+import type { IWorker } from '../worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
@@ -22,7 +22,7 @@ interface WorkerVirtualTaskTimestamp {
  * @typeParam Response - Type of response of execution. This can only be serializable data.
  */
 export class FairShareWorkerChoiceStrategy<
-    Worker extends IPoolWorker,
+    Worker extends IWorker,
     Data = unknown,
     Response = unknown
   >
@@ -36,7 +36,7 @@ export class FairShareWorkerChoiceStrategy<
   }
 
   /**
-   *  Worker last virtual task execution timestamp.
+   * Worker last virtual task execution timestamp.
    */
   private readonly workerLastVirtualTaskTimestamp: Map<
   number,
@@ -52,8 +52,8 @@ export class FairShareWorkerChoiceStrategy<
   /** @inheritDoc */
   public choose (): number {
     let minWorkerVirtualTaskEndTimestamp = Infinity
-    let chosenWorkerKey!: number
-    for (const [index] of this.pool.workers.entries()) {
+    let chosenWorkerNodeKey!: number
+    for (const [index] of this.pool.workerNodes.entries()) {
       this.computeWorkerLastVirtualTaskTimestamp(index)
       const workerLastVirtualTaskEndTimestamp =
         this.workerLastVirtualTaskTimestamp.get(index)?.end ?? 0
@@ -61,38 +61,38 @@ export class FairShareWorkerChoiceStrategy<
         workerLastVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp
       ) {
         minWorkerVirtualTaskEndTimestamp = workerLastVirtualTaskEndTimestamp
-        chosenWorkerKey = index
+        chosenWorkerNodeKey = index
       }
     }
-    return chosenWorkerKey
+    return chosenWorkerNodeKey
   }
 
   /** @inheritDoc */
-  public remove (workerKey: number): boolean {
-    const workerDeleted = this.workerLastVirtualTaskTimestamp.delete(workerKey)
+  public remove (workerNodeKey: number): boolean {
+    const deleted = this.workerLastVirtualTaskTimestamp.delete(workerNodeKey)
     for (const [key, value] of this.workerLastVirtualTaskTimestamp.entries()) {
-      if (key > workerKey) {
+      if (key > workerNodeKey) {
         this.workerLastVirtualTaskTimestamp.set(key - 1, value)
       }
     }
-    return workerDeleted
+    return deleted
   }
 
   /**
    * Computes worker last virtual task timestamp.
    *
-   * @param workerKey - The worker key.
+   * @param workerNodeKey - The worker node key.
    */
-  private computeWorkerLastVirtualTaskTimestamp (workerKey: number): void {
+  private computeWorkerLastVirtualTaskTimestamp (workerNodeKey: number): void {
     const workerVirtualTaskStartTimestamp = Math.max(
       performance.now(),
-      this.workerLastVirtualTaskTimestamp.get(workerKey)?.end ?? -Infinity
+      this.workerLastVirtualTaskTimestamp.get(workerNodeKey)?.end ?? -Infinity
     )
-    this.workerLastVirtualTaskTimestamp.set(workerKey, {
+    this.workerLastVirtualTaskTimestamp.set(workerNodeKey, {
       start: workerVirtualTaskStartTimestamp,
       end:
         workerVirtualTaskStartTimestamp +
-        (this.pool.workers[workerKey].tasksUsage.avgRunTime ?? 0)
+        (this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime ?? 0)
     })
   }
 }
index c03c9da0a1580ce645b96df5bf24b44111aa080a..d98732d66882a66e961a04dc49e4241ae5db6e7c 100644 (file)
@@ -1,4 +1,4 @@
-import type { IPoolWorker } from '../pool-worker'
+import type { IWorker } from '../worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
@@ -13,7 +13,7 @@ import type {
  * @typeParam Response - Type of response of execution. This can only be serializable data.
  */
 export class LessBusyWorkerChoiceStrategy<
-    Worker extends IPoolWorker,
+    Worker extends IWorker,
     Data = unknown,
     Response = unknown
   >
@@ -33,26 +33,26 @@ export class LessBusyWorkerChoiceStrategy<
 
   /** @inheritDoc */
   public choose (): number {
-    const freeWorkerKey = this.pool.findFreeWorkerKey()
-    if (freeWorkerKey !== -1) {
-      return freeWorkerKey
+    const freeWorkerNodeKey = this.pool.findFreeWorkerNodeKey()
+    if (freeWorkerNodeKey !== -1) {
+      return freeWorkerNodeKey
     }
     let minRunTime = Infinity
-    let lessBusyWorkerKey!: number
-    for (const [index, workerItem] of this.pool.workers.entries()) {
-      const workerRunTime = workerItem.tasksUsage.runTime
+    let lessBusyWorkerNodeKey!: number
+    for (const [index, workerNode] of this.pool.workerNodes.entries()) {
+      const workerRunTime = workerNode.tasksUsage.runTime
       if (workerRunTime === 0) {
         return index
       } else if (workerRunTime < minRunTime) {
         minRunTime = workerRunTime
-        lessBusyWorkerKey = index
+        lessBusyWorkerNodeKey = index
       }
     }
-    return lessBusyWorkerKey
+    return lessBusyWorkerNodeKey
   }
 
   /** @inheritDoc */
-  public remove (workerKey: number): boolean {
+  public remove (workerNodeKey: number): boolean {
     return true
   }
 }
index 364c54a1d3b5d6d0a5a1c853221a247c15cdd1fa..acf1e5036aeed44a7ae8f6d11394160b95aec10a 100644 (file)
@@ -1,4 +1,4 @@
-import type { IPoolWorker } from '../pool-worker'
+import type { IWorker } from '../worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type { IWorkerChoiceStrategy } from './selection-strategies-types'
 
@@ -10,7 +10,7 @@ import type { IWorkerChoiceStrategy } from './selection-strategies-types'
  * @typeParam Response - Type of response of execution. This can only be serializable data.
  */
 export class LessUsedWorkerChoiceStrategy<
-    Worker extends IPoolWorker,
+    Worker extends IWorker,
     Data = unknown,
     Response = unknown
   >
@@ -23,27 +23,27 @@ export class LessUsedWorkerChoiceStrategy<
 
   /** @inheritDoc */
   public choose (): number {
-    const freeWorkerKey = this.pool.findFreeWorkerKey()
-    if (freeWorkerKey !== -1) {
-      return freeWorkerKey
+    const freeWorkerNodeKey = this.pool.findFreeWorkerNodeKey()
+    if (freeWorkerNodeKey !== -1) {
+      return freeWorkerNodeKey
     }
     let minNumberOfTasks = Infinity
-    let lessUsedWorkerKey!: number
-    for (const [index, workerItem] of this.pool.workers.entries()) {
-      const tasksUsage = workerItem.tasksUsage
+    let lessUsedWorkerNodeKey!: number
+    for (const [index, workerNode] of this.pool.workerNodes.entries()) {
+      const tasksUsage = workerNode.tasksUsage
       const workerTasks = tasksUsage.run + tasksUsage.running
       if (workerTasks === 0) {
         return index
       } else if (workerTasks < minNumberOfTasks) {
         minNumberOfTasks = workerTasks
-        lessUsedWorkerKey = index
+        lessUsedWorkerNodeKey = index
       }
     }
-    return lessUsedWorkerKey
+    return lessUsedWorkerNodeKey
   }
 
   /** @inheritDoc */
-  public remove (workerKey: number): boolean {
+  public remove (workerNodeKey: number): boolean {
     return true
   }
 }
index 29b05fa507f0b3ca8a18a300ad1c6927d9b76a65..eda00c7582e305a57c10318d2878c24045fe21fc 100644 (file)
@@ -1,4 +1,4 @@
-import type { IPoolWorker } from '../pool-worker'
+import type { IWorker } from '../worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type { IWorkerChoiceStrategy } from './selection-strategies-types'
 
@@ -10,43 +10,43 @@ import type { IWorkerChoiceStrategy } from './selection-strategies-types'
  * @typeParam Response - Type of response of execution. This can only be serializable data.
  */
 export class RoundRobinWorkerChoiceStrategy<
-    Worker extends IPoolWorker,
+    Worker extends IWorker,
     Data = unknown,
     Response = unknown
   >
   extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
   implements IWorkerChoiceStrategy {
   /**
-   * Id of the next worker.
+   * Id of the next worker node.
    */
-  private nextWorkerId: number = 0
+  private nextWorkerNodeId: number = 0
 
   /** @inheritDoc */
   public reset (): boolean {
-    this.nextWorkerId = 0
+    this.nextWorkerNodeId = 0
     return true
   }
 
   /** @inheritDoc */
   public choose (): number {
-    const chosenWorkerKey = this.nextWorkerId
-    this.nextWorkerId =
-      this.nextWorkerId === this.pool.workers.length - 1
+    const chosenWorkerNodeKey = this.nextWorkerNodeId
+    this.nextWorkerNodeId =
+      this.nextWorkerNodeId === this.pool.workerNodes.length - 1
         ? 0
-        : this.nextWorkerId + 1
-    return chosenWorkerKey
+        : this.nextWorkerNodeId + 1
+    return chosenWorkerNodeKey
   }
 
   /** @inheritDoc */
-  public remove (workerKey: number): boolean {
-    if (this.nextWorkerId === workerKey) {
-      if (this.pool.workers.length === 0) {
-        this.nextWorkerId = 0
+  public remove (workerNodeKey: number): boolean {
+    if (this.nextWorkerNodeId === workerNodeKey) {
+      if (this.pool.workerNodes.length === 0) {
+        this.nextWorkerNodeId = 0
       } else {
-        this.nextWorkerId =
-          this.nextWorkerId > this.pool.workers.length - 1
-            ? this.pool.workers.length - 1
-            : this.nextWorkerId
+        this.nextWorkerNodeId =
+          this.nextWorkerNodeId > this.pool.workerNodes.length - 1
+            ? this.pool.workerNodes.length - 1
+            : this.nextWorkerNodeId
       }
     }
     return true
index 2207173448458a81be1d383f74a51d4b1b08964a..f5f06a6ef6afece164a4367a3026fc6db679adc3 100644 (file)
@@ -43,7 +43,7 @@ export interface RequiredStatistics {
  */
 export interface IWorkerChoiceStrategy {
   /**
-   * Required pool tasks usage statistics.
+   * Required tasks usage statistics.
    */
   readonly requiredStatistics: RequiredStatistics
   /**
@@ -51,13 +51,13 @@ export interface IWorkerChoiceStrategy {
    */
   reset: () => boolean
   /**
-   * Chooses a worker in the pool and returns its key.
+   * Chooses a worker node in the pool and returns its key.
    */
   choose: () => number
   /**
-   * Removes a worker reference from strategy internals.
+   * Removes a worker node key from strategy internals.
    *
-   * @param workerKey - The worker key.
+   * @param workerNodeKey - The worker node key.
    */
-  remove: (workerKey: number) => boolean
+  remove: (workerNodeKey: number) => boolean
 }
index 96b5867b5bfb1db98f8a8afd4505b5c24da75505..1bdc02fb189fcf5f21676af2593f2c24854c99dd 100644 (file)
@@ -1,6 +1,6 @@
 import { cpus } from 'node:os'
 import type { IPoolInternal } from '../pool-internal'
-import type { IPoolWorker } from '../pool-worker'
+import type { IWorker } from '../worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
@@ -24,7 +24,7 @@ interface TaskRunTime {
  * @typeParam Response - Type of response of execution. This can only be serializable data.
  */
 export class WeightedRoundRobinWorkerChoiceStrategy<
-    Worker extends IPoolWorker,
+    Worker extends IWorker,
     Data = unknown,
     Response = unknown
   >
@@ -38,15 +38,15 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   }
 
   /**
-   * Worker id where the current task will be submitted.
+   * Worker node id where the current task will be submitted.
    */
-  private currentWorkerId: number = 0
+  private currentWorkerNodeId: number = 0
   /**
    * Default worker weight.
    */
   private readonly defaultWorkerWeight: number
   /**
-   * Per worker virtual task runtime map.
+   * Workers' virtual task runtime.
    */
   private readonly workersTaskRunTime: Map<number, TaskRunTime> = new Map<
   number,
@@ -66,7 +66,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
 
   /** @inheritDoc */
   public reset (): boolean {
-    this.currentWorkerId = 0
+    this.currentWorkerNodeId = 0
     this.workersTaskRunTime.clear()
     this.initWorkersTaskRunTime()
     return true
@@ -74,76 +74,79 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
 
   /** @inheritDoc */
   public choose (): number {
-    const chosenWorkerKey = this.currentWorkerId
-    if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorkerKey)) {
-      this.initWorkerTaskRunTime(chosenWorkerKey)
+    const chosenWorkerNodeKey = this.currentWorkerNodeId
+    if (
+      this.isDynamicPool &&
+      !this.workersTaskRunTime.has(chosenWorkerNodeKey)
+    ) {
+      this.initWorkerTaskRunTime(chosenWorkerNodeKey)
     }
     const workerTaskRunTime =
-      this.workersTaskRunTime.get(chosenWorkerKey)?.runTime ?? 0
+      this.workersTaskRunTime.get(chosenWorkerNodeKey)?.runTime ?? 0
     const workerTaskWeight =
-      this.workersTaskRunTime.get(chosenWorkerKey)?.weight ??
+      this.workersTaskRunTime.get(chosenWorkerNodeKey)?.weight ??
       this.defaultWorkerWeight
     if (workerTaskRunTime < workerTaskWeight) {
       this.setWorkerTaskRunTime(
-        chosenWorkerKey,
+        chosenWorkerNodeKey,
         workerTaskWeight,
         workerTaskRunTime +
-          (this.getWorkerVirtualTaskRunTime(chosenWorkerKey) ?? 0)
+          (this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey) ?? 0)
       )
     } else {
-      this.currentWorkerId =
-        this.currentWorkerId === this.pool.workers.length - 1
+      this.currentWorkerNodeId =
+        this.currentWorkerNodeId === this.pool.workerNodes.length - 1
           ? 0
-          : this.currentWorkerId + 1
-      this.setWorkerTaskRunTime(this.currentWorkerId, workerTaskWeight, 0)
+          : this.currentWorkerNodeId + 1
+      this.setWorkerTaskRunTime(this.currentWorkerNodeId, workerTaskWeight, 0)
     }
-    return chosenWorkerKey
+    return chosenWorkerNodeKey
   }
 
   /** @inheritDoc */
-  public remove (workerKey: number): boolean {
-    if (this.currentWorkerId === workerKey) {
-      if (this.pool.workers.length === 0) {
-        this.currentWorkerId = 0
+  public remove (workerNodeKey: number): boolean {
+    if (this.currentWorkerNodeId === workerNodeKey) {
+      if (this.pool.workerNodes.length === 0) {
+        this.currentWorkerNodeId = 0
       } else {
-        this.currentWorkerId =
-          this.currentWorkerId > this.pool.workers.length - 1
-            ? this.pool.workers.length - 1
-            : this.currentWorkerId
+        this.currentWorkerNodeId =
+          this.currentWorkerNodeId > this.pool.workerNodes.length - 1
+            ? this.pool.workerNodes.length - 1
+            : this.currentWorkerNodeId
       }
     }
-    const workerDeleted = this.workersTaskRunTime.delete(workerKey)
+    const deleted = this.workersTaskRunTime.delete(workerNodeKey)
     for (const [key, value] of this.workersTaskRunTime) {
-      if (key > workerKey) {
+      if (key > workerNodeKey) {
         this.workersTaskRunTime.set(key - 1, value)
       }
     }
-    return workerDeleted
+    return deleted
   }
 
   private initWorkersTaskRunTime (): void {
-    for (const [index] of this.pool.workers.entries()) {
+    for (const [index] of this.pool.workerNodes.entries()) {
       this.initWorkerTaskRunTime(index)
     }
   }
 
-  private initWorkerTaskRunTime (workerKey: number): void {
-    this.setWorkerTaskRunTime(workerKey, this.defaultWorkerWeight, 0)
+  private initWorkerTaskRunTime (workerNodeKey: number): void {
+    this.setWorkerTaskRunTime(workerNodeKey, this.defaultWorkerWeight, 0)
   }
 
   private setWorkerTaskRunTime (
-    workerKey: number,
+    workerNodeKey: number,
     weight: number,
     runTime: number
   ): void {
-    this.workersTaskRunTime.set(workerKey, {
+    this.workersTaskRunTime.set(workerNodeKey, {
       weight,
       runTime
     })
   }
 
-  private getWorkerVirtualTaskRunTime (workerKey: number): number {
-    return this.pool.workers[workerKey].tasksUsage.avgRunTime
+  private getWorkerVirtualTaskRunTime (workerNodeKey: number): number {
+    return this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime
   }
 
   private computeWorkerWeight (): number {
index 192cf005805ecfd70c70a337c9a7ac078ce46abb..0e5fd5468e32195fce4589562c97f3bc15e8aefd 100644 (file)
@@ -1,5 +1,5 @@
 import type { IPoolInternal } from '../pool-internal'
-import type { IPoolWorker } from '../pool-worker'
+import type { IWorker } from '../worker'
 import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy'
 import { LessBusyWorkerChoiceStrategy } from './less-busy-worker-choice-strategy'
 import { LessUsedWorkerChoiceStrategy } from './less-used-worker-choice-strategy'
@@ -20,7 +20,7 @@ import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-w
  * @typeParam Response - Type of response of execution. This can only be serializable data.
  */
 export class WorkerChoiceStrategyContext<
-  Worker extends IPoolWorker,
+  Worker extends IWorker,
   Data = unknown,
   Response = unknown
 > {
@@ -97,7 +97,7 @@ export class WorkerChoiceStrategyContext<
   /**
    * Executes the worker choice strategy algorithm in the context.
    *
-   * @returns The key of the chosen one.
+   * @returns The key of the worker node.
    */
   public execute (): number {
     return (
@@ -108,16 +108,16 @@ export class WorkerChoiceStrategyContext<
   }
 
   /**
-   * Removes a worker from the worker choice strategy in the context.
+   * Removes a worker node key from the worker choice strategy in the context.
    *
-   * @param workerKey - The key of the worker to remove.
+   * @param workerNodeKey - The key of the worker node.
    * @returns `true` if the removal is successful, `false` otherwise.
    */
-  public remove (workerKey: number): boolean {
+  public remove (workerNodeKey: number): boolean {
     return (
       this.workerChoiceStrategies.get(
         this.workerChoiceStrategyType
       ) as IWorkerChoiceStrategy
-    ).remove(workerKey)
+    ).remove(workerNodeKey)
   }
 }
index d8cc3a7427ca0f843c2c9f839747b3a663b9b95c..e1565d00b7949b2af74cc97f44f2bdc30185e61c 100644 (file)
@@ -42,11 +42,11 @@ export class DynamicThreadPool<
 
   /** @inheritDoc */
   public get full (): boolean {
-    return this.workers.length === this.max
+    return this.workerNodes.length === this.max
   }
 
   /** @inheritDoc */
   public get busy (): boolean {
-    return this.full && this.findFreeWorkerKey() === -1
+    return this.full && this.findFreeWorkerNodeKey() === -1
   }
 }
index a95a86034fcf13abc471862bbe9d51d8aa4604d1..bc665bf1a5f13e6c136a7281b96ffc465e470c03 100644 (file)
@@ -98,7 +98,7 @@ export class FixedThreadPool<
 
   /** @inheritDoc */
   public get full (): boolean {
-    return this.workers.length === this.numberOfWorkers
+    return this.workerNodes.length === this.numberOfWorkers
   }
 
   /** @inheritDoc */
similarity index 64%
rename from src/pools/pool-worker.ts
rename to src/pools/worker.ts
index e371e9c15fd7797b956023add35375eb04a326ac..4ff2c494664b571927f9d44e04fe3acbe3ec72e7 100644 (file)
@@ -1,3 +1,5 @@
+import type { CircularArray } from '../circular-array'
+
 /**
  * Callback invoked if the worker has received a message.
  */
@@ -19,9 +21,30 @@ export type OnlineHandler<Worker> = (this: Worker) => void
 export type ExitHandler<Worker> = (this: Worker, code: number) => void
 
 /**
- * Interface that describes the minimum required implementation of listener events for a pool worker.
+ * Worker task interface.
  */
-export interface IPoolWorker {
+export interface Task<Data = unknown> {
+  data: Data
+  id: string
+}
+
+/**
+ * Worker tasks usage statistics.
+ */
+export interface TasksUsage {
+  run: number
+  running: number
+  runTime: number
+  runTimeHistory: CircularArray<number>
+  avgRunTime: number
+  medRunTime: number
+  error: number
+}
+
+/**
+ * Worker interface.
+ */
+export interface IWorker {
   /**
    * Register an event listener.
    *
@@ -40,3 +63,12 @@ export interface IPoolWorker {
    */
   once: (event: 'exit', handler: ExitHandler<this>) => void
 }
+
+/**
+ * Worker node interface.
+ */
+export interface WorkerNode<Worker extends IWorker, Data = unknown> {
+  worker: Worker
+  tasksUsage: TasksUsage
+  tasksQueue: Array<Task<Data>>
+}
index 05af4e3d396b7c89d22ba0c895ceb6c4649e5f42..458b31a055ef7d4a92e86bd9f9fbd1be46512d80 100644 (file)
@@ -1,7 +1,7 @@
 import type { Worker as ClusterWorker } from 'node:cluster'
 import type { MessagePort } from 'node:worker_threads'
 import type { KillBehavior } from './worker/worker-options'
-import type { IPoolWorker } from './pools/pool-worker'
+import type { IWorker } from './pools/worker'
 
 /**
  * Make all properties in T non-readonly.
@@ -50,7 +50,7 @@ export interface MessageValue<
  * @typeParam Response - Type of execution response. This can only be serializable data.
  */
 export interface PromiseResponseWrapper<
-  Worker extends IPoolWorker,
+  Worker extends IWorker,
   Response = unknown
 > {
   /**
index 5063c7fd31c6f058450536dfea3a6519873a88ba..54646e3aecbc3b5e87dc5bb791045b678862c6bd 100644 (file)
@@ -11,7 +11,7 @@ const { CircularArray } = require('../../../lib/circular-array')
 describe('Abstract pool test suite', () => {
   const numberOfWorkers = 1
   const workerNotFoundInPoolError = new Error(
-    'Worker could not be found in the pool'
+    'Worker could not be found in the pool worker nodes'
   )
   class StubPoolWithRemoveAllWorker extends FixedThreadPool {
     removeAllWorker () {
@@ -141,15 +141,29 @@ describe('Abstract pool test suite', () => {
       numberOfWorkers,
       './tests/worker-files/cluster/testWorker.js'
     )
-    for (const workerItem of pool.workers) {
-      expect(workerItem.tasksUsage).toBeDefined()
-      expect(workerItem.tasksUsage.run).toBe(0)
-      expect(workerItem.tasksUsage.running).toBe(0)
-      expect(workerItem.tasksUsage.runTime).toBe(0)
-      expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
-      expect(workerItem.tasksUsage.avgRunTime).toBe(0)
-      expect(workerItem.tasksUsage.medRunTime).toBe(0)
-      expect(workerItem.tasksUsage.error).toBe(0)
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage).toBeDefined()
+      expect(workerNode.tasksUsage.run).toBe(0)
+      expect(workerNode.tasksUsage.running).toBe(0)
+      expect(workerNode.tasksUsage.runTime).toBe(0)
+      expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
+      expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
+      expect(workerNode.tasksUsage.avgRunTime).toBe(0)
+      expect(workerNode.tasksUsage.medRunTime).toBe(0)
+      expect(workerNode.tasksUsage.error).toBe(0)
+    }
+    await pool.destroy()
+  })
+
+  it('Verify that worker pool tasks queue are initialized', async () => {
+    const pool = new FixedClusterPool(
+      numberOfWorkers,
+      './tests/worker-files/cluster/testWorker.js'
+    )
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksQueue).toBeDefined()
+      expect(workerNode.tasksQueue).toBeInstanceOf(Array)
+      expect(workerNode.tasksQueue.length).toBe(0)
     }
     await pool.destroy()
   })
@@ -163,26 +177,28 @@ describe('Abstract pool test suite', () => {
     for (let i = 0; i < numberOfWorkers * 2; i++) {
       promises.push(pool.execute())
     }
-    for (const workerItem of pool.workers) {
-      expect(workerItem.tasksUsage).toBeDefined()
-      expect(workerItem.tasksUsage.run).toBe(0)
-      expect(workerItem.tasksUsage.running).toBe(numberOfWorkers * 2)
-      expect(workerItem.tasksUsage.runTime).toBe(0)
-      expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
-      expect(workerItem.tasksUsage.avgRunTime).toBe(0)
-      expect(workerItem.tasksUsage.medRunTime).toBe(0)
-      expect(workerItem.tasksUsage.error).toBe(0)
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage).toBeDefined()
+      expect(workerNode.tasksUsage.run).toBe(0)
+      expect(workerNode.tasksUsage.running).toBe(numberOfWorkers * 2)
+      expect(workerNode.tasksUsage.runTime).toBe(0)
+      expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
+      expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
+      expect(workerNode.tasksUsage.avgRunTime).toBe(0)
+      expect(workerNode.tasksUsage.medRunTime).toBe(0)
+      expect(workerNode.tasksUsage.error).toBe(0)
     }
     await Promise.all(promises)
-    for (const workerItem of pool.workers) {
-      expect(workerItem.tasksUsage).toBeDefined()
-      expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2)
-      expect(workerItem.tasksUsage.running).toBe(0)
-      expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
-      expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
-      expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
-      expect(workerItem.tasksUsage.medRunTime).toBe(0)
-      expect(workerItem.tasksUsage.error).toBe(0)
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage).toBeDefined()
+      expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2)
+      expect(workerNode.tasksUsage.running).toBe(0)
+      expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
+      expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
+      expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
+      expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+      expect(workerNode.tasksUsage.medRunTime).toBe(0)
+      expect(workerNode.tasksUsage.error).toBe(0)
     }
     await pool.destroy()
   })
@@ -198,26 +214,28 @@ describe('Abstract pool test suite', () => {
       promises.push(pool.execute())
     }
     await Promise.all(promises)
-    for (const workerItem of pool.workers) {
-      expect(workerItem.tasksUsage).toBeDefined()
-      expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2)
-      expect(workerItem.tasksUsage.running).toBe(0)
-      expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
-      expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
-      expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
-      expect(workerItem.tasksUsage.medRunTime).toBe(0)
-      expect(workerItem.tasksUsage.error).toBe(0)
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage).toBeDefined()
+      expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2)
+      expect(workerNode.tasksUsage.running).toBe(0)
+      expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
+      expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
+      expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
+      expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+      expect(workerNode.tasksUsage.medRunTime).toBe(0)
+      expect(workerNode.tasksUsage.error).toBe(0)
     }
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
-    for (const workerItem of pool.workers) {
-      expect(workerItem.tasksUsage).toBeDefined()
-      expect(workerItem.tasksUsage.run).toBe(0)
-      expect(workerItem.tasksUsage.running).toBe(0)
-      expect(workerItem.tasksUsage.runTime).toBe(0)
-      expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
-      expect(workerItem.tasksUsage.avgRunTime).toBe(0)
-      expect(workerItem.tasksUsage.medRunTime).toBe(0)
-      expect(workerItem.tasksUsage.error).toBe(0)
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage).toBeDefined()
+      expect(workerNode.tasksUsage.run).toBe(0)
+      expect(workerNode.tasksUsage.running).toBe(0)
+      expect(workerNode.tasksUsage.runTime).toBe(0)
+      expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
+      expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
+      expect(workerNode.tasksUsage.avgRunTime).toBe(0)
+      expect(workerNode.tasksUsage.medRunTime).toBe(0)
+      expect(workerNode.tasksUsage.error).toBe(0)
     }
     await pool.destroy()
   })
index 74cb5d8ff646d78c121793a665d06409ea181ce5..33a5ab8c013bac7bb4d114fa4dda0c9cfff1153b 100644 (file)
@@ -32,8 +32,8 @@ describe('Dynamic cluster pool test suite', () => {
     for (let i = 0; i < max * 2; i++) {
       pool.execute()
     }
-    expect(pool.workers.length).toBeLessThanOrEqual(max)
-    expect(pool.workers.length).toBeGreaterThan(min)
+    expect(pool.workerNodes.length).toBeLessThanOrEqual(max)
+    expect(pool.workerNodes.length).toBeGreaterThan(min)
     // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
     // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
     expect(poolBusy).toBe(max + 1)
@@ -42,19 +42,19 @@ describe('Dynamic cluster pool test suite', () => {
   })
 
   it('Verify scale worker up and down is working', async () => {
-    expect(pool.workers.length).toBe(min)
+    expect(pool.workerNodes.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       pool.execute()
     }
-    expect(pool.workers.length).toBeGreaterThan(min)
+    expect(pool.workerNodes.length).toBeGreaterThan(min)
     await TestUtils.waitExits(pool, max - min)
-    expect(pool.workers.length).toBe(min)
+    expect(pool.workerNodes.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       pool.execute()
     }
-    expect(pool.workers.length).toBeGreaterThan(min)
+    expect(pool.workerNodes.length).toBeGreaterThan(min)
     await TestUtils.waitExits(pool, max - min)
-    expect(pool.workers.length).toBe(min)
+    expect(pool.workerNodes.length).toBe(min)
   })
 
   it('Shutdown test', async () => {
@@ -93,13 +93,13 @@ describe('Dynamic cluster pool test suite', () => {
         exitHandler: () => console.log('long running worker exited')
       }
     )
-    expect(longRunningPool.workers.length).toBe(min)
+    expect(longRunningPool.workerNodes.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       longRunningPool.execute()
     }
-    expect(longRunningPool.workers.length).toBe(max)
+    expect(longRunningPool.workerNodes.length).toBe(max)
     await TestUtils.waitExits(longRunningPool, max - min)
-    expect(longRunningPool.workers.length).toBe(min)
+    expect(longRunningPool.workerNodes.length).toBe(min)
     // We need to clean up the resources after our test
     await longRunningPool.destroy()
   })
@@ -115,14 +115,14 @@ describe('Dynamic cluster pool test suite', () => {
         exitHandler: () => console.log('long running worker exited')
       }
     )
-    expect(longRunningPool.workers.length).toBe(min)
+    expect(longRunningPool.workerNodes.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       longRunningPool.execute()
     }
-    expect(longRunningPool.workers.length).toBe(max)
+    expect(longRunningPool.workerNodes.length).toBe(max)
     await TestUtils.sleep(1500)
-    // Here we expect the workers to be at the max size since the task is still running
-    expect(longRunningPool.workers.length).toBe(max)
+    // Here we expect the workerNodes to be at the max size since the task is still running
+    expect(longRunningPool.workerNodes.length).toBe(max)
     // We need to clean up the resources after our test
     await longRunningPool.destroy()
   })
index 5a7394a57aa5fe5abdf643a43d6bfcf48d35f67a..796c84baba1feb68953e8dd24b3eb610ac898382 100644 (file)
@@ -45,7 +45,7 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.ROUND_ROBIN
-      ).nextWorkerId
+      ).nextWorkerNodeId
     ).toBe(0)
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -168,13 +168,13 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.ROUND_ROBIN
-      ).nextWorkerId
+      ).nextWorkerNodeId
     ).toBeDefined()
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.ROUND_ROBIN
-      ).nextWorkerId
+      ).nextWorkerNodeId
     ).toBe(0)
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -186,13 +186,13 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.ROUND_ROBIN
-      ).nextWorkerId
+      ).nextWorkerNodeId
     ).toBeDefined()
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.ROUND_ROBIN
-      ).nextWorkerId
+      ).nextWorkerNodeId
     ).toBe(0)
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -395,18 +395,18 @@ describe('Selection strategies test suite', () => {
     expect(pool.opts.workerChoiceStrategy).toBe(
       WorkerChoiceStrategies.FAIR_SHARE
     )
-    for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
+    for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
       .get(WorkerChoiceStrategies.FAIR_SHARE)
       .workerLastVirtualTaskTimestamp.keys()) {
       expect(
         pool.workerChoiceStrategyContext.workerChoiceStrategies
           .get(WorkerChoiceStrategies.FAIR_SHARE)
-          .workerLastVirtualTaskTimestamp.get(workerKey).start
+          .workerLastVirtualTaskTimestamp.get(workerNodeKey).start
       ).toBe(0)
       expect(
         pool.workerChoiceStrategyContext.workerChoiceStrategies
           .get(WorkerChoiceStrategies.FAIR_SHARE)
-          .workerLastVirtualTaskTimestamp.get(workerKey).end
+          .workerLastVirtualTaskTimestamp.get(workerNodeKey).end
       ).toBe(0)
     }
     // We need to clean up the resources after our test
@@ -477,7 +477,7 @@ describe('Selection strategies test suite', () => {
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.FAIR_SHARE
       ).workerLastVirtualTaskTimestamp.size
-    ).toBe(pool.workers.length)
+    ).toBe(pool.workerNodes.length)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -501,7 +501,7 @@ describe('Selection strategies test suite', () => {
     //     pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
     //       WorkerChoiceStrategies.FAIR_SHARE
     //     ).workerLastVirtualTaskTimestamp.size
-    //   ).toBe(pool.workers.length)
+    //   ).toBe(pool.workerNodes.length)
     // }
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -518,18 +518,18 @@ describe('Selection strategies test suite', () => {
       ).workerLastVirtualTaskTimestamp
     ).toBeDefined()
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
-    for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
+    for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
       .get(WorkerChoiceStrategies.FAIR_SHARE)
       .workerLastVirtualTaskTimestamp.keys()) {
       expect(
         pool.workerChoiceStrategyContext.workerChoiceStrategies
           .get(WorkerChoiceStrategies.FAIR_SHARE)
-          .workerLastVirtualTaskTimestamp.get(workerKey).start
+          .workerLastVirtualTaskTimestamp.get(workerNodeKey).start
       ).toBe(0)
       expect(
         pool.workerChoiceStrategyContext.workerChoiceStrategies
           .get(WorkerChoiceStrategies.FAIR_SHARE)
-          .workerLastVirtualTaskTimestamp.get(workerKey).end
+          .workerLastVirtualTaskTimestamp.get(workerNodeKey).end
       ).toBe(0)
     }
     await pool.destroy()
@@ -544,18 +544,18 @@ describe('Selection strategies test suite', () => {
       ).workerLastVirtualTaskTimestamp
     ).toBeDefined()
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
-    for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
+    for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
       .get(WorkerChoiceStrategies.FAIR_SHARE)
       .workerLastVirtualTaskTimestamp.keys()) {
       expect(
         pool.workerChoiceStrategyContext.workerChoiceStrategies
           .get(WorkerChoiceStrategies.FAIR_SHARE)
-          .workerLastVirtualTaskTimestamp.get(workerKey).start
+          .workerLastVirtualTaskTimestamp.get(workerNodeKey).start
       ).toBe(0)
       expect(
         pool.workerChoiceStrategyContext.workerChoiceStrategies
           .get(WorkerChoiceStrategies.FAIR_SHARE)
-          .workerLastVirtualTaskTimestamp.get(workerKey).end
+          .workerLastVirtualTaskTimestamp.get(workerNodeKey).end
       ).toBe(0)
     }
     // We need to clean up the resources after our test
@@ -574,25 +574,25 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
-      ).currentWorkerId
+      ).currentWorkerNodeId
     ).toBe(0)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
       ).defaultWorkerWeight
     ).toBeGreaterThan(0)
-    for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
+    for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
       .get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
       .workersTaskRunTime.keys()) {
       expect(
         pool.workerChoiceStrategyContext.workerChoiceStrategies
           .get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
-          .workersTaskRunTime.get(workerKey).weight
+          .workersTaskRunTime.get(workerNodeKey).weight
       ).toBeGreaterThan(0)
       expect(
         pool.workerChoiceStrategyContext.workerChoiceStrategies
           .get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
-          .workersTaskRunTime.get(workerKey).runTime
+          .workersTaskRunTime.get(workerNodeKey).runTime
       ).toBe(0)
     }
     // We need to clean up the resources after our test
@@ -663,7 +663,7 @@ describe('Selection strategies test suite', () => {
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
       ).workersTaskRunTime.size
-    ).toBe(pool.workers.length)
+    ).toBe(pool.workerNodes.length)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -690,7 +690,7 @@ describe('Selection strategies test suite', () => {
         pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
           WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
         ).workersTaskRunTime.size
-      ).toBe(pool.workers.length)
+      ).toBe(pool.workerNodes.length)
     }
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -704,7 +704,7 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
-      ).currentWorkerId
+      ).currentWorkerNodeId
     ).toBeDefined()
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -720,20 +720,20 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
-      ).currentWorkerId
+      ).currentWorkerNodeId
     ).toBe(0)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
       ).defaultWorkerWeight
     ).toBeGreaterThan(0)
-    for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
+    for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
       .get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
       .workersTaskRunTime.keys()) {
       expect(
         pool.workerChoiceStrategyContext.workerChoiceStrategies
           .get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
-          .workersTaskRunTime.get(workerKey).runTime
+          .workersTaskRunTime.get(workerNodeKey).runTime
       ).toBe(0)
     }
     await pool.destroy()
@@ -745,7 +745,7 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
-      ).currentWorkerId
+      ).currentWorkerNodeId
     ).toBeDefined()
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -761,20 +761,20 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
-      ).currentWorkerId
+      ).currentWorkerNodeId
     ).toBe(0)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
       ).defaultWorkerWeight
     ).toBeGreaterThan(0)
-    for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
+    for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
       .get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
       .workersTaskRunTime.keys()) {
       expect(
         pool.workerChoiceStrategyContext.workerChoiceStrategies
           .get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
-          .workersTaskRunTime.get(workerKey).runTime
+          .workersTaskRunTime.get(workerNodeKey).runTime
       ).toBe(0)
     }
     // We need to clean up the resources after our test
index cc3e66a49e752b349e4ae8153facbddfadca81ac..05f0bb4c3a6497c246a52621e16c9a8d62430739 100644 (file)
@@ -34,7 +34,7 @@ describe('Weighted round robin strategy worker choice strategy test suite', () =
       .returns()
     const resetResult = strategy.reset()
     expect(resetResult).toBe(true)
-    expect(strategy.currentWorkerId).toBe(0)
+    expect(strategy.currentWorkerNodeId).toBe(0)
     expect(workersTaskRunTimeClearStub.calledOnce).toBe(true)
     expect(initWorkersTaskRunTimeStub.calledOnce).toBe(true)
   })
index 9f69860aa408e42c444434dadcaf6f6ca7e1bfe5..feb4955d9a9951fbd2dc341028acb76bc638c661 100644 (file)
@@ -32,8 +32,8 @@ describe('Dynamic thread pool test suite', () => {
     for (let i = 0; i < max * 2; i++) {
       pool.execute()
     }
-    expect(pool.workers.length).toBeLessThanOrEqual(max)
-    expect(pool.workers.length).toBeGreaterThan(min)
+    expect(pool.workerNodes.length).toBeLessThanOrEqual(max)
+    expect(pool.workerNodes.length).toBeGreaterThan(min)
     // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
     // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
     expect(poolBusy).toBe(max + 1)
@@ -42,19 +42,19 @@ describe('Dynamic thread pool test suite', () => {
   })
 
   it('Verify scale thread up and down is working', async () => {
-    expect(pool.workers.length).toBe(min)
+    expect(pool.workerNodes.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       pool.execute()
     }
-    expect(pool.workers.length).toBe(max)
+    expect(pool.workerNodes.length).toBe(max)
     await TestUtils.waitExits(pool, max - min)
-    expect(pool.workers.length).toBe(min)
+    expect(pool.workerNodes.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       pool.execute()
     }
-    expect(pool.workers.length).toBe(max)
+    expect(pool.workerNodes.length).toBe(max)
     await TestUtils.waitExits(pool, max - min)
-    expect(pool.workers.length).toBe(min)
+    expect(pool.workerNodes.length).toBe(min)
   })
 
   it('Shutdown test', async () => {
@@ -93,13 +93,13 @@ describe('Dynamic thread pool test suite', () => {
         exitHandler: () => console.log('long running worker exited')
       }
     )
-    expect(longRunningPool.workers.length).toBe(min)
+    expect(longRunningPool.workerNodes.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       longRunningPool.execute()
     }
-    expect(longRunningPool.workers.length).toBe(max)
+    expect(longRunningPool.workerNodes.length).toBe(max)
     await TestUtils.waitExits(longRunningPool, max - min)
-    expect(longRunningPool.workers.length).toBe(min)
+    expect(longRunningPool.workerNodes.length).toBe(min)
     // We need to clean up the resources after our test
     await longRunningPool.destroy()
   })
@@ -115,14 +115,14 @@ describe('Dynamic thread pool test suite', () => {
         exitHandler: () => console.log('long running worker exited')
       }
     )
-    expect(longRunningPool.workers.length).toBe(min)
+    expect(longRunningPool.workerNodes.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       longRunningPool.execute()
     }
-    expect(longRunningPool.workers.length).toBe(max)
+    expect(longRunningPool.workerNodes.length).toBe(max)
     await TestUtils.sleep(1500)
-    // Here we expect the workers to be at the max size since the task is still running
-    expect(longRunningPool.workers.length).toBe(max)
+    // Here we expect the workerNodes to be at the max size since the task is still running
+    expect(longRunningPool.workerNodes.length).toBe(max)
     // We need to clean up the resources after our test
     await longRunningPool.destroy()
   })
index 894471697cb42cc408c906021ee28e87147fcfbc..dc8d5941f51b86da8433d239687f9cb061eb5cb3 100644 (file)
@@ -4,8 +4,8 @@ class TestUtils {
   static async waitExits (pool, numberOfExitEventsToWait) {
     return new Promise(resolve => {
       let exitEvents = 0
-      for (const workerItem of pool.workers) {
-        workerItem.worker.on('exit', () => {
+      for (const workerNode of pool.workerNodes) {
+        workerNode.worker.on('exit', () => {
           ++exitEvents
           if (exitEvents === numberOfExitEventsToWait) {
             resolve(exitEvents)