docs: refine README.md
[poolifier.git] / src / pools / abstract-pool.ts
index 5e88a7cd3392e98ae9e4757a4d4a00c174b4d471..4f60dff0279643a864b8c570eaddf3e3af9e7546 100644 (file)
@@ -47,6 +47,7 @@ import {
 import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
 import {
   checkFilePath,
+  checkValidPriority,
   checkValidTasksQueueOptions,
   checkValidWorkerChoiceStrategy,
   getDefaultTasksQueueOptions,
@@ -136,7 +137,7 @@ export abstract class AbstractPool<
   /**
    * The start timestamp of the pool.
    */
-  private readonly startTimestamp
+  private startTimestamp?: number
 
   /**
    * Constructs a new poolifier pool.
@@ -192,8 +193,6 @@ export abstract class AbstractPool<
     if (this.opts.startWorkers === true) {
       this.start()
     }
-
-    this.startTimestamp = performance.now()
   }
 
   private checkPoolType (): void {
@@ -486,6 +485,9 @@ export abstract class AbstractPool<
    * @returns The pool utilization.
    */
   private get utilization (): number {
+    if (this.startTimestamp == null) {
+      return 0
+    }
     const poolTimeCapacity =
       (performance.now() - this.startTimestamp) *
       (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
@@ -550,7 +552,7 @@ export abstract class AbstractPool<
     let requireSync = false
     checkValidWorkerChoiceStrategy(workerChoiceStrategy)
     if (workerChoiceStrategyOptions != null) {
-      requireSync = this.setWorkerChoiceStrategyOptions(
+      requireSync = !this.setWorkerChoiceStrategyOptions(
         workerChoiceStrategyOptions
       )
     }
@@ -855,6 +857,8 @@ export abstract class AbstractPool<
     if (typeof fn.taskFunction !== 'function') {
       throw new TypeError('taskFunction property must be a function')
     }
+    checkValidPriority(fn.priority)
+    checkValidWorkerChoiceStrategy(fn.strategy)
     const opResult = await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'add',
       taskFunctionProperties: buildTaskFunctionProperties(name, fn),
@@ -919,6 +923,25 @@ export abstract class AbstractPool<
     }
   }
 
+  /**
+   * Gets worker node task function priority, if any.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param name - The task function name.
+   * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
+   */
+  private readonly getWorkerNodeTaskFunctionPriority = (
+    workerNodeKey: number,
+    name?: string
+  ): number | undefined => {
+    if (name != null) {
+      return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find(
+        (taskFunctionProperties: TaskFunctionProperties) =>
+          taskFunctionProperties.name === name
+      )?.priority
+    }
+  }
+
   /**
    * Gets the worker choice strategies registered in this pool.
    *
@@ -998,13 +1021,15 @@ export abstract class AbstractPool<
         return
       }
       const timestamp = performance.now()
-      const workerNodeKey = this.chooseWorkerNode(
+      const taskFunctionStrategy =
         this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
-      )
+      const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
+        priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
+        strategy: taskFunctionStrategy,
         transferList,
         timestamp,
         taskId: randomUUID()
@@ -1063,6 +1088,7 @@ export abstract class AbstractPool<
     }
     this.starting = true
     this.startMinimumNumberOfWorkers()
+    this.startTimestamp = performance.now()
     this.starting = false
     this.started = true
   }
@@ -1087,6 +1113,7 @@ export abstract class AbstractPool<
     this.emitter?.emit(PoolEvents.destroy, this.info)
     this.emitter?.emitDestroy()
     this.readyEventEmitted = false
+    delete this.startTimestamp
     this.destroying = false
     this.started = false
   }
@@ -1270,7 +1297,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Chooses a worker node for the next task.
+   * Chooses a worker node for the next task given the worker choice strategy.
    *
    * @param workerChoiceStrategy - The worker choice strategy.
    * @returns The chosen worker node key
@@ -1742,7 +1769,7 @@ export abstract class AbstractPool<
     )
     if (sourceWorkerNode != null) {
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      const task = sourceWorkerNode.popTask()!
+      const task = sourceWorkerNode.dequeueTask(1)!
       this.handleTask(workerNodeKey, task)
       this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
@@ -1793,7 +1820,7 @@ export abstract class AbstractPool<
         }
         workerInfo.stealing = true
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        const task = sourceWorkerNode.popTask()!
+        const task = sourceWorkerNode.dequeueTask(1)!
         this.handleTask(workerNodeKey, task)
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
         this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
@@ -1946,7 +1973,9 @@ export abstract class AbstractPool<
           this.opts.tasksQueueOptions?.size ??
           getDefaultTasksQueueOptions(
             this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
-          ).size
+          ).size,
+        tasksQueueBucketSize:
+          (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
       }
     )
     // Flag the worker node as ready at pool startup.
@@ -2027,8 +2056,11 @@ export abstract class AbstractPool<
     return tasksQueueSize
   }
 
-  private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
-    return this.workerNodes[workerNodeKey].dequeueTask()
+  private dequeueTask (
+    workerNodeKey: number,
+    bucket?: number
+  ): Task<Data> | undefined {
+    return this.workerNodes[workerNodeKey].dequeueTask(bucket)
   }
 
   private tasksQueueSize (workerNodeKey: number): number {