refactor: factor out task execution semantic with tasks queue
[poolifier.git] / src / pools / abstract-pool.ts
index 6081902720554c2bcdb809d37f04d5f93ed89da7..be8ec1a0125644bbcb00b3a5ec48e3056e16ed11 100644 (file)
@@ -5,8 +5,7 @@ import { type TransferListItem } from 'node:worker_threads'
 import type {
   MessageValue,
   PromiseResponseWrapper,
-  Task,
-  Writable
+  Task
 } from '../utility-types'
 import {
   DEFAULT_TASK_NAME,
@@ -265,10 +264,10 @@ export abstract class AbstractPool<
     }
     if (
       workerChoiceStrategyOptions.choiceRetries != null &&
-      workerChoiceStrategyOptions.choiceRetries <= 0
+      workerChoiceStrategyOptions.choiceRetries < 0
     ) {
       throw new RangeError(
-        `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
+        `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater or equal than zero`
       )
     }
     if (
@@ -292,7 +291,7 @@ export abstract class AbstractPool<
   }
 
   private checkValidTasksQueueOptions (
-    tasksQueueOptions: Writable<TasksQueueOptions>
+    tasksQueueOptions: TasksQueueOptions
   ): void {
     if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
       throw new TypeError('Invalid tasks queue options: must be a plain object')
@@ -313,28 +312,22 @@ export abstract class AbstractPool<
         `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
       )
     }
-    if (
-      tasksQueueOptions?.queueMaxSize != null &&
-      tasksQueueOptions?.size != null
-    ) {
+    if (tasksQueueOptions?.queueMaxSize != null) {
       throw new Error(
-        'Invalid tasks queue options: cannot specify both queueMaxSize and size'
+        'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
       )
     }
-    if (tasksQueueOptions?.queueMaxSize != null) {
-      tasksQueueOptions.size = tasksQueueOptions.queueMaxSize
-    }
     if (
       tasksQueueOptions?.size != null &&
       !Number.isSafeInteger(tasksQueueOptions.size)
     ) {
       throw new TypeError(
-        'Invalid worker node tasks queue max size: must be an integer'
+        'Invalid worker node tasks queue size: must be an integer'
       )
     }
     if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
       throw new RangeError(
-        `Invalid worker node tasks queue max size: ${tasksQueueOptions.size} is a negative integer or zero`
+        `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
       )
     }
   }
@@ -740,6 +733,14 @@ export abstract class AbstractPool<
     return []
   }
 
+  private shallExecuteTask (workerNodeKey: number): boolean {
+    return (
+      this.tasksQueueSize(workerNodeKey) === 0 &&
+      this.workerNodes[workerNodeKey].usage.tasks.executing <
+        (this.opts.tasksQueueOptions?.concurrency as number)
+    )
+  }
+
   /** @inheritDoc */
   public async execute (
     data?: Data,
@@ -749,9 +750,11 @@ export abstract class AbstractPool<
     return await new Promise<Response>((resolve, reject) => {
       if (!this.started) {
         reject(new Error('Cannot execute a task on destroyed pool'))
+        return
       }
       if (name != null && typeof name !== 'string') {
         reject(new TypeError('name argument must be a string'))
+        return
       }
       if (
         name != null &&
@@ -759,22 +762,15 @@ export abstract class AbstractPool<
         name.trim().length === 0
       ) {
         reject(new TypeError('name argument must not be an empty string'))
+        return
       }
       if (transferList != null && !Array.isArray(transferList)) {
         reject(new TypeError('transferList argument must be an array'))
+        return
       }
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode()
       const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
-      if (
-        name != null &&
-        Array.isArray(workerInfo.taskFunctions) &&
-        !workerInfo.taskFunctions.includes(name)
-      ) {
-        reject(
-          new Error(`Task function '${name}' is not registered in the pool`)
-        )
-      }
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
@@ -792,9 +788,7 @@ export abstract class AbstractPool<
       if (
         this.opts.enableTasksQueue === false ||
         (this.opts.enableTasksQueue === true &&
-          this.tasksQueueSize(workerNodeKey) === 0 &&
-          this.workerNodes[workerNodeKey].usage.tasks.executing <
-            (this.opts.tasksQueueOptions?.concurrency as number))
+          this.shallExecuteTask(workerNodeKey))
       ) {
         this.executeTask(workerNodeKey, task)
       } else {
@@ -1225,11 +1219,7 @@ export abstract class AbstractPool<
           ...(this.dequeueTask(workerNodeKey) as Task<Data>),
           workerId: destinationWorkerNode.info.id as number
         }
-        if (
-          this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
-          destinationWorkerNode.usage.tasks.executing <
-            (this.opts.tasksQueueOptions?.concurrency as number)
-        ) {
+        if (this.shallExecuteTask(destinationWorkerNodeKey)) {
           this.executeTask(destinationWorkerNodeKey, task)
         } else {
           this.enqueueTask(destinationWorkerNodeKey, task)
@@ -1260,11 +1250,7 @@ export abstract class AbstractPool<
           ...(sourceWorkerNode.popTask() as Task<Data>),
           workerId: destinationWorkerNode.info.id as number
         }
-        if (
-          this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
-          destinationWorkerNode.usage.tasks.executing <
-            (this.opts.tasksQueueOptions?.concurrency as number)
-        ) {
+        if (this.shallExecuteTask(destinationWorkerNodeKey)) {
           this.executeTask(destinationWorkerNodeKey, task)
         } else {
           this.enqueueTask(destinationWorkerNodeKey, task)
@@ -1290,6 +1276,9 @@ export abstract class AbstractPool<
   }
 
   private tasksStealingOnBackPressure (workerId: number): void {
+    if ((this.opts.tasksQueueOptions?.size as number) <= 1) {
+      return
+    }
     const sourceWorkerNode =
       this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
     const workerNodes = this.workerNodes
@@ -1310,11 +1299,7 @@ export abstract class AbstractPool<
           ...(sourceWorkerNode.popTask() as Task<Data>),
           workerId: workerNode.info.id as number
         }
-        if (
-          this.tasksQueueSize(workerNodeKey) === 0 &&
-          workerNode.usage.tasks.executing <
-            (this.opts.tasksQueueOptions?.concurrency as number)
-        ) {
+        if (this.shallExecuteTask(workerNodeKey)) {
           this.executeTask(workerNodeKey, task)
         } else {
           this.enqueueTask(workerNodeKey, task)