feat: add transferable objects support to worker_threads pool
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 11 Aug 2023 21:48:49 +0000 (23:48 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 11 Aug 2023 21:48:49 +0000 (23:48 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
docs/api.md
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/thread/fixed.ts
src/utility-types.ts

index 34007bc5257adb2d1704c44bca7b00ec478c1518..95fbef8b5516e1c536fd4fd2e4db5b4fc87c9d63 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- Add array of transferable objects to the `execute()` method arguments.
+
 ## [2.6.23] - 2023-08-11
 
 ### Fixed
index 6f66d038c1db08c7936f8edb1630b41d53a8460d..9662c29830f0f7e8c799029d2d4d2ada207d9f8f 100644 (file)
@@ -5,7 +5,7 @@
 - [Pool](#pool)
   - [`pool = new FixedThreadPool/FixedClusterPool(numberOfThreads/numberOfWorkers, filePath, opts)`](#pool--new-fixedthreadpoolfixedclusterpoolnumberofthreadsnumberofworkers-filepath-opts)
   - [`pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`](#pool--new-dynamicthreadpooldynamicclusterpoolmin-max-filepath-opts)
-  - [`pool.execute(data, name)`](#poolexecutedata-name)
+  - [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist)
   - [`pool.destroy()`](#pooldestroy)
   - [`PoolOptions`](#pooloptions)
     - [`ThreadPoolOptions extends PoolOptions`](#threadpooloptions-extends-pooloptions)
 `filePath` (mandatory) Path to a file with a worker implementation  
 `opts` (optional) An object with the pool options properties described below
 
-### `pool.execute(data, name)`
+### `pool.execute(data, name, transferList)`
 
 `data` (optional) An object that you want to pass to your worker implementation  
 `name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'`
+`transferList` (optional) An array of transferable objects that you want to transfer to your [worker_threads](https://nodejs.org/api/worker_threads.html) worker implementation
 
 This method is available on both pool implementations and returns a promise with the task function execution response.
 
index 91cb5facd01f736d43703d4d0530ec7c01a3bf41..eb47bc2f24f67b983b4829a763705c6fb8b45828 100644 (file)
@@ -1,6 +1,7 @@
 import { randomUUID } from 'node:crypto'
 import { performance } from 'node:perf_hooks'
 import { existsSync } from 'node:fs'
+import { type TransferListItem } from 'node:worker_threads'
 import type {
   MessageValue,
   PromiseResponseWrapper,
@@ -27,12 +28,13 @@ import {
   PoolTypes,
   type TasksQueueOptions
 } from './pool'
-import type {
-  IWorker,
-  IWorkerNode,
-  WorkerInfo,
-  WorkerType,
-  WorkerUsage
+import {
+  type IWorker,
+  type IWorkerNode,
+  type WorkerInfo,
+  type WorkerType,
+  WorkerTypes,
+  type WorkerUsage
 } from './worker'
 import {
   type MeasurementStatisticsRequirements,
@@ -640,14 +642,25 @@ export abstract class AbstractPool<
   }
 
   /** @inheritDoc */
-  public async execute (data?: Data, name?: string): Promise<Response> {
+  public async execute (
+    data?: Data,
+    name?: string,
+    transferList?: TransferListItem[]
+  ): Promise<Response> {
     return await new Promise<Response>((resolve, reject) => {
+      if (name != null && typeof name !== 'string') {
+        reject(new TypeError('name argument must be a string'))
+      }
+      if (transferList != null && !Array.isArray(transferList)) {
+        reject(new TypeError('transferList argument must be an array'))
+      }
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode()
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
+        transferList,
         timestamp,
         workerId: this.getWorkerInfo(workerNodeKey).id as number,
         taskId: randomUUID()
@@ -850,10 +863,12 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    * @param message - The message.
+   * @param transferList - The optional array of transferable objects.
    */
   protected abstract sendToWorker (
     workerNodeKey: number,
-    message: MessageValue<Data>
+    message: MessageValue<Data>,
+    transferList?: TransferListItem[]
   ): void
 
   /**
@@ -1156,7 +1171,13 @@ export abstract class AbstractPool<
    */
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
     this.beforeTaskExecutionHook(workerNodeKey, task)
-    this.sendToWorker(workerNodeKey, task)
+    this.sendToWorker(
+      workerNodeKey,
+      task,
+      this.worker === WorkerTypes.thread && task.transferList != null
+        ? task.transferList
+        : undefined
+    )
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
index 45915e5581085b6cced9f5d51635205ee9da4556..1a05f633250adb8e88c72a63cda565c3265205bc 100644 (file)
@@ -1,4 +1,5 @@
 import { EventEmitter } from 'node:events'
+import { type TransferListItem } from 'node:worker_threads'
 import type {
   ErrorHandler,
   ExitHandler,
@@ -192,11 +193,16 @@ export interface IPool<
   /**
    * Executes the specified function in the worker constructor with the task data input parameter.
    *
-   * @param data - The task input data for the specified task function. This can only be structured-cloneable data.
-   * @param name - The name of the task function to execute. If not specified, the default task function will be executed.
+   * @param data - The optional task input data for the specified task function. This can only be structured-cloneable data.
+   * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed.
+   * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the pool's worker_threads worker and they should not be used in the main thread afterwards.
    * @returns Promise that will be fulfilled when the task is completed.
    */
-  readonly execute: (data?: Data, name?: string) => Promise<Response>
+  readonly execute: (
+    data?: Data,
+    name?: string,
+    transferList?: TransferListItem[]
+  ) => Promise<Response>
   /**
    * Terminates all workers in this pool.
    */
index 7199baf8fdf5ad52bc43bcfdfe107de84719589a..2e708d973891be533e0d66f632ee5e4d983fed7a 100644 (file)
@@ -2,6 +2,7 @@ import {
   type MessageChannel,
   type MessagePort,
   SHARE_ENV,
+  type TransferListItem,
   Worker,
   type WorkerOptions,
   isMainThread
@@ -75,11 +76,12 @@ export class FixedThreadPool<
   /** @inheritDoc */
   protected sendToWorker (
     workerNodeKey: number,
-    message: MessageValue<Data>
+    message: MessageValue<Data>,
+    transferList?: TransferListItem[]
   ): void {
     (
       this.getWorkerInfo(workerNodeKey).messageChannel as MessageChannel
-    ).port1.postMessage(message)
+    ).port1.postMessage(message, transferList)
   }
 
   /** @inheritDoc */
index 123b5d6ea4fa397982c6195de0d9ccd95ba0c1df..d6869557d5af04ec06444e320d1fe6cac41cadc0 100644 (file)
@@ -1,5 +1,5 @@
 import type { EventLoopUtilization } from 'node:perf_hooks'
-import type { MessagePort } from 'node:worker_threads'
+import type { MessagePort, TransferListItem } from 'node:worker_threads'
 import type { KillBehavior } from './worker/worker-options'
 
 /**
@@ -75,6 +75,10 @@ export interface Task<Data = unknown> {
    * Task input data that will be passed to the worker.
    */
   readonly data?: Data
+  /**
+   * Array of transferable objects.
+   */
+  readonly transferList?: TransferListItem[]
   /**
    * Timestamp.
    */