Merge dependabot/npm_and_yarn/examples/typescript/http-server-pool/fastify-cluster...
[poolifier.git] / src / pools / abstract-pool.ts
index a11a574147cf074dc938d1e0d47b3797e6e15ad2..062df07791af8666fd94282eac229aef466c7a1b 100644 (file)
@@ -427,21 +427,26 @@ export abstract class AbstractPool<
               )
             )
           ),
-          average: round(
-            average(
-              this.workerNodes.reduce<number[]>(
-                (accumulator, workerNode) =>
-                  accumulator.concat(workerNode.usage.runTime.history),
-                []
+          ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+            .runTime.average && {
+            average: round(
+              average(
+                this.workerNodes.reduce<number[]>(
+                  (accumulator, workerNode) =>
+                    accumulator.concat(workerNode.usage.runTime.history),
+                  []
+                )
               )
             )
-          ),
+          }),
           ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
             .runTime.median && {
             median: round(
               median(
-                this.workerNodes.map(
-                  (workerNode) => workerNode.usage.runTime?.median ?? 0
+                this.workerNodes.reduce<number[]>(
+                  (accumulator, workerNode) =>
+                    accumulator.concat(workerNode.usage.runTime.history),
+                  []
                 )
               )
             )
@@ -465,21 +470,26 @@ export abstract class AbstractPool<
               )
             )
           ),
-          average: round(
-            average(
-              this.workerNodes.reduce<number[]>(
-                (accumulator, workerNode) =>
-                  accumulator.concat(workerNode.usage.waitTime.history),
-                []
+          ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+            .waitTime.average && {
+            average: round(
+              average(
+                this.workerNodes.reduce<number[]>(
+                  (accumulator, workerNode) =>
+                    accumulator.concat(workerNode.usage.waitTime.history),
+                  []
+                )
               )
             )
-          ),
+          }),
           ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
             .waitTime.median && {
             median: round(
               median(
-                this.workerNodes.map(
-                  (workerNode) => workerNode.usage.waitTime?.median ?? 0
+                this.workerNodes.reduce<number[]>(
+                  (accumulator, workerNode) =>
+                    accumulator.concat(workerNode.usage.waitTime.history),
+                  []
                 )
               )
             )
@@ -775,6 +785,7 @@ export abstract class AbstractPool<
       if (
         this.opts.enableTasksQueue === false ||
         (this.opts.enableTasksQueue === true &&
+          this.tasksQueueSize(workerNodeKey) === 0 &&
           this.workerNodes[workerNodeKey].usage.tasks.executing <
             (this.opts.tasksQueueOptions?.concurrency as number))
       ) {
@@ -826,7 +837,7 @@ export abstract class AbstractPool<
    * @virtual
    */
   protected setupHook (): void {
-    // Intentionally empty
+    /** Intentionally empty */
   }
 
   /**
@@ -1189,15 +1200,8 @@ export abstract class AbstractPool<
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       let destinationWorkerNodeKey!: number
       let minQueuedTasks = Infinity
-      let executeTask = false
       for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
         if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
-          if (
-            workerNode.usage.tasks.executing <
-            (this.opts.tasksQueueOptions?.concurrency as number)
-          ) {
-            executeTask = true
-          }
           if (workerNode.usage.tasks.queued === 0) {
             destinationWorkerNodeKey = workerNodeId
             break
@@ -1209,12 +1213,16 @@ export abstract class AbstractPool<
         }
       }
       if (destinationWorkerNodeKey != null) {
+        const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
         const task = {
           ...(this.dequeueTask(workerNodeKey) as Task<Data>),
-          workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo)
-            .id as number
+          workerId: destinationWorkerNode.info.id as number
         }
-        if (executeTask) {
+        if (
+          this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
+          destinationWorkerNode.usage.tasks.executing <
+            (this.opts.tasksQueueOptions?.concurrency as number)
+        ) {
           this.executeTask(destinationWorkerNodeKey, task)
         } else {
           this.enqueueTask(destinationWorkerNodeKey, task)
@@ -1246,8 +1254,9 @@ export abstract class AbstractPool<
           workerId: destinationWorkerNode.info.id as number
         }
         if (
+          this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
           destinationWorkerNode.usage.tasks.executing <
-          (this.opts.tasksQueueOptions?.concurrency as number)
+            (this.opts.tasksQueueOptions?.concurrency as number)
         ) {
           this.executeTask(destinationWorkerNodeKey, task)
         } else {
@@ -1280,8 +1289,9 @@ export abstract class AbstractPool<
           workerId: workerNode.info.id as number
         }
         if (
+          this.tasksQueueSize(workerNodeKey) === 0 &&
           workerNode.usage.tasks.executing <
-          (this.opts.tasksQueueOptions?.concurrency as number)
+            (this.opts.tasksQueueOptions?.concurrency as number)
         ) {
           this.executeTask(workerNodeKey, task)
         } else {