perf: use worker key as much as possible instead of a reference to the
authorJérôme Benoit <jerome.benoit@sap.com>
Mon, 3 Apr 2023 15:13:07 +0000 (17:13 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Mon, 3 Apr 2023 15:13:07 +0000 (17:13 +0200)
worker instance

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
16 files changed:
src/pools/abstract-pool.ts
src/pools/pool-internal.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/less-busy-worker-choice-strategy.ts
src/pools/selection-strategies/less-used-worker-choice-strategy.ts
src/pools/selection-strategies/round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/selection-strategies-utils.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
src/utility-types.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/selection-strategies/selection-strategies.test.js
tests/pools/selection-strategies/worker-choice-strategy-context.test.js

index 62def61e6dd28a1f042280f3a1a676bd07f226a2..172feda0555cac95e453860ebddda218e5a5755d 100644 (file)
@@ -35,12 +35,14 @@ export abstract class AbstractPool<
    * The promise response map.
    *
    * - `key`: The message id of each submitted task.
-   * - `value`: An object that contains the worker key, the promise resolve and reject callbacks.
+   * - `value`: An object that contains the worker, the promise resolve and reject callbacks.
    *
    * When we receive a message from the worker we get a map entry with the promise resolve/reject bound to the message.
    */
-  protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
-    new Map<string, PromiseResponseWrapper<Response>>()
+  protected promiseResponseMap: Map<
+  string,
+  PromiseResponseWrapper<Worker, Response>
+  > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
 
   /**
    * Worker choice strategy instance implementing the worker choice algorithm.
@@ -83,17 +85,17 @@ export abstract class AbstractPool<
     this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
       this,
       () => {
-        const workerCreated = this.createAndSetupWorker()
-        this.registerWorkerMessageListener(workerCreated, message => {
+        const createdWorker = this.createAndSetupWorker()
+        this.registerWorkerMessageListener(createdWorker, message => {
           if (
             isKillBehavior(KillBehaviors.HARD, message.kill) ||
-            this.getWorkerTasksUsage(workerCreated)?.running === 0
+            this.getWorkerTasksUsage(createdWorker)?.running === 0
           ) {
             // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-            void this.destroyWorker(workerCreated)
+            void this.destroyWorker(createdWorker)
           }
         })
-        return workerCreated
+        return this.getWorkerKey(createdWorker)
       },
       this.opts.workerChoiceStrategy
     )
@@ -155,8 +157,8 @@ export abstract class AbstractPool<
     workerChoiceStrategy: WorkerChoiceStrategy
   ): void {
     this.opts.workerChoiceStrategy = workerChoiceStrategy
-    for (const workerItem of this.workers) {
-      this.setWorker(workerItem.worker, {
+    for (const [index, workerItem] of this.workers.entries()) {
+      this.setWorker(index, workerItem.worker, {
         run: 0,
         running: 0,
         runTime: 0,
@@ -175,26 +177,23 @@ export abstract class AbstractPool<
   protected internalGetBusyStatus (): boolean {
     return (
       this.numberOfRunningTasks >= this.numberOfWorkers &&
-      this.findFreeWorker() === false
+      this.findFreeWorkerKey() === false
     )
   }
 
   /** {@inheritDoc} */
-  public findFreeWorker (): Worker | false {
-    for (const workerItem of this.workers) {
-      if (workerItem.tasksUsage.running === 0) {
-        // A worker is free, return the matching worker
-        return workerItem.worker
-      }
-    }
-    return false
+  public findFreeWorkerKey (): number | false {
+    const freeWorkerKey = this.workers.findIndex(workerItem => {
+      return workerItem.tasksUsage.running === 0
+    })
+    return freeWorkerKey !== -1 ? freeWorkerKey : false
   }
 
   /** {@inheritDoc} */
   public async execute (data: Data): Promise<Response> {
-    const worker = this.chooseWorker()
+    const [workerKey, worker] = this.chooseWorker()
     const messageId = crypto.randomUUID()
-    const res = this.internalExecute(this.getWorkerKey(worker), messageId)
+    const res = this.internalExecute(workerKey, worker, messageId)
     this.checkAndEmitBusy()
     this.sendToWorker(worker, {
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
@@ -248,14 +247,14 @@ export abstract class AbstractPool<
    * Hook executed after the worker task promise resolution.
    * Can be overridden.
    *
-   * @param workerKey - The worker key.
+   * @param worker - The worker.
    * @param message - The received message.
    */
   protected afterPromiseResponseHook (
-    workerKey: number,
+    worker: Worker,
     message: MessageValue<Response>
   ): void {
-    const workerTasksUsage = this.workers[workerKey].tasksUsage
+    const workerTasksUsage = this.getWorkerTasksUsage(worker) as TasksUsage
     --workerTasksUsage.running
     ++workerTasksUsage.run
     if (message.error != null) {
@@ -287,10 +286,11 @@ export abstract class AbstractPool<
    *
    * The default implementation uses a round robin algorithm to distribute the load.
    *
-   * @returns Worker.
+   * @returns [worker key, worker].
    */
-  protected chooseWorker (): Worker {
-    return this.workerChoiceStrategyContext.execute()
+  protected chooseWorker (): [number, Worker] {
+    const workerKey = this.workerChoiceStrategyContext.execute()
+    return [workerKey, this.workers[workerKey].worker]
   }
 
   /**
@@ -344,7 +344,7 @@ export abstract class AbstractPool<
       this.removeWorker(worker)
     })
 
-    this.setWorker(worker, {
+    this.pushWorker(worker, {
       run: 0,
       running: 0,
       runTime: 0,
@@ -372,7 +372,7 @@ export abstract class AbstractPool<
           } else {
             promiseResponse.resolve(message.data as Response)
           }
-          this.afterPromiseResponseHook(promiseResponse.workerKey, message)
+          this.afterPromiseResponseHook(promiseResponse.worker, message)
           this.promiseResponseMap.delete(message.id)
         }
       }
@@ -381,11 +381,12 @@ export abstract class AbstractPool<
 
   private async internalExecute (
     workerKey: number,
+    worker: Worker,
     messageId: string
   ): Promise<Response> {
     this.beforePromiseResponseHook(workerKey)
     return await new Promise<Response>((resolve, reject) => {
-      this.promiseResponseMap.set(messageId, { resolve, reject, workerKey })
+      this.promiseResponseMap.set(messageId, { resolve, reject, worker })
     })
   }
 
@@ -395,8 +396,13 @@ export abstract class AbstractPool<
     }
   }
 
-  /** {@inheritDoc} */
-  public getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
+  /**
+   * Gets worker tasks usage.
+   *
+   * @param worker - The worker.
+   * @returns The worker tasks usage.
+   */
+  private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
     const workerKey = this.getWorkerKey(worker)
     if (workerKey !== -1) {
       return this.workers[workerKey].tasksUsage
@@ -405,15 +411,33 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Sets the given worker.
+   * Pushes the given worker.
    *
    * @param worker - The worker.
    * @param tasksUsage - The worker tasks usage.
    */
-  private setWorker (worker: Worker, tasksUsage: TasksUsage): void {
+  private pushWorker (worker: Worker, tasksUsage: TasksUsage): void {
     this.workers.push({
       worker,
       tasksUsage
     })
   }
+
+  /**
+   * Sets the given worker.
+   *
+   * @param workerKey - The worker key.
+   * @param worker - The worker.
+   * @param tasksUsage - The worker tasks usage.
+   */
+  private setWorker (
+    workerKey: number,
+    worker: Worker,
+    tasksUsage: TasksUsage
+  ): void {
+    this.workers[workerKey] = {
+      worker,
+      tasksUsage
+    }
+  }
 }
index bd75cf4f04aa766020cca4c56b33706482e131cc..261a9d7beeb862258ebabceef5b69a71df091180 100644 (file)
@@ -67,21 +67,13 @@ export interface IPoolInternal<
   readonly numberOfRunningTasks: number
 
   /**
-   * Finds a free worker based on the number of tasks the worker has applied.
+   * Finds a free worker key based on the number of tasks the worker has applied.
    *
-   * If a worker is found with `0` running tasks, it is detected as free and returned.
+   * If a worker is found with `0` running tasks, it is detected as free and its key is returned.
    *
    * If no free worker is found, `false` is returned.
    *
-   * @returns A free worker if there is one, otherwise `false`.
+   * @returns A worker key if there is one, otherwise `false`.
    */
-  findFreeWorker: () => Worker | false
-
-  /**
-   * Gets worker tasks usage.
-   *
-   * @param worker - The worker.
-   * @returns The tasks usage on the worker.
-   */
-  getWorkerTasksUsage: (worker: Worker) => TasksUsage | undefined
+  findFreeWorkerKey: () => number | false
 }
index ebe74dcb07543b7c7698e2c2f284c33893b6e9ca..5b526331824be72af5048eb8c76fe26b001a373a 100644 (file)
@@ -17,7 +17,7 @@ export abstract class AbstractWorkerChoiceStrategy<
   Worker extends IPoolWorker,
   Data,
   Response
-> implements IWorkerChoiceStrategy<Worker> {
+> implements IWorkerChoiceStrategy {
   /** {@inheritDoc} */
   public readonly isDynamicPool: boolean
   /** {@inheritDoc} */
@@ -40,5 +40,5 @@ export abstract class AbstractWorkerChoiceStrategy<
   public abstract reset (): boolean
 
   /** {@inheritDoc} */
-  public abstract choose (): Worker
+  public abstract choose (): number
 }
index 5f07d60794b07e9c4eb9846f4d9ea9d1847a1caa..c122c2b11e418aa0da2e2cdef34c344e520be163 100644 (file)
@@ -20,18 +20,18 @@ export class DynamicPoolWorkerChoiceStrategy<
   Data,
   Response
 > extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
-  private readonly workerChoiceStrategy: IWorkerChoiceStrategy<Worker>
+  private readonly workerChoiceStrategy: IWorkerChoiceStrategy
 
   /**
    * Constructs a worker choice strategy for dynamic pool.
    *
    * @param pool - The pool instance.
-   * @param createDynamicallyWorkerCallback - The worker creation callback for dynamic pool.
-   * @param workerChoiceStrategy - The worker choice strategy when the pull is busy.
+   * @param createWorkerCallback - The worker creation callback for dynamic pool.
+   * @param workerChoiceStrategy - The worker choice strategy when the pool is busy.
    */
   public constructor (
     pool: IPoolInternal<Worker, Data, Response>,
-    private readonly createDynamicallyWorkerCallback: () => Worker,
+    private readonly createWorkerCallback: () => number,
     workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
   ) {
     super(pool)
@@ -48,10 +48,10 @@ export class DynamicPoolWorkerChoiceStrategy<
   }
 
   /** {@inheritDoc} */
-  public choose (): Worker {
-    const freeWorker = this.pool.findFreeWorker()
-    if (freeWorker !== false) {
-      return freeWorker
+  public choose (): number {
+    const freeWorkerKey = this.pool.findFreeWorkerKey()
+    if (freeWorkerKey !== false) {
+      return freeWorkerKey
     }
 
     if (this.pool.busy) {
@@ -59,6 +59,6 @@ export class DynamicPoolWorkerChoiceStrategy<
     }
 
     // All workers are busy, create a new worker
-    return this.createDynamicallyWorkerCallback()
+    return this.createWorkerCallback()
   }
 }
index 48198371b54a7d2f8807dcfdd10e53cdc10a579f..af8eb75d03995d1eb37eeb8daf8d44a8cb6d9d70 100644 (file)
@@ -32,9 +32,9 @@ export class FairShareWorkerChoiceStrategy<
    *  Worker last virtual task execution timestamp.
    */
   private readonly workerLastVirtualTaskTimestamp: Map<
-  Worker,
+  number,
   WorkerVirtualTaskTimestamp
-  > = new Map<Worker, WorkerVirtualTaskTimestamp>()
+  > = new Map<number, WorkerVirtualTaskTimestamp>()
 
   /** {@inheritDoc} */
   public reset (): boolean {
@@ -43,39 +43,38 @@ export class FairShareWorkerChoiceStrategy<
   }
 
   /** {@inheritDoc} */
-  public choose (): Worker {
+  public choose (): number {
     let minWorkerVirtualTaskEndTimestamp = Infinity
-    let chosenWorker!: Worker
-    for (const workerItem of this.pool.workers) {
-      const worker = workerItem.worker
-      this.computeWorkerLastVirtualTaskTimestamp(worker)
+    let chosenWorkerKey!: number
+    for (const [index] of this.pool.workers.entries()) {
+      this.computeWorkerLastVirtualTaskTimestamp(index)
       const workerLastVirtualTaskEndTimestamp =
-        this.workerLastVirtualTaskTimestamp.get(worker)?.end ?? 0
+        this.workerLastVirtualTaskTimestamp.get(index)?.end ?? 0
       if (
         workerLastVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp
       ) {
         minWorkerVirtualTaskEndTimestamp = workerLastVirtualTaskEndTimestamp
-        chosenWorker = worker
+        chosenWorkerKey = index
       }
     }
-    return chosenWorker
+    return chosenWorkerKey
   }
 
   /**
    * Computes worker last virtual task timestamp.
    *
-   * @param worker - The worker.
+   * @param workerKey - The worker key.
    */
-  private computeWorkerLastVirtualTaskTimestamp (worker: Worker): void {
+  private computeWorkerLastVirtualTaskTimestamp (workerKey: number): void {
     const workerVirtualTaskStartTimestamp = Math.max(
       Date.now(),
-      this.workerLastVirtualTaskTimestamp.get(worker)?.end ?? -Infinity
+      this.workerLastVirtualTaskTimestamp.get(workerKey)?.end ?? -Infinity
     )
-    this.workerLastVirtualTaskTimestamp.set(worker, {
+    this.workerLastVirtualTaskTimestamp.set(workerKey, {
       start: workerVirtualTaskStartTimestamp,
       end:
         workerVirtualTaskStartTimestamp +
-        (this.pool.getWorkerTasksUsage(worker)?.avgRunTime ?? 0)
+        (this.pool.workers[workerKey].tasksUsage.avgRunTime ?? 0)
     })
   }
 }
index 31e7881a2d214d9be2b36a7fcafd909a667d3a89..2bbafb59ec5b07dfca9001f587cb4878af87dca6 100644 (file)
@@ -25,20 +25,18 @@ export class LessBusyWorkerChoiceStrategy<
   }
 
   /** {@inheritDoc} */
-  public choose (): Worker {
+  public choose (): number {
     let minRunTime = Infinity
-    let lessBusyWorker!: Worker
-    for (const workerItem of this.pool.workers) {
-      const worker = workerItem.worker
-      const workerRunTime = this.pool.getWorkerTasksUsage(worker)
-        ?.runTime as number
+    let lessBusyWorkerKey!: number
+    for (const [index, workerItem] of this.pool.workers.entries()) {
+      const workerRunTime = workerItem.tasksUsage.runTime
       if (!this.isDynamicPool && workerRunTime === 0) {
-        return worker
+        return index
       } else if (workerRunTime < minRunTime) {
         minRunTime = workerRunTime
-        lessBusyWorker = worker
+        lessBusyWorkerKey = index
       }
     }
-    return lessBusyWorker
+    return lessBusyWorkerKey
   }
 }
index 833e605babc9cb0b2f1fcf46d4a823b291e5fa73..2ed58c96eb3596dfbfafd6416b82b2e51f7e5a31 100644 (file)
@@ -19,21 +19,19 @@ export class LessUsedWorkerChoiceStrategy<
   }
 
   /** {@inheritDoc} */
-  public choose (): Worker {
+  public choose (): number {
     let minNumberOfTasks = Infinity
-    let lessUsedWorker!: Worker
-    for (const workerItem of this.pool.workers) {
-      const worker = workerItem.worker
-      const tasksUsage = this.pool.getWorkerTasksUsage(worker)
-      const workerTasks =
-        (tasksUsage?.run as number) + (tasksUsage?.running as number)
+    let lessUsedWorkerKey!: number
+    for (const [index, workerItem] of this.pool.workers.entries()) {
+      const tasksUsage = workerItem.tasksUsage
+      const workerTasks = tasksUsage?.run + tasksUsage?.running
       if (!this.isDynamicPool && workerTasks === 0) {
-        return worker
+        return index
       } else if (workerTasks < minNumberOfTasks) {
         minNumberOfTasks = workerTasks
-        lessUsedWorker = worker
+        lessUsedWorkerKey = index
       }
     }
-    return lessUsedWorker
+    return lessUsedWorkerKey
   }
 }
index 9bf49a3c3597bef69d5c4ef5b0408ccfccc5206a..e265172fad11beede47047155d360b38c5cff4a0 100644 (file)
@@ -25,12 +25,12 @@ export class RoundRobinWorkerChoiceStrategy<
   }
 
   /** {@inheritDoc} */
-  public choose (): Worker {
-    const chosenWorker = this.pool.workers[this.nextWorkerId].worker
+  public choose (): number {
+    const chosenWorkerKey = this.nextWorkerId
     this.nextWorkerId =
       this.nextWorkerId === this.pool.workers.length - 1
         ? 0
         : this.nextWorkerId + 1
-    return chosenWorker
+    return chosenWorkerKey
   }
 }
index 9ae6676550be28564a12eae35f6e64ce15db640d..a05fd9b784c9da682cd0e270f09b81ce41325d70 100644 (file)
@@ -1,5 +1,3 @@
-import type { IPoolWorker } from '../pool-worker'
-
 /**
  * Enumeration of worker choice strategies.
  */
@@ -40,10 +38,8 @@ export interface RequiredStatistics {
 
 /**
  * Worker choice strategy interface.
- *
- * @typeParam Worker - Type of worker which manages the strategy.
  */
-export interface IWorkerChoiceStrategy<Worker extends IPoolWorker> {
+export interface IWorkerChoiceStrategy {
   /**
    * Is the pool attached to the strategy dynamic?.
    */
@@ -57,7 +53,7 @@ export interface IWorkerChoiceStrategy<Worker extends IPoolWorker> {
    */
   reset: () => boolean
   /**
-   * Chooses a worker in the pool.
+   * Chooses a worker in the pool and returns its key.
    */
-  choose: () => Worker
+  choose: () => number
 }
index 1330ec19bcf5f182df56c213715358bd4659e847..10c93335ea98cd1c3b019addb88603ed2749672e 100644 (file)
@@ -25,18 +25,20 @@ export function getWorkerChoiceStrategy<
 > (
   pool: IPoolInternal<Worker, Data, Response>,
   workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
-): IWorkerChoiceStrategy<Worker> {
+): IWorkerChoiceStrategy {
   switch (workerChoiceStrategy) {
     case WorkerChoiceStrategies.ROUND_ROBIN:
-      return new RoundRobinWorkerChoiceStrategy(pool)
+      return new RoundRobinWorkerChoiceStrategy<Worker, Data, Response>(pool)
     case WorkerChoiceStrategies.LESS_USED:
-      return new LessUsedWorkerChoiceStrategy(pool)
+      return new LessUsedWorkerChoiceStrategy<Worker, Data, Response>(pool)
     case WorkerChoiceStrategies.LESS_BUSY:
-      return new LessBusyWorkerChoiceStrategy(pool)
+      return new LessBusyWorkerChoiceStrategy<Worker, Data, Response>(pool)
     case WorkerChoiceStrategies.FAIR_SHARE:
-      return new FairShareWorkerChoiceStrategy(pool)
+      return new FairShareWorkerChoiceStrategy<Worker, Data, Response>(pool)
     case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
-      return new WeightedRoundRobinWorkerChoiceStrategy(pool)
+      return new WeightedRoundRobinWorkerChoiceStrategy<Worker, Data, Response>(
+        pool
+      )
     default:
       throw new Error(
         // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
index 5c5e6474eb3c88bf2838bca9be65add4081194ef..e3d2be3f3e1c20d158d70dc0e85943e59bfeac4c 100644 (file)
@@ -41,8 +41,8 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   /**
    * Per worker virtual task runtime map.
    */
-  private readonly workersTaskRunTime: Map<Worker, TaskRunTime> = new Map<
-  Worker,
+  private readonly workersTaskRunTime: Map<number, TaskRunTime> = new Map<
+  number,
   TaskRunTime
   >()
 
@@ -66,60 +66,56 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   }
 
   /** {@inheritDoc} */
-  public choose (): Worker {
-    const chosenWorker = this.pool.workers[this.currentWorkerId].worker
-    if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorker)) {
-      this.initWorkerTaskRunTime(chosenWorker)
+  public choose (): number {
+    const chosenWorkerKey = this.currentWorkerId
+    if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorkerKey)) {
+      this.initWorkerTaskRunTime(chosenWorkerKey)
     }
     const workerTaskRunTime =
-      this.workersTaskRunTime.get(chosenWorker)?.runTime ?? 0
+      this.workersTaskRunTime.get(chosenWorkerKey)?.runTime ?? 0
     const workerTaskWeight =
-      this.workersTaskRunTime.get(chosenWorker)?.weight ??
+      this.workersTaskRunTime.get(chosenWorkerKey)?.weight ??
       this.defaultWorkerWeight
     if (workerTaskRunTime < workerTaskWeight) {
       this.setWorkerTaskRunTime(
-        chosenWorker,
+        chosenWorkerKey,
         workerTaskWeight,
         workerTaskRunTime +
-          (this.getWorkerVirtualTaskRunTime(chosenWorker) ?? 0)
+          (this.getWorkerVirtualTaskRunTime(chosenWorkerKey) ?? 0)
       )
     } else {
       this.currentWorkerId =
         this.currentWorkerId === this.pool.workers.length - 1
           ? 0
           : this.currentWorkerId + 1
-      this.setWorkerTaskRunTime(
-        this.pool.workers[this.currentWorkerId].worker,
-        workerTaskWeight,
-        0
-      )
+      this.setWorkerTaskRunTime(this.currentWorkerId, workerTaskWeight, 0)
     }
-    return chosenWorker
+    return chosenWorkerKey
   }
 
   private initWorkersTaskRunTime (): void {
-    for (const workerItem of this.pool.workers) {
-      this.initWorkerTaskRunTime(workerItem.worker)
+    for (const [index] of this.pool.workers.entries()) {
+      this.initWorkerTaskRunTime(index)
     }
   }
 
-  private initWorkerTaskRunTime (worker: Worker): void {
-    this.setWorkerTaskRunTime(worker, this.defaultWorkerWeight, 0)
+  private initWorkerTaskRunTime (workerKey: number): void {
+    this.setWorkerTaskRunTime(workerKey, this.defaultWorkerWeight, 0)
   }
 
   private setWorkerTaskRunTime (
-    worker: Worker,
+    workerKey: number,
     weight: number,
     runTime: number
   ): void {
-    this.workersTaskRunTime.set(worker, {
+    this.workersTaskRunTime.set(workerKey, {
       weight,
       runTime
     })
   }
 
-  private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined {
-    return this.pool.getWorkerTasksUsage(worker)?.avgRunTime
+  private getWorkerVirtualTaskRunTime (workerKey: number): number {
+    return this.pool.workers[workerKey].tasksUsage.avgRunTime
   }
 
   private computeWorkerWeight (): number {
index 1bba1a55eccdc82a02d6c0849c2413c68b70b9dc..c149cc2e9b48a7a6dae62466aea312fd9054e6dc 100644 (file)
@@ -21,18 +21,18 @@ export class WorkerChoiceStrategyContext<
   Data,
   Response
 > {
-  private workerChoiceStrategy!: IWorkerChoiceStrategy<Worker>
+  private workerChoiceStrategy!: IWorkerChoiceStrategy
 
   /**
    * Worker choice strategy context constructor.
    *
    * @param pool - The pool instance.
-   * @param createDynamicallyWorkerCallback - The worker creation callback for dynamic pool.
+   * @param createWorkerCallback - The worker creation callback for dynamic pool.
    * @param workerChoiceStrategy - The worker choice strategy.
    */
   public constructor (
     private readonly pool: IPoolInternal<Worker, Data, Response>,
-    private readonly createDynamicallyWorkerCallback: () => Worker,
+    private readonly createWorkerCallback: () => number,
     workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
   ) {
     this.setWorkerChoiceStrategy(workerChoiceStrategy)
@@ -46,11 +46,11 @@ export class WorkerChoiceStrategyContext<
    */
   private getPoolWorkerChoiceStrategy (
     workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
-  ): IWorkerChoiceStrategy<Worker> {
+  ): IWorkerChoiceStrategy {
     if (this.pool.type === PoolType.DYNAMIC) {
       return new DynamicPoolWorkerChoiceStrategy(
         this.pool,
-        this.createDynamicallyWorkerCallback,
+        this.createWorkerCallback,
         workerChoiceStrategy
       )
     }
@@ -62,7 +62,7 @@ export class WorkerChoiceStrategyContext<
    *
    * @returns The worker choice strategy.
    */
-  public getWorkerChoiceStrategy (): IWorkerChoiceStrategy<Worker> {
+  public getWorkerChoiceStrategy (): IWorkerChoiceStrategy {
     return this.workerChoiceStrategy
   }
 
@@ -82,9 +82,9 @@ export class WorkerChoiceStrategyContext<
   /**
    * Chooses a worker with the underlying selection strategy.
    *
-   * @returns The chosen one.
+   * @returns The key of the chosen one.
    */
-  public execute (): Worker {
+  public execute (): number {
     return this.workerChoiceStrategy.choose()
   }
 }
index 54bb7461f6fa5cafb2cb04270ce732b093d8ce53..0fdaa36e19c34e1ace772866e174c4d5b6905381 100644 (file)
@@ -1,6 +1,7 @@
 import type { Worker as ClusterWorker } from 'node:cluster'
 import type { MessagePort } from 'node:worker_threads'
 import type { KillBehavior } from './worker/worker-options'
+import type { IPoolWorker } from './pools/pool-worker'
 
 /**
  * Make all properties in T non-readonly.
@@ -47,7 +48,10 @@ export interface MessageValue<
  *
  * @typeParam Response - Type of execution response. This can only be serializable data.
  */
-export interface PromiseResponseWrapper<Response = unknown> {
+export interface PromiseResponseWrapper<
+  Worker extends IPoolWorker,
+  Response = unknown
+> {
   /**
    * Resolve callback to fulfill the promise.
    */
@@ -57,7 +61,7 @@ export interface PromiseResponseWrapper<Response = unknown> {
    */
   readonly reject: (reason?: string) => void
   /**
-   * The worker handling the promise key .
+   * The worker handling the promise.
    */
-  readonly workerKey: number
+  readonly worker: Worker
 }
index a87c8934701bd442af287fcdc91e27af33eeda41..5bb5cde50cfbc16bbb439e101cb20ad2a8f23add 100644 (file)
@@ -14,7 +14,7 @@ describe('Abstract pool test suite', () => {
   class StubPoolWithRemoveAllWorker extends FixedThreadPool {
     removeAllWorker () {
       this.workers = []
-      this.promiseMap.clear()
+      this.promiseResponseMap.clear()
     }
   }
   class StubPoolWithIsMain extends FixedThreadPool {
@@ -145,6 +145,7 @@ describe('Abstract pool test suite', () => {
       expect(workerItem.tasksUsage.running).toBe(0)
       expect(workerItem.tasksUsage.runTime).toBe(0)
       expect(workerItem.tasksUsage.avgRunTime).toBe(0)
+      expect(workerItem.tasksUsage.error).toBe(0)
     }
     await pool.destroy()
   })
@@ -164,6 +165,7 @@ describe('Abstract pool test suite', () => {
       expect(workerItem.tasksUsage.running).toBe(numberOfWorkers * 2)
       expect(workerItem.tasksUsage.runTime).toBe(0)
       expect(workerItem.tasksUsage.avgRunTime).toBe(0)
+      expect(workerItem.tasksUsage.error).toBe(0)
     }
     await Promise.all(promises)
     for (const workerItem of pool.workers) {
@@ -172,6 +174,7 @@ describe('Abstract pool test suite', () => {
       expect(workerItem.tasksUsage.running).toBe(0)
       expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
       expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+      expect(workerItem.tasksUsage.error).toBe(0)
     }
     await pool.destroy()
   })
@@ -193,6 +196,7 @@ describe('Abstract pool test suite', () => {
       expect(workerItem.tasksUsage.running).toBe(0)
       expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
       expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+      expect(workerItem.tasksUsage.error).toBe(0)
     }
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
     for (const workerItem of pool.workers) {
@@ -201,6 +205,7 @@ describe('Abstract pool test suite', () => {
       expect(workerItem.tasksUsage.running).toBe(0)
       expect(workerItem.tasksUsage.runTime).toBe(0)
       expect(workerItem.tasksUsage.avgRunTime).toBe(0)
+      expect(workerItem.tasksUsage.error).toBe(0)
     }
     await pool.destroy()
   })
index ac3e27204697e10663542ab3701e67fa47340269..a62571797c08010ef6da62b947c5a81e67af1e84 100644 (file)
@@ -134,14 +134,14 @@ describe('Selection strategies test suite', () => {
     )
     let results = new Set()
     for (let i = 0; i < max; i++) {
-      results.add(pool.chooseWorker().id)
+      results.add(pool.chooseWorker()[1].id)
     }
     expect(results.size).toBe(max)
     await pool.destroy()
     pool = new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js')
     results = new Set()
     for (let i = 0; i < max; i++) {
-      results.add(pool.chooseWorker().threadId)
+      results.add(pool.chooseWorker()[1].threadId)
     }
     expect(results.size).toBe(max)
     await pool.destroy()
@@ -357,18 +357,18 @@ describe('Selection strategies test suite', () => {
     expect(pool.opts.workerChoiceStrategy).toBe(
       WorkerChoiceStrategies.FAIR_SHARE
     )
-    for (const worker of pool.workerChoiceStrategyContext
+    for (const workerKey of pool.workerChoiceStrategyContext
       .getWorkerChoiceStrategy()
       .workerLastVirtualTaskTimestamp.keys()) {
       expect(
         pool.workerChoiceStrategyContext
           .getWorkerChoiceStrategy()
-          .workerLastVirtualTaskTimestamp.get(worker).start
+          .workerLastVirtualTaskTimestamp.get(workerKey).start
       ).toBe(0)
       expect(
         pool.workerChoiceStrategyContext
           .getWorkerChoiceStrategy()
-          .workerLastVirtualTaskTimestamp.get(worker).end
+          .workerLastVirtualTaskTimestamp.get(workerKey).end
       ).toBe(0)
     }
     // We need to clean up the resources after our test
@@ -456,18 +456,18 @@ describe('Selection strategies test suite', () => {
         .workerLastVirtualTaskTimestamp
     ).toBeUndefined()
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
-    for (const worker of pool.workerChoiceStrategyContext
+    for (const workerKey of pool.workerChoiceStrategyContext
       .getWorkerChoiceStrategy()
       .workerLastVirtualTaskTimestamp.keys()) {
       expect(
         pool.workerChoiceStrategyContext
           .getWorkerChoiceStrategy()
-          .workerLastVirtualTaskTimestamp.get(worker).start
+          .workerLastVirtualTaskTimestamp.get(workerKey).start
       ).toBe(0)
       expect(
         pool.workerChoiceStrategyContext
           .getWorkerChoiceStrategy()
-          .workerLastVirtualTaskTimestamp.get(worker).end
+          .workerLastVirtualTaskTimestamp.get(workerKey).end
       ).toBe(0)
     }
     await pool.destroy()
@@ -481,18 +481,20 @@ describe('Selection strategies test suite', () => {
         .workerChoiceStrategy.workerLastVirtualTaskTimestamp
     ).toBeUndefined()
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
-    for (const worker of pool.workerChoiceStrategyContext
+    for (const workerKey of pool.workerChoiceStrategyContext
       .getWorkerChoiceStrategy()
       .workerChoiceStrategy.workerLastVirtualTaskTimestamp.keys()) {
       expect(
         pool.workerChoiceStrategyContext
           .getWorkerChoiceStrategy()
-          .workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(worker).start
+          .workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(workerKey)
+          .start
       ).toBe(0)
       expect(
         pool.workerChoiceStrategyContext
           .getWorkerChoiceStrategy()
-          .workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(worker).end
+          .workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(workerKey)
+          .end
       ).toBe(0)
     }
     // We need to clean up the resources after our test
@@ -515,18 +517,18 @@ describe('Selection strategies test suite', () => {
       pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
         .defaultWorkerWeight
     ).toBeGreaterThan(0)
-    for (const worker of pool.workerChoiceStrategyContext
+    for (const workerKey of pool.workerChoiceStrategyContext
       .getWorkerChoiceStrategy()
       .workersTaskRunTime.keys()) {
       expect(
         pool.workerChoiceStrategyContext
           .getWorkerChoiceStrategy()
-          .workersTaskRunTime.get(worker).weight
+          .workersTaskRunTime.get(workerKey).weight
       ).toBeGreaterThan(0)
       expect(
         pool.workerChoiceStrategyContext
           .getWorkerChoiceStrategy()
-          .workersTaskRunTime.get(worker).runTime
+          .workersTaskRunTime.get(workerKey).runTime
       ).toBe(0)
     }
     // We need to clean up the resources after our test
@@ -628,13 +630,13 @@ describe('Selection strategies test suite', () => {
       pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
         .defaultWorkerWeight
     ).toBeGreaterThan(0)
-    for (const worker of pool.workerChoiceStrategyContext
+    for (const workerKey of pool.workerChoiceStrategyContext
       .getWorkerChoiceStrategy()
       .workersTaskRunTime.keys()) {
       expect(
         pool.workerChoiceStrategyContext
           .getWorkerChoiceStrategy()
-          .workersTaskRunTime.get(worker).runTime
+          .workersTaskRunTime.get(workerKey).runTime
       ).toBe(0)
     }
     await pool.destroy()
@@ -664,13 +666,13 @@ describe('Selection strategies test suite', () => {
       pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
         .workerChoiceStrategy.defaultWorkerWeight
     ).toBeGreaterThan(0)
-    for (const worker of pool.workerChoiceStrategyContext
+    for (const workerKey of pool.workerChoiceStrategyContext
       .getWorkerChoiceStrategy()
       .workerChoiceStrategy.workersTaskRunTime.keys()) {
       expect(
         pool.workerChoiceStrategyContext
           .getWorkerChoiceStrategy()
-          .workerChoiceStrategy.workersTaskRunTime.get(worker).runTime
+          .workerChoiceStrategy.workersTaskRunTime.get(workerKey).runTime
       ).toBe(0)
     }
     // We need to clean up the resources after our test
index 9380a038800abcb246c532240717940fe735e79f..9ab2babfd31ce26fa6286338ced68d2e4f2ae7ea 100644 (file)
@@ -60,15 +60,15 @@ describe('Worker choice strategy context test suite', () => {
     const WorkerChoiceStrategyStub = sinon.createStubInstance(
       RoundRobinWorkerChoiceStrategy,
       {
-        choose: sinon.stub().returns('worker')
+        choose: sinon.stub().returns(0)
       }
     )
     workerChoiceStrategyContext.workerChoiceStrategy = WorkerChoiceStrategyStub
-    const chosenWorker = workerChoiceStrategyContext.execute()
+    const chosenWorkerKey = workerChoiceStrategyContext.execute()
     expect(
       workerChoiceStrategyContext.getWorkerChoiceStrategy().choose.calledOnce
     ).toBe(true)
-    expect(chosenWorker).toBe('worker')
+    expect(chosenWorkerKey).toBe(0)
   })
 
   it('Verify that execute() return the worker chosen by the strategy with dynamic pool', () => {
@@ -78,15 +78,15 @@ describe('Worker choice strategy context test suite', () => {
     const WorkerChoiceStrategyStub = sinon.createStubInstance(
       RoundRobinWorkerChoiceStrategy,
       {
-        choose: sinon.stub().returns('worker')
+        choose: sinon.stub().returns(0)
       }
     )
     workerChoiceStrategyContext.workerChoiceStrategy = WorkerChoiceStrategyStub
-    const chosenWorker = workerChoiceStrategyContext.execute()
+    const chosenWorkerKey = workerChoiceStrategyContext.execute()
     expect(
       workerChoiceStrategyContext.getWorkerChoiceStrategy().choose.calledOnce
     ).toBe(true)
-    expect(chosenWorker).toBe('worker')
+    expect(chosenWorkerKey).toBe(0)
   })
 
   it('Verify that setWorkerChoiceStrategy() works with ROUND_ROBIN and fixed pool', () => {