docs: refine README.md
[poolifier.git] / src / pools / abstract-pool.ts
index 88c04a854fde383dfa8f93de6bd3be670a036aa9..4f60dff0279643a864b8c570eaddf3e3af9e7546 100644 (file)
@@ -44,9 +44,10 @@ import {
   type WorkerChoiceStrategy,
   type WorkerChoiceStrategyOptions
 } from './selection-strategies/selection-strategies-types.js'
-import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
+import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
 import {
   checkFilePath,
+  checkValidPriority,
   checkValidTasksQueueOptions,
   checkValidWorkerChoiceStrategy,
   getDefaultTasksQueueOptions,
@@ -87,7 +88,7 @@ export abstract class AbstractPool<
   /**
    * The task execution response promise map:
    * - `key`: The message id of each submitted task.
-   * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
+   * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource.
    *
    * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
    */
@@ -95,9 +96,9 @@ export abstract class AbstractPool<
     new Map<string, PromiseResponseWrapper<Response>>()
 
   /**
-   * Worker choice strategy context referencing a worker choice algorithm implementation.
+   * Worker choice strategies context referencing worker choice algorithms implementation.
    */
-  protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
+  protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext<
   Worker,
   Data,
   Response
@@ -136,7 +137,7 @@ export abstract class AbstractPool<
   /**
    * The start timestamp of the pool.
    */
-  private readonly startTimestamp
+  private startTimestamp?: number
 
   /**
    * Constructs a new poolifier pool.
@@ -169,13 +170,14 @@ export abstract class AbstractPool<
     if (this.opts.enableEvents === true) {
       this.initializeEventEmitter()
     }
-    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
+    this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
     Worker,
     Data,
     Response
     >(
       this,
-      this.opts.workerChoiceStrategy,
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      [this.opts.workerChoiceStrategy!],
       this.opts.workerChoiceStrategyOptions
     )
 
@@ -191,8 +193,6 @@ export abstract class AbstractPool<
     if (this.opts.startWorkers === true) {
       this.start()
     }
-
-    this.startTimestamp = performance.now()
   }
 
   private checkPoolType (): void {
@@ -296,13 +296,13 @@ export abstract class AbstractPool<
       started: this.started,
       ready: this.ready,
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      strategy: this.opts.workerChoiceStrategy!,
-      strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
+      defaultStrategy: this.opts.workerChoiceStrategy!,
+      strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0,
       minSize: this.minimumNumberOfWorkers,
       maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
-      ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+      ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
         .runTime.aggregate === true &&
-        this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+        this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
           .waitTime.aggregate && {
         utilization: round(this.utilization)
       }),
@@ -365,7 +365,7 @@ export abstract class AbstractPool<
           accumulator + workerNode.usage.tasks.failed,
         0
       ),
-      ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+      ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
         .runTime.aggregate === true && {
         runTime: {
           minimum: round(
@@ -382,7 +382,7 @@ export abstract class AbstractPool<
               )
             )
           ),
-          ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+          ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
             .runTime.average && {
             average: round(
               average(
@@ -394,7 +394,7 @@ export abstract class AbstractPool<
               )
             )
           }),
-          ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+          ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
             .runTime.median && {
             median: round(
               median(
@@ -408,7 +408,7 @@ export abstract class AbstractPool<
           })
         }
       }),
-      ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+      ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
         .waitTime.aggregate === true && {
         waitTime: {
           minimum: round(
@@ -425,7 +425,7 @@ export abstract class AbstractPool<
               )
             )
           ),
-          ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+          ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
             .waitTime.average && {
             average: round(
               average(
@@ -437,7 +437,7 @@ export abstract class AbstractPool<
               )
             )
           }),
-          ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+          ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
             .waitTime.median && {
             median: round(
               median(
@@ -485,6 +485,9 @@ export abstract class AbstractPool<
    * @returns The pool utilization.
    */
   private get utilization (): number {
+    if (this.startTimestamp == null) {
+      return 0
+    }
     const poolTimeCapacity =
       (performance.now() - this.startTimestamp) *
       (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
@@ -546,31 +549,52 @@ export abstract class AbstractPool<
     workerChoiceStrategy: WorkerChoiceStrategy,
     workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
   ): void {
+    let requireSync = false
     checkValidWorkerChoiceStrategy(workerChoiceStrategy)
-    this.opts.workerChoiceStrategy = workerChoiceStrategy
-    this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
-      this.opts.workerChoiceStrategy
-    )
     if (workerChoiceStrategyOptions != null) {
-      this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
+      requireSync = !this.setWorkerChoiceStrategyOptions(
+        workerChoiceStrategyOptions
+      )
     }
-    for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
-      workerNode.resetUsage()
-      this.sendStatisticsMessageToWorker(workerNodeKey)
+    if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) {
+      this.opts.workerChoiceStrategy = workerChoiceStrategy
+      this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
+        this.opts.workerChoiceStrategy,
+        this.opts.workerChoiceStrategyOptions
+      )
+      requireSync = true
+    }
+    if (requireSync) {
+      this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+        this.getWorkerWorkerChoiceStrategies(),
+        this.opts.workerChoiceStrategyOptions
+      )
+      for (const workerNodeKey of this.workerNodes.keys()) {
+        this.sendStatisticsMessageToWorker(workerNodeKey)
+      }
     }
   }
 
   /** @inheritDoc */
   public setWorkerChoiceStrategyOptions (
     workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
-  ): void {
+  ): boolean {
     this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
     if (workerChoiceStrategyOptions != null) {
       this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+      this.workerChoiceStrategiesContext?.setOptions(
+        this.opts.workerChoiceStrategyOptions
+      )
+      this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+        this.getWorkerWorkerChoiceStrategies(),
+        this.opts.workerChoiceStrategyOptions
+      )
+      for (const workerNodeKey of this.workerNodes.keys()) {
+        this.sendStatisticsMessageToWorker(workerNodeKey)
+      }
+      return true
     }
-    this.workerChoiceStrategyContext?.setOptions(
-      this.opts.workerChoiceStrategyOptions
-    )
+    return false
   }
 
   /** @inheritDoc */
@@ -632,13 +656,13 @@ export abstract class AbstractPool<
   }
 
   private setTaskStealing (): void {
-    for (const [workerNodeKey] of this.workerNodes.entries()) {
+    for (const workerNodeKey of this.workerNodes.keys()) {
       this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
     }
   }
 
   private unsetTaskStealing (): void {
-    for (const [workerNodeKey] of this.workerNodes.entries()) {
+    for (const workerNodeKey of this.workerNodes.keys()) {
       this.workerNodes[workerNodeKey].off(
         'idle',
         this.handleWorkerNodeIdleEvent
@@ -647,7 +671,7 @@ export abstract class AbstractPool<
   }
 
   private setTasksStealingOnBackPressure (): void {
-    for (const [workerNodeKey] of this.workerNodes.entries()) {
+    for (const workerNodeKey of this.workerNodes.keys()) {
       this.workerNodes[workerNodeKey].on(
         'backPressure',
         this.handleWorkerNodeBackPressureEvent
@@ -656,7 +680,7 @@ export abstract class AbstractPool<
   }
 
   private unsetTasksStealingOnBackPressure (): void {
-    for (const [workerNodeKey] of this.workerNodes.entries()) {
+    for (const workerNodeKey of this.workerNodes.keys()) {
       this.workerNodes[workerNodeKey].off(
         'backPressure',
         this.handleWorkerNodeBackPressureEvent
@@ -799,7 +823,7 @@ export abstract class AbstractPool<
           }
         }
       }
-      for (const [workerNodeKey] of this.workerNodes.entries()) {
+      for (const workerNodeKey of this.workerNodes.keys()) {
         this.registerWorkerMessageListener(
           workerNodeKey,
           taskFunctionOperationsListener
@@ -811,17 +835,9 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public hasTaskFunction (name: string): boolean {
-    for (const workerNode of this.workerNodes) {
-      if (
-        Array.isArray(workerNode.info.taskFunctionsProperties) &&
-        workerNode.info.taskFunctionsProperties.some(
-          taskFunctionProperties => taskFunctionProperties.name === name
-        )
-      ) {
-        return true
-      }
-    }
-    return false
+    return this.listTaskFunctionsProperties().some(
+      taskFunctionProperties => taskFunctionProperties.name === name
+    )
   }
 
   /** @inheritDoc */
@@ -841,12 +857,17 @@ export abstract class AbstractPool<
     if (typeof fn.taskFunction !== 'function') {
       throw new TypeError('taskFunction property must be a function')
     }
+    checkValidPriority(fn.priority)
+    checkValidWorkerChoiceStrategy(fn.strategy)
     const opResult = await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'add',
       taskFunctionProperties: buildTaskFunctionProperties(name, fn),
       taskFunction: fn.taskFunction.toString()
     })
     this.taskFunctions.set(name, fn)
+    this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+      this.getWorkerWorkerChoiceStrategies()
+    )
     return opResult
   }
 
@@ -866,6 +887,9 @@ export abstract class AbstractPool<
     })
     this.deleteTaskFunctionWorkerUsages(name)
     this.taskFunctions.delete(name)
+    this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+      this.getWorkerWorkerChoiceStrategies()
+    )
     return opResult
   }
 
@@ -882,6 +906,63 @@ export abstract class AbstractPool<
     return []
   }
 
+  /**
+   * Gets task function strategy, if any.
+   *
+   * @param name - The task function name.
+   * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
+   */
+  private readonly getTaskFunctionWorkerWorkerChoiceStrategy = (
+    name?: string
+  ): WorkerChoiceStrategy | undefined => {
+    if (name != null) {
+      return this.listTaskFunctionsProperties().find(
+        (taskFunctionProperties: TaskFunctionProperties) =>
+          taskFunctionProperties.name === name
+      )?.strategy
+    }
+  }
+
+  /**
+   * Gets worker node task function priority, if any.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param name - The task function name.
+   * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
+   */
+  private readonly getWorkerNodeTaskFunctionPriority = (
+    workerNodeKey: number,
+    name?: string
+  ): number | undefined => {
+    if (name != null) {
+      return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find(
+        (taskFunctionProperties: TaskFunctionProperties) =>
+          taskFunctionProperties.name === name
+      )?.priority
+    }
+  }
+
+  /**
+   * Gets the worker choice strategies registered in this pool.
+   *
+   * @returns The worker choice strategies.
+   */
+  private readonly getWorkerWorkerChoiceStrategies =
+    (): Set<WorkerChoiceStrategy> => {
+      return new Set([
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        this.opts.workerChoiceStrategy!,
+        ...(this.listTaskFunctionsProperties()
+          .map(
+            (taskFunctionProperties: TaskFunctionProperties) =>
+              taskFunctionProperties.strategy
+          )
+          .filter(
+            (strategy: WorkerChoiceStrategy | undefined) => strategy != null
+          ) as WorkerChoiceStrategy[])
+      ])
+    }
+
   /** @inheritDoc */
   public async setDefaultTaskFunction (name: string): Promise<boolean> {
     return await this.sendTaskFunctionOperationToWorkers({
@@ -940,11 +1021,15 @@ export abstract class AbstractPool<
         return
       }
       const timestamp = performance.now()
-      const workerNodeKey = this.chooseWorkerNode()
+      const taskFunctionStrategy =
+        this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
+      const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
+        priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
+        strategy: taskFunctionStrategy,
         transferList,
         timestamp,
         taskId: randomUUID()
@@ -1003,6 +1088,7 @@ export abstract class AbstractPool<
     }
     this.starting = true
     this.startMinimumNumberOfWorkers()
+    this.startTimestamp = performance.now()
     this.starting = false
     this.started = true
   }
@@ -1027,6 +1113,7 @@ export abstract class AbstractPool<
     this.emitter?.emit(PoolEvents.destroy, this.info)
     this.emitter?.emitDestroy()
     this.readyEventEmitted = false
+    delete this.startTimestamp
     this.destroying = false
     this.started = false
   }
@@ -1111,7 +1198,7 @@ export abstract class AbstractPool<
       const workerUsage = this.workerNodes[workerNodeKey].usage
       ++workerUsage.tasks.executing
       updateWaitTimeWorkerUsage(
-        this.workerChoiceStrategyContext,
+        this.workerChoiceStrategiesContext,
         workerUsage,
         task
       )
@@ -1129,7 +1216,7 @@ export abstract class AbstractPool<
       ].getTaskFunctionWorkerUsage(task.name!)!
       ++taskFunctionWorkerUsage.tasks.executing
       updateWaitTimeWorkerUsage(
-        this.workerChoiceStrategyContext,
+        this.workerChoiceStrategiesContext,
         taskFunctionWorkerUsage,
         task
       )
@@ -1153,12 +1240,12 @@ export abstract class AbstractPool<
       const workerUsage = this.workerNodes[workerNodeKey].usage
       updateTaskStatisticsWorkerUsage(workerUsage, message)
       updateRunTimeWorkerUsage(
-        this.workerChoiceStrategyContext,
+        this.workerChoiceStrategiesContext,
         workerUsage,
         message
       )
       updateEluWorkerUsage(
-        this.workerChoiceStrategyContext,
+        this.workerChoiceStrategiesContext,
         workerUsage,
         message
       )
@@ -1178,19 +1265,19 @@ export abstract class AbstractPool<
       ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
       updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
       updateRunTimeWorkerUsage(
-        this.workerChoiceStrategyContext,
+        this.workerChoiceStrategiesContext,
         taskFunctionWorkerUsage,
         message
       )
       updateEluWorkerUsage(
-        this.workerChoiceStrategyContext,
+        this.workerChoiceStrategiesContext,
         taskFunctionWorkerUsage,
         message
       )
       needWorkerChoiceStrategyUpdate = true
     }
     if (needWorkerChoiceStrategyUpdate) {
-      this.workerChoiceStrategyContext?.update(workerNodeKey)
+      this.workerChoiceStrategiesContext?.update(workerNodeKey)
     }
   }
 
@@ -1210,24 +1297,25 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Chooses a worker node for the next task.
-   *
-   * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
+   * Chooses a worker node for the next task given the worker choice strategy.
    *
+   * @param workerChoiceStrategy - The worker choice strategy.
    * @returns The chosen worker node key
    */
-  private chooseWorkerNode (): number {
+  private chooseWorkerNode (
+    workerChoiceStrategy?: WorkerChoiceStrategy
+  ): number {
     if (this.shallCreateDynamicWorker()) {
       const workerNodeKey = this.createAndSetupDynamicWorkerNode()
       if (
-        this.workerChoiceStrategyContext?.getStrategyPolicy()
-          .dynamicWorkerUsage === true
+        this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
+        true
       ) {
         return workerNodeKey
       }
     }
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    return this.workerChoiceStrategyContext!.execute()
+    return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy)
   }
 
   /**
@@ -1364,10 +1452,10 @@ export abstract class AbstractPool<
     const workerNode = this.workerNodes[workerNodeKey]
     workerNode.info.dynamic = true
     if (
-      this.workerChoiceStrategyContext?.getStrategyPolicy()
-        .dynamicWorkerReady === true ||
-      this.workerChoiceStrategyContext?.getStrategyPolicy()
-        .dynamicWorkerUsage === true
+      this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerReady ===
+        true ||
+      this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
+        true
     ) {
       workerNode.info.ready = true
     }
@@ -1462,11 +1550,11 @@ export abstract class AbstractPool<
     this.sendToWorker(workerNodeKey, {
       statistics: {
         runTime:
-          this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+          this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
             .runTime.aggregate ?? false,
         elu:
-          this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
-            .aggregate ?? false
+          this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+            .elu.aggregate ?? false
       }
     })
   }
@@ -1681,7 +1769,7 @@ export abstract class AbstractPool<
     )
     if (sourceWorkerNode != null) {
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      const task = sourceWorkerNode.popTask()!
+      const task = sourceWorkerNode.dequeueTask(1)!
       this.handleTask(workerNodeKey, task)
       this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
@@ -1732,7 +1820,7 @@ export abstract class AbstractPool<
         }
         workerInfo.stealing = true
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        const task = sourceWorkerNode.popTask()!
+        const task = sourceWorkerNode.dequeueTask(1)!
         this.handleTask(workerNodeKey, task)
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
         this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
@@ -1885,7 +1973,9 @@ export abstract class AbstractPool<
           this.opts.tasksQueueOptions?.size ??
           getDefaultTasksQueueOptions(
             this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
-          ).size
+          ).size,
+        tasksQueueBucketSize:
+          (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
       }
     )
     // Flag the worker node as ready at pool startup.
@@ -1927,7 +2017,7 @@ export abstract class AbstractPool<
     const workerNodeKey = this.workerNodes.indexOf(workerNode)
     if (workerNodeKey !== -1) {
       this.workerNodes.splice(workerNodeKey, 1)
-      this.workerChoiceStrategyContext?.remove(workerNodeKey)
+      this.workerChoiceStrategiesContext?.remove(workerNodeKey)
     }
     this.checkAndEmitEmptyEvent()
   }
@@ -1966,8 +2056,11 @@ export abstract class AbstractPool<
     return tasksQueueSize
   }
 
-  private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
-    return this.workerNodes[workerNodeKey].dequeueTask()
+  private dequeueTask (
+    workerNodeKey: number,
+    bucket?: number
+  ): Task<Data> | undefined {
+    return this.workerNodes[workerNodeKey].dequeueTask(bucket)
   }
 
   private tasksQueueSize (workerNodeKey: number): number {
@@ -1986,7 +2079,7 @@ export abstract class AbstractPool<
   }
 
   private flushTasksQueues (): void {
-    for (const [workerNodeKey] of this.workerNodes.entries()) {
+    for (const workerNodeKey of this.workerNodes.keys()) {
       this.flushTasksQueue(workerNodeKey)
     }
   }