refactor: cleanup updateTaskStolenStatisticsWorkerUsage() signature
[poolifier.git] / src / pools / abstract-pool.ts
index 2f5c61896658311c96c551deee3fb99552273be2..a41996ff918d527d2ef81ac29f431cdd601b91c7 100644 (file)
@@ -5,8 +5,7 @@ import { type TransferListItem } from 'node:worker_threads'
 import type {
   MessageValue,
   PromiseResponseWrapper,
-  Task,
-  Writable
+  Task
 } from '../utility-types'
 import {
   DEFAULT_TASK_NAME,
@@ -256,19 +255,19 @@ export abstract class AbstractPool<
       )
     }
     if (
-      workerChoiceStrategyOptions.choiceRetries != null &&
-      !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
+      workerChoiceStrategyOptions.retries != null &&
+      !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
     ) {
       throw new TypeError(
-        'Invalid worker choice strategy options: choice retries must be an integer'
+        'Invalid worker choice strategy options: retries must be an integer'
       )
     }
     if (
-      workerChoiceStrategyOptions.choiceRetries != null &&
-      workerChoiceStrategyOptions.choiceRetries < 0
+      workerChoiceStrategyOptions.retries != null &&
+      workerChoiceStrategyOptions.retries < 0
     ) {
       throw new RangeError(
-        `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater or equal than zero`
+        `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
       )
     }
     if (
@@ -292,7 +291,7 @@ export abstract class AbstractPool<
   }
 
   private checkValidTasksQueueOptions (
-    tasksQueueOptions: Writable<TasksQueueOptions>
+    tasksQueueOptions: TasksQueueOptions
   ): void {
     if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
       throw new TypeError('Invalid tasks queue options: must be a plain object')
@@ -734,6 +733,14 @@ export abstract class AbstractPool<
     return []
   }
 
+  private shallExecuteTask (workerNodeKey: number): boolean {
+    return (
+      this.tasksQueueSize(workerNodeKey) === 0 &&
+      this.workerNodes[workerNodeKey].usage.tasks.executing <
+        (this.opts.tasksQueueOptions?.concurrency as number)
+    )
+  }
+
   /** @inheritDoc */
   public async execute (
     data?: Data,
@@ -781,9 +788,7 @@ export abstract class AbstractPool<
       if (
         this.opts.enableTasksQueue === false ||
         (this.opts.enableTasksQueue === true &&
-          this.tasksQueueSize(workerNodeKey) === 0 &&
-          this.workerNodes[workerNodeKey].usage.tasks.executing <
-            (this.opts.tasksQueueOptions?.concurrency as number))
+          this.shallExecuteTask(workerNodeKey))
       ) {
         this.executeTask(workerNodeKey, task)
       } else {
@@ -1214,11 +1219,7 @@ export abstract class AbstractPool<
           ...(this.dequeueTask(workerNodeKey) as Task<Data>),
           workerId: destinationWorkerNode.info.id as number
         }
-        if (
-          this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
-          destinationWorkerNode.usage.tasks.executing <
-            (this.opts.tasksQueueOptions?.concurrency as number)
-        ) {
+        if (this.shallExecuteTask(destinationWorkerNodeKey)) {
           this.executeTask(destinationWorkerNodeKey, task)
         } else {
           this.enqueueTask(destinationWorkerNodeKey, task)
@@ -1227,6 +1228,25 @@ export abstract class AbstractPool<
     }
   }
 
+  private updateTaskStolenStatisticsWorkerUsage (
+    workerNodeKey: number,
+    taskName: string
+  ): void {
+    const workerNode = this.workerNodes[workerNodeKey]
+    if (workerNode?.usage != null) {
+      ++workerNode.usage.tasks.stolen
+    }
+    if (
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      workerNode.getTaskFunctionWorkerUsage(taskName) != null
+    ) {
+      const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+        taskName
+      ) as WorkerUsage
+      ++taskFunctionWorkerUsage.tasks.stolen
+    }
+  }
+
   private taskStealingOnEmptyQueue (workerId: number): void {
     const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
     const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
@@ -1249,30 +1269,15 @@ export abstract class AbstractPool<
           ...(sourceWorkerNode.popTask() as Task<Data>),
           workerId: destinationWorkerNode.info.id as number
         }
-        if (
-          this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
-          destinationWorkerNode.usage.tasks.executing <
-            (this.opts.tasksQueueOptions?.concurrency as number)
-        ) {
+        if (this.shallExecuteTask(destinationWorkerNodeKey)) {
           this.executeTask(destinationWorkerNodeKey, task)
         } else {
           this.enqueueTask(destinationWorkerNodeKey, task)
         }
-        if (destinationWorkerNode?.usage != null) {
-          ++destinationWorkerNode.usage.tasks.stolen
-        }
-        if (
-          this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey) &&
-          destinationWorkerNode.getTaskFunctionWorkerUsage(
-            task.name as string
-          ) != null
-        ) {
-          const taskFunctionWorkerUsage =
-            destinationWorkerNode.getTaskFunctionWorkerUsage(
-              task.name as string
-            ) as WorkerUsage
-          ++taskFunctionWorkerUsage.tasks.stolen
-        }
+        this.updateTaskStolenStatisticsWorkerUsage(
+          destinationWorkerNodeKey,
+          task.name as string
+        )
         break
       }
     }
@@ -1302,23 +1307,15 @@ export abstract class AbstractPool<
           ...(sourceWorkerNode.popTask() as Task<Data>),
           workerId: workerNode.info.id as number
         }
-        if (this.tasksQueueSize(workerNodeKey) === 0) {
+        if (this.shallExecuteTask(workerNodeKey)) {
           this.executeTask(workerNodeKey, task)
         } else {
           this.enqueueTask(workerNodeKey, task)
         }
-        if (workerNode?.usage != null) {
-          ++workerNode.usage.tasks.stolen
-        }
-        if (
-          this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
-          workerNode.getTaskFunctionWorkerUsage(task.name as string) != null
-        ) {
-          const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
-            task.name as string
-          ) as WorkerUsage
-          ++taskFunctionWorkerUsage.tasks.stolen
-        }
+        this.updateTaskStolenStatisticsWorkerUsage(
+          workerNodeKey,
+          task.name as string
+        )
       }
     }
   }
@@ -1430,7 +1427,6 @@ export abstract class AbstractPool<
   private addWorkerNode (worker: Worker): number {
     const workerNode = new WorkerNode<Worker, Data>(
       worker,
-      this.worker,
       this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
     )
     // Flag the worker node as ready at pool startup.