]> Piment Noir Git Repositories - poolifier.git/commitdiff
fix: worker index identification at tasks stealing under back pressure
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 19 Aug 2025 12:13:33 +0000 (14:13 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 19 Aug 2025 12:13:33 +0000 (14:13 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/worker/abstract-worker.ts

index 84ef7c6a1281fff9d0ab4948c9f43e51b9bab87d..65ad5e3a40ec9877b8e1a3bf87fe7fe6032c4cb6 100644 (file)
@@ -90,6 +90,8 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public get info (): PoolInfo {
+    const taskStatisticsRequirements =
+      this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
     return {
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
       defaultStrategy: this.opts.workerChoiceStrategy!,
@@ -101,10 +103,8 @@ export abstract class AbstractPool<
       type: this.type,
       version,
       worker: this.worker,
-      ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
-        .runTime.aggregate === true &&
-        this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
-          .waitTime.aggregate && {
+      ...(taskStatisticsRequirements?.runTime.aggregate === true &&
+        taskStatisticsRequirements.waitTime.aggregate && {
         utilization: round(this.utilization),
       }),
       busyWorkerNodes: this.workerNodes.reduce(
@@ -172,8 +172,7 @@ export abstract class AbstractPool<
           0
         ),
       }),
-      ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
-        .runTime.aggregate === true && {
+      ...(taskStatisticsRequirements?.runTime.aggregate === true && {
         runTime: {
           maximum: round(
             max(
@@ -191,8 +190,7 @@ export abstract class AbstractPool<
               )
             )
           ),
-          ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
-            .runTime.average && {
+          ...(taskStatisticsRequirements.runTime.average && {
             average: round(
               average(
                 this.workerNodes.reduce<number[]>(
@@ -205,8 +203,7 @@ export abstract class AbstractPool<
               )
             ),
           }),
-          ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
-            .runTime.median && {
+          ...(taskStatisticsRequirements.runTime.median && {
             median: round(
               median(
                 this.workerNodes.reduce<number[]>(
@@ -221,8 +218,7 @@ export abstract class AbstractPool<
           }),
         },
       }),
-      ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
-        .waitTime.aggregate === true && {
+      ...(taskStatisticsRequirements?.waitTime.aggregate === true && {
         waitTime: {
           maximum: round(
             max(
@@ -240,8 +236,7 @@ export abstract class AbstractPool<
               )
             )
           ),
-          ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
-            .waitTime.average && {
+          ...(taskStatisticsRequirements.waitTime.average && {
             average: round(
               average(
                 this.workerNodes.reduce<number[]>(
@@ -254,8 +249,7 @@ export abstract class AbstractPool<
               )
             ),
           }),
-          ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
-            .waitTime.median && {
+          ...(taskStatisticsRequirements.waitTime.median && {
             median: round(
               median(
                 this.workerNodes.reduce<number[]>(
@@ -270,8 +264,7 @@ export abstract class AbstractPool<
           }),
         },
       }),
-      ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
-        .elu.aggregate === true && {
+      ...(taskStatisticsRequirements?.elu.aggregate === true && {
         elu: {
           active: {
             maximum: round(
@@ -292,8 +285,7 @@ export abstract class AbstractPool<
                 )
               )
             ),
-            ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
-              .elu.average && {
+            ...(taskStatisticsRequirements.elu.average && {
               average: round(
                 average(
                   this.workerNodes.reduce<number[]>(
@@ -306,8 +298,7 @@ export abstract class AbstractPool<
                 )
               ),
             }),
-            ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
-              .elu.median && {
+            ...(taskStatisticsRequirements.elu.median && {
               median: round(
                 median(
                   this.workerNodes.reduce<number[]>(
@@ -340,8 +331,7 @@ export abstract class AbstractPool<
                 )
               )
             ),
-            ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
-              .elu.average && {
+            ...(taskStatisticsRequirements.elu.average && {
               average: round(
                 average(
                   this.workerNodes.reduce<number[]>(
@@ -354,8 +344,7 @@ export abstract class AbstractPool<
                 )
               ),
             }),
-            ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
-              .elu.median && {
+            ...(taskStatisticsRequirements.elu.median && {
               median: round(
                 median(
                   this.workerNodes.reduce<number[]>(
@@ -518,6 +507,9 @@ export abstract class AbstractPool<
     const poolTimeCapacity =
       (performance.now() - this.startTimestamp) *
       (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
+    if (!Number.isFinite(poolTimeCapacity) || poolTimeCapacity <= 0) {
+      return 0
+    }
     const totalTasksRunTime = this.workerNodes.reduce(
       (accumulator, workerNode) =>
         accumulator + (workerNode.usage.runTime.aggregate ?? 0),
@@ -1239,6 +1231,7 @@ export abstract class AbstractPool<
    * @returns Worker nodes back pressure boolean status.
    */
   protected internalBackPressure (): boolean {
+    if (this.workerNodes.length === 0) return false
     return (
       this.workerNodes.reduce(
         (accumulator, _, workerNodeKey) =>
@@ -1255,6 +1248,7 @@ export abstract class AbstractPool<
    * @returns Worker nodes busyness boolean status.
    */
   protected internalBusy (): boolean {
+    if (this.workerNodes.length === 0) return false
     return (
       this.workerNodes.reduce(
         (accumulator, _, workerNodeKey) =>
@@ -1881,7 +1875,7 @@ export abstract class AbstractPool<
         (workerNodeA, workerNodeB) =>
           workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
       )
-    for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
+    for (const workerNode of workerNodes) {
       if (sourceWorkerNode.usage.tasks.queued === 0) {
         break
       }
@@ -1892,6 +1886,7 @@ export abstract class AbstractPool<
           // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
           this.opts.tasksQueueOptions!.size! - sizeOffset
       ) {
+        const workerNodeKey = this.workerNodes.indexOf(workerNode)
         workerNode.info.backPressureStealing = true
         this.stealTask(sourceWorkerNode, workerNodeKey)
         workerNode.info.backPressureStealing = false
@@ -1977,10 +1972,9 @@ export abstract class AbstractPool<
    * @param workerNode - The worker node.
    */
   private initWorkerNodeUsage (workerNode: IWorkerNode<Worker, Data>): void {
-    if (
+    const taskStatisticsRequirements =
       this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
-        .runTime.aggregate === true
-    ) {
+    if (taskStatisticsRequirements?.runTime.aggregate === true) {
       workerNode.usage.runTime.aggregate = min(
         ...this.workerNodes.map(
           workerNode =>
@@ -1988,10 +1982,7 @@ export abstract class AbstractPool<
         )
       )
     }
-    if (
-      this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
-        .waitTime.aggregate === true
-    ) {
+    if (taskStatisticsRequirements?.waitTime.aggregate === true) {
       workerNode.usage.waitTime.aggregate = min(
         ...this.workerNodes.map(
           workerNode =>
@@ -1999,10 +1990,7 @@ export abstract class AbstractPool<
         )
       )
     }
-    if (
-      this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
-        .aggregate === true
-    ) {
+    if (taskStatisticsRequirements?.elu.aggregate === true) {
       workerNode.usage.elu.active.aggregate = min(
         ...this.workerNodes.map(
           workerNode =>
@@ -2144,15 +2132,22 @@ export abstract class AbstractPool<
     while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) {
       const destinationWorkerNodeKey = this.workerNodes.reduce(
         (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
-          return sourceWorkerNodeKey !== workerNodeKey &&
-            workerNode.info.ready &&
-            workerNode.usage.tasks.queued <
-              workerNodes[minWorkerNodeKey].usage.tasks.queued
+          if (workerNodeKey === sourceWorkerNodeKey || !workerNode.info.ready) {
+            return minWorkerNodeKey
+          }
+          if (minWorkerNodeKey === -1) {
+            return workerNodeKey
+          }
+          return workerNode.usage.tasks.queued <
+            workerNodes[minWorkerNodeKey].usage.tasks.queued
             ? workerNodeKey
             : minWorkerNodeKey
         },
-        0
+        -1
       )
+      if (destinationWorkerNodeKey === -1) {
+        break
+      }
       this.handleTask(
         destinationWorkerNodeKey,
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
@@ -2250,14 +2245,12 @@ export abstract class AbstractPool<
    * @param workerNodeKey - The worker node key.
    */
   private sendStatisticsMessageToWorker (workerNodeKey: number): void {
+    const taskStatisticsRequirements =
+      this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
     this.sendToWorker(workerNodeKey, {
       statistics: {
-        elu:
-          this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
-            .elu.aggregate ?? false,
-        runTime:
-          this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
-            .runTime.aggregate ?? false,
+        elu: taskStatisticsRequirements?.elu.aggregate ?? false,
+        runTime: taskStatisticsRequirements?.runTime.aggregate ?? false,
       },
     })
   }
@@ -2315,12 +2308,12 @@ export abstract class AbstractPool<
     message: MessageValue<Data>
   ): Promise<boolean> {
     const targetWorkerNodeKeys = [...this.workerNodes.keys()]
+    const responsesReceived: MessageValue<Response>[] = []
     const taskFunctionOperationsListener = (
       message: MessageValue<Response>,
       resolve: (value: boolean | PromiseLike<boolean>) => void,
       reject: (reason?: unknown) => void
     ): void => {
-      const responsesReceived: MessageValue<Response>[] = []
       this.checkMessageWorkerId(message)
       if (
         message.taskFunctionOperationStatus != null &&
@@ -2467,8 +2460,12 @@ export abstract class AbstractPool<
     }
     destinationWorkerNode.info.stealing = true
     sourceWorkerNode.info.stolen = true
-    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    const stolenTask = sourceWorkerNode.dequeueLastPrioritizedTask()!
+    const stolenTask = sourceWorkerNode.dequeueLastPrioritizedTask()
+    if (stolenTask == null) {
+      sourceWorkerNode.info.stolen = false
+      destinationWorkerNode.info.stealing = false
+      return
+    }
     sourceWorkerNode.info.stolen = false
     destinationWorkerNode.info.stealing = false
     this.handleTask(destinationWorkerNodeKey, stolenTask)
@@ -2554,6 +2551,9 @@ export abstract class AbstractPool<
   private readonly workerNodeStealTask = (
     workerNodeKey: number
   ): Task<Data> | undefined => {
+    const workerNode = this.workerNodes[workerNodeKey]
+    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+    if (workerNode == null) return
     const workerNodes = this.workerNodes
       .slice()
       .sort(
@@ -2561,8 +2561,8 @@ export abstract class AbstractPool<
           workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
       )
     const sourceWorkerNode = workerNodes.find(
-      (sourceWorkerNode, sourceWorkerNodeKey) =>
-        sourceWorkerNodeKey !== workerNodeKey &&
+      sourceWorkerNode =>
+        sourceWorkerNode !== workerNode &&
         sourceWorkerNode.usage.tasks.queued > 0
     )
     if (sourceWorkerNode != null) {
index 67c8708922162fb807239c23ebb94817f7ef578a..3ea7aadec60f236a21c46351f8e6e0569e33e40f 100644 (file)
@@ -198,10 +198,14 @@ export abstract class AbstractWorker<
         DEFAULT_TASK_NAME,
         this.taskFunctions.get(DEFAULT_TASK_NAME)
       ),
-      buildTaskFunctionProperties(
-        defaultTaskFunctionName,
-        this.taskFunctions.get(defaultTaskFunctionName)
-      ),
+      ...(defaultTaskFunctionName !== DEFAULT_TASK_NAME
+        ? [
+            buildTaskFunctionProperties(
+              defaultTaskFunctionName,
+              this.taskFunctions.get(defaultTaskFunctionName)
+            ),
+          ]
+        : []),
       ...taskFunctionsProperties,
     ]
   }
@@ -700,6 +704,7 @@ export abstract class AbstractWorker<
       this.checkActive.bind(this),
       (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2
     )
+    this.activeInterval.unref()
   }
 
   /**