feat: add transferable objects support to worker_threads pool
[poolifier.git] / src / pools / abstract-pool.ts
index 91cb5facd01f736d43703d4d0530ec7c01a3bf41..eb47bc2f24f67b983b4829a763705c6fb8b45828 100644 (file)
@@ -1,6 +1,7 @@
 import { randomUUID } from 'node:crypto'
 import { performance } from 'node:perf_hooks'
 import { existsSync } from 'node:fs'
+import { type TransferListItem } from 'node:worker_threads'
 import type {
   MessageValue,
   PromiseResponseWrapper,
@@ -27,12 +28,13 @@ import {
   PoolTypes,
   type TasksQueueOptions
 } from './pool'
-import type {
-  IWorker,
-  IWorkerNode,
-  WorkerInfo,
-  WorkerType,
-  WorkerUsage
+import {
+  type IWorker,
+  type IWorkerNode,
+  type WorkerInfo,
+  type WorkerType,
+  WorkerTypes,
+  type WorkerUsage
 } from './worker'
 import {
   type MeasurementStatisticsRequirements,
@@ -640,14 +642,25 @@ export abstract class AbstractPool<
   }
 
   /** @inheritDoc */
-  public async execute (data?: Data, name?: string): Promise<Response> {
+  public async execute (
+    data?: Data,
+    name?: string,
+    transferList?: TransferListItem[]
+  ): Promise<Response> {
     return await new Promise<Response>((resolve, reject) => {
+      if (name != null && typeof name !== 'string') {
+        reject(new TypeError('name argument must be a string'))
+      }
+      if (transferList != null && !Array.isArray(transferList)) {
+        reject(new TypeError('transferList argument must be an array'))
+      }
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode()
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
+        transferList,
         timestamp,
         workerId: this.getWorkerInfo(workerNodeKey).id as number,
         taskId: randomUUID()
@@ -850,10 +863,12 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    * @param message - The message.
+   * @param transferList - The optional array of transferable objects.
    */
   protected abstract sendToWorker (
     workerNodeKey: number,
-    message: MessageValue<Data>
+    message: MessageValue<Data>,
+    transferList?: TransferListItem[]
   ): void
 
   /**
@@ -1156,7 +1171,13 @@ export abstract class AbstractPool<
    */
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
     this.beforeTaskExecutionHook(workerNodeKey, task)
-    this.sendToWorker(workerNodeKey, task)
+    this.sendToWorker(
+      workerNodeKey,
+      task,
+      this.worker === WorkerTypes.thread && task.transferList != null
+        ? task.transferList
+        : undefined
+    )
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {