feat: use priority queue for task queueing
[poolifier.git] / src / pools / abstract-pool.ts
index 5e88a7cd3392e98ae9e4757a4d4a00c174b4d471..52829c28163637f97947343b88aab983ab1a101f 100644 (file)
@@ -47,6 +47,7 @@ import {
 import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
 import {
   checkFilePath,
+  checkValidPriority,
   checkValidTasksQueueOptions,
   checkValidWorkerChoiceStrategy,
   getDefaultTasksQueueOptions,
@@ -855,6 +856,8 @@ export abstract class AbstractPool<
     if (typeof fn.taskFunction !== 'function') {
       throw new TypeError('taskFunction property must be a function')
     }
+    checkValidPriority(fn.priority)
+    checkValidWorkerChoiceStrategy(fn.strategy)
     const opResult = await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'add',
       taskFunctionProperties: buildTaskFunctionProperties(name, fn),
@@ -919,6 +922,25 @@ export abstract class AbstractPool<
     }
   }
 
+  /**
+   * Gets worker node task function priority, if any.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param name - The task function name.
+   * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
+   */
+  private readonly getWorkerNodeTaskFunctionPriority = (
+    workerNodeKey: number,
+    name?: string
+  ): number | undefined => {
+    if (name != null) {
+      return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find(
+        (taskFunctionProperties: TaskFunctionProperties) =>
+          taskFunctionProperties.name === name
+      )?.priority
+    }
+  }
+
   /**
    * Gets the worker choice strategies registered in this pool.
    *
@@ -998,13 +1020,15 @@ export abstract class AbstractPool<
         return
       }
       const timestamp = performance.now()
-      const workerNodeKey = this.chooseWorkerNode(
+      const taskFunctionStrategy =
         this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
-      )
+      const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
+        priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
+        strategy: taskFunctionStrategy,
         transferList,
         timestamp,
         taskId: randomUUID()
@@ -1742,7 +1766,7 @@ export abstract class AbstractPool<
     )
     if (sourceWorkerNode != null) {
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      const task = sourceWorkerNode.popTask()!
+      const task = sourceWorkerNode.dequeueTask(1)!
       this.handleTask(workerNodeKey, task)
       this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
@@ -1793,7 +1817,7 @@ export abstract class AbstractPool<
         }
         workerInfo.stealing = true
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        const task = sourceWorkerNode.popTask()!
+        const task = sourceWorkerNode.dequeueTask(1)!
         this.handleTask(workerNodeKey, task)
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
         this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
@@ -1946,7 +1970,9 @@ export abstract class AbstractPool<
           this.opts.tasksQueueOptions?.size ??
           getDefaultTasksQueueOptions(
             this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
-          ).size
+          ).size,
+        tasksQueueBucketSize:
+          (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
       }
     )
     // Flag the worker node as ready at pool startup.
@@ -2027,8 +2053,11 @@ export abstract class AbstractPool<
     return tasksQueueSize
   }
 
-  private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
-    return this.workerNodes[workerNodeKey].dequeueTask()
+  private dequeueTask (
+    workerNodeKey: number,
+    bucket?: number
+  ): Task<Data> | undefined {
+    return this.workerNodes[workerNodeKey].dequeueTask(bucket)
   }
 
   private tasksQueueSize (workerNodeKey: number): number {