feat: add transferable objects support to worker_threads pool
[poolifier.git] / src / pools / abstract-pool.ts
index 30b4d01f9846cd2c2327d5a52e4a6be594e8d62c..eb47bc2f24f67b983b4829a763705c6fb8b45828 100644 (file)
@@ -1,6 +1,7 @@
 import { randomUUID } from 'node:crypto'
 import { performance } from 'node:perf_hooks'
 import { existsSync } from 'node:fs'
+import { type TransferListItem } from 'node:worker_threads'
 import type {
   MessageValue,
   PromiseResponseWrapper,
@@ -27,12 +28,13 @@ import {
   PoolTypes,
   type TasksQueueOptions
 } from './pool'
-import type {
-  IWorker,
-  IWorkerNode,
-  WorkerInfo,
-  WorkerType,
-  WorkerUsage
+import {
+  type IWorker,
+  type IWorkerNode,
+  type WorkerInfo,
+  type WorkerType,
+  WorkerTypes,
+  type WorkerUsage
 } from './worker'
 import {
   type MeasurementStatisticsRequirements,
@@ -186,7 +188,7 @@ export abstract class AbstractPool<
         )
       } else if (max === 0) {
         throw new RangeError(
-          'Cannot instantiate a dynamic pool with a pool size equal to zero'
+          'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
         )
       } else if (min === max) {
         throw new RangeError(
@@ -615,33 +617,55 @@ export abstract class AbstractPool<
   protected abstract get busy (): boolean
 
   /**
-   * Whether worker nodes are executing at least one task.
+   * Whether worker nodes are executing concurrently their tasks quota or not.
    *
    * @returns Worker nodes busyness boolean status.
    */
   protected internalBusy (): boolean {
-    return (
-      this.workerNodes.findIndex(
-        workerNode =>
-          workerNode.info.ready && workerNode.usage.tasks.executing === 0
-      ) === -1
-    )
+    if (this.opts.enableTasksQueue === true) {
+      return (
+        this.workerNodes.findIndex(
+          workerNode =>
+            workerNode.info.ready &&
+            workerNode.usage.tasks.executing <
+              (this.opts.tasksQueueOptions?.concurrency as number)
+        ) === -1
+      )
+    } else {
+      return (
+        this.workerNodes.findIndex(
+          workerNode =>
+            workerNode.info.ready && workerNode.usage.tasks.executing === 0
+        ) === -1
+      )
+    }
   }
 
   /** @inheritDoc */
-  public async execute (data?: Data, name?: string): Promise<Response> {
+  public async execute (
+    data?: Data,
+    name?: string,
+    transferList?: TransferListItem[]
+  ): Promise<Response> {
     return await new Promise<Response>((resolve, reject) => {
+      if (name != null && typeof name !== 'string') {
+        reject(new TypeError('name argument must be a string'))
+      }
+      if (transferList != null && !Array.isArray(transferList)) {
+        reject(new TypeError('transferList argument must be an array'))
+      }
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode()
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
+        transferList,
         timestamp,
         workerId: this.getWorkerInfo(workerNodeKey).id as number,
-        id: randomUUID()
+        taskId: randomUUID()
       }
-      this.promiseResponseMap.set(task.id as string, {
+      this.promiseResponseMap.set(task.taskId as string, {
         resolve,
         reject,
         workerNodeKey
@@ -839,10 +863,12 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    * @param message - The message.
+   * @param transferList - The optional array of transferable objects.
    */
   protected abstract sendToWorker (
     workerNodeKey: number,
-    message: MessageValue<Data>
+    message: MessageValue<Data>,
+    transferList?: TransferListItem[]
   ): void
 
   /**
@@ -1037,7 +1063,7 @@ export abstract class AbstractPool<
       if (message.ready != null) {
         // Worker ready response received from worker
         this.handleWorkerReadyResponse(message)
-      } else if (message.id != null) {
+      } else if (message.taskId != null) {
         // Task execution response received from worker
         this.handleTaskExecutionResponse(message)
       }
@@ -1054,7 +1080,9 @@ export abstract class AbstractPool<
   }
 
   private handleTaskExecutionResponse (message: MessageValue<Response>): void {
-    const promiseResponse = this.promiseResponseMap.get(message.id as string)
+    const promiseResponse = this.promiseResponseMap.get(
+      message.taskId as string
+    )
     if (promiseResponse != null) {
       if (message.taskError != null) {
         this.emitter?.emit(PoolEvents.taskError, message.taskError)
@@ -1064,7 +1092,7 @@ export abstract class AbstractPool<
       }
       const workerNodeKey = promiseResponse.workerNodeKey
       this.afterTaskExecutionHook(workerNodeKey, message)
-      this.promiseResponseMap.delete(message.id as string)
+      this.promiseResponseMap.delete(message.taskId as string)
       if (
         this.opts.enableTasksQueue === true &&
         this.tasksQueueSize(workerNodeKey) > 0 &&
@@ -1143,7 +1171,13 @@ export abstract class AbstractPool<
    */
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
     this.beforeTaskExecutionHook(workerNodeKey, task)
-    this.sendToWorker(workerNodeKey, task)
+    this.sendToWorker(
+      workerNodeKey,
+      task,
+      this.worker === WorkerTypes.thread && task.transferList != null
+        ? task.transferList
+        : undefined
+    )
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {