Merge pull request #2343 from poolifier/combined-prs-branch
[poolifier.git] / src / pools / abstract-pool.ts
index 2c4fdf5a99f8e3ce2accd39311caa5e8e776a737..8f57a9067e0967bf87dd303efabfe694b00bdbc6 100644 (file)
@@ -2025,6 +2025,12 @@ export abstract class AbstractPool<
     }
   }
 
+  private setTasksQueuePriority (workerNodeKey: number): void {
+    this.workerNodes[workerNodeKey].setTasksQueuePriority(
+      this.getTasksQueuePriority()
+    )
+  }
+
   /**
    * This method is the message listener registered on each worker.
    */
@@ -2043,6 +2049,7 @@ export abstract class AbstractPool<
       if (workerInfo != null) {
         workerInfo.taskFunctionsProperties = taskFunctionsProperties
         this.sendStatisticsMessageToWorker(workerNodeKey)
+        this.setTasksQueuePriority(workerNodeKey)
       }
     } else if (taskId != null) {
       // Task execution response received from worker
@@ -2067,6 +2074,7 @@ export abstract class AbstractPool<
     workerNode.info.ready = ready
     workerNode.info.taskFunctionsProperties = taskFunctionsProperties
     this.sendStatisticsMessageToWorker(workerNodeKey)
+    this.setTasksQueuePriority(workerNodeKey)
     this.checkAndEmitReadyEvent()
   }
 
@@ -2154,6 +2162,12 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey]?.info
   }
 
+  private getTasksQueuePriority (): boolean {
+    return this.listTaskFunctionsProperties().some(
+      taskFunctionProperties => taskFunctionProperties.priority != null
+    )
+  }
+
   /**
    * Creates a worker node.
    *
@@ -2171,7 +2185,8 @@ export abstract class AbstractPool<
           getDefaultTasksQueueOptions(
             this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
           ).size,
-        tasksQueueBucketSize: defaultBucketSize
+        tasksQueueBucketSize: defaultBucketSize,
+        tasksQueuePriority: this.getTasksQueuePriority()
       }
     )
     // Flag the worker node as ready at pool startup.