From 7d91a8cd5c644cf409ec7a174d72ec84fcc3e4f7 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 11 Aug 2023 23:48:49 +0200 Subject: [PATCH] feat: add transferable objects support to worker_threads pool MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 ++++ docs/api.md | 5 +++-- src/pools/abstract-pool.ts | 39 +++++++++++++++++++++++++++++--------- src/pools/pool.ts | 12 +++++++++--- src/pools/thread/fixed.ts | 6 ++++-- src/utility-types.ts | 6 +++++- 6 files changed, 55 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 34007bc5..95fbef8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add array of transferable objects to the `execute()` method arguments. + ## [2.6.23] - 2023-08-11 ### Fixed diff --git a/docs/api.md b/docs/api.md index 6f66d038..9662c298 100644 --- a/docs/api.md +++ b/docs/api.md @@ -5,7 +5,7 @@ - [Pool](#pool) - [`pool = new FixedThreadPool/FixedClusterPool(numberOfThreads/numberOfWorkers, filePath, opts)`](#pool--new-fixedthreadpoolfixedclusterpoolnumberofthreadsnumberofworkers-filepath-opts) - [`pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`](#pool--new-dynamicthreadpooldynamicclusterpoolmin-max-filepath-opts) - - [`pool.execute(data, name)`](#poolexecutedata-name) + - [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist) - [`pool.destroy()`](#pooldestroy) - [`PoolOptions`](#pooloptions) - [`ThreadPoolOptions extends PoolOptions`](#threadpooloptions-extends-pooloptions) @@ -33,10 +33,11 @@ `filePath` (mandatory) Path to a file with a worker implementation `opts` (optional) An object with the pool options properties described below -### `pool.execute(data, name)` +### `pool.execute(data, name, transferList)` `data` (optional) An object that you want to pass to your worker implementation `name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'` +`transferList` (optional) An array of transferable objects that you want to transfer to your [worker_threads](https://nodejs.org/api/worker_threads.html) worker implementation This method is available on both pool implementations and returns a promise with the task function execution response. diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 91cb5fac..eb47bc2f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,6 +1,7 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' import { existsSync } from 'node:fs' +import { type TransferListItem } from 'node:worker_threads' import type { MessageValue, PromiseResponseWrapper, @@ -27,12 +28,13 @@ import { PoolTypes, type TasksQueueOptions } from './pool' -import type { - IWorker, - IWorkerNode, - WorkerInfo, - WorkerType, - WorkerUsage +import { + type IWorker, + type IWorkerNode, + type WorkerInfo, + type WorkerType, + WorkerTypes, + type WorkerUsage } from './worker' import { type MeasurementStatisticsRequirements, @@ -640,14 +642,25 @@ export abstract class AbstractPool< } /** @inheritDoc */ - public async execute (data?: Data, name?: string): Promise { + public async execute ( + data?: Data, + name?: string, + transferList?: TransferListItem[] + ): Promise { return await new Promise((resolve, reject) => { + if (name != null && typeof name !== 'string') { + reject(new TypeError('name argument must be a string')) + } + if (transferList != null && !Array.isArray(transferList)) { + reject(new TypeError('transferList argument must be an array')) + } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), + transferList, timestamp, workerId: this.getWorkerInfo(workerNodeKey).id as number, taskId: randomUUID() @@ -850,10 +863,12 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. * @param message - The message. + * @param transferList - The optional array of transferable objects. */ protected abstract sendToWorker ( workerNodeKey: number, - message: MessageValue + message: MessageValue, + transferList?: TransferListItem[] ): void /** @@ -1156,7 +1171,13 @@ export abstract class AbstractPool< */ private executeTask (workerNodeKey: number, task: Task): void { this.beforeTaskExecutionHook(workerNodeKey, task) - this.sendToWorker(workerNodeKey, task) + this.sendToWorker( + workerNodeKey, + task, + this.worker === WorkerTypes.thread && task.transferList != null + ? task.transferList + : undefined + ) } private enqueueTask (workerNodeKey: number, task: Task): number { diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 45915e55..1a05f633 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -1,4 +1,5 @@ import { EventEmitter } from 'node:events' +import { type TransferListItem } from 'node:worker_threads' import type { ErrorHandler, ExitHandler, @@ -192,11 +193,16 @@ export interface IPool< /** * Executes the specified function in the worker constructor with the task data input parameter. * - * @param data - The task input data for the specified task function. This can only be structured-cloneable data. - * @param name - The name of the task function to execute. If not specified, the default task function will be executed. + * @param data - The optional task input data for the specified task function. This can only be structured-cloneable data. + * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed. + * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the pool's worker_threads worker and they should not be used in the main thread afterwards. * @returns Promise that will be fulfilled when the task is completed. */ - readonly execute: (data?: Data, name?: string) => Promise + readonly execute: ( + data?: Data, + name?: string, + transferList?: TransferListItem[] + ) => Promise /** * Terminates all workers in this pool. */ diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 7199baf8..2e708d97 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -2,6 +2,7 @@ import { type MessageChannel, type MessagePort, SHARE_ENV, + type TransferListItem, Worker, type WorkerOptions, isMainThread @@ -75,11 +76,12 @@ export class FixedThreadPool< /** @inheritDoc */ protected sendToWorker ( workerNodeKey: number, - message: MessageValue + message: MessageValue, + transferList?: TransferListItem[] ): void { ( this.getWorkerInfo(workerNodeKey).messageChannel as MessageChannel - ).port1.postMessage(message) + ).port1.postMessage(message, transferList) } /** @inheritDoc */ diff --git a/src/utility-types.ts b/src/utility-types.ts index 123b5d6e..d6869557 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -1,5 +1,5 @@ import type { EventLoopUtilization } from 'node:perf_hooks' -import type { MessagePort } from 'node:worker_threads' +import type { MessagePort, TransferListItem } from 'node:worker_threads' import type { KillBehavior } from './worker/worker-options' /** @@ -75,6 +75,10 @@ export interface Task { * Task input data that will be passed to the worker. */ readonly data?: Data + /** + * Array of transferable objects. + */ + readonly transferList?: TransferListItem[] /** * Timestamp. */ -- 2.34.1