## [Unreleased]
+### Added
+
+- Add array of transferable objects to the `execute()` method arguments.
+
## [2.6.23] - 2023-08-11
### Fixed
- [Pool](#pool)
- [`pool = new FixedThreadPool/FixedClusterPool(numberOfThreads/numberOfWorkers, filePath, opts)`](#pool--new-fixedthreadpoolfixedclusterpoolnumberofthreadsnumberofworkers-filepath-opts)
- [`pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`](#pool--new-dynamicthreadpooldynamicclusterpoolmin-max-filepath-opts)
- - [`pool.execute(data, name)`](#poolexecutedata-name)
+ - [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist)
- [`pool.destroy()`](#pooldestroy)
- [`PoolOptions`](#pooloptions)
- [`ThreadPoolOptions extends PoolOptions`](#threadpooloptions-extends-pooloptions)
`filePath` (mandatory) Path to a file with a worker implementation
`opts` (optional) An object with the pool options properties described below
-### `pool.execute(data, name)`
+### `pool.execute(data, name, transferList)`
`data` (optional) An object that you want to pass to your worker implementation
`name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'`
+`transferList` (optional) An array of transferable objects that you want to transfer to your [worker_threads](https://nodejs.org/api/worker_threads.html) worker implementation
This method is available on both pool implementations and returns a promise with the task function execution response.
import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
import { existsSync } from 'node:fs'
+import { type TransferListItem } from 'node:worker_threads'
import type {
MessageValue,
PromiseResponseWrapper,
PoolTypes,
type TasksQueueOptions
} from './pool'
-import type {
- IWorker,
- IWorkerNode,
- WorkerInfo,
- WorkerType,
- WorkerUsage
+import {
+ type IWorker,
+ type IWorkerNode,
+ type WorkerInfo,
+ type WorkerType,
+ WorkerTypes,
+ type WorkerUsage
} from './worker'
import {
type MeasurementStatisticsRequirements,
}
/** @inheritDoc */
- public async execute (data?: Data, name?: string): Promise<Response> {
+ public async execute (
+ data?: Data,
+ name?: string,
+ transferList?: TransferListItem[]
+ ): Promise<Response> {
return await new Promise<Response>((resolve, reject) => {
+ if (name != null && typeof name !== 'string') {
+ reject(new TypeError('name argument must be a string'))
+ }
+ if (transferList != null && !Array.isArray(transferList)) {
+ reject(new TypeError('transferList argument must be an array'))
+ }
const timestamp = performance.now()
const workerNodeKey = this.chooseWorkerNode()
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
+ transferList,
timestamp,
workerId: this.getWorkerInfo(workerNodeKey).id as number,
taskId: randomUUID()
*
* @param workerNodeKey - The worker node key.
* @param message - The message.
+ * @param transferList - The optional array of transferable objects.
*/
protected abstract sendToWorker (
workerNodeKey: number,
- message: MessageValue<Data>
+ message: MessageValue<Data>,
+ transferList?: TransferListItem[]
): void
/**
*/
private executeTask (workerNodeKey: number, task: Task<Data>): void {
this.beforeTaskExecutionHook(workerNodeKey, task)
- this.sendToWorker(workerNodeKey, task)
+ this.sendToWorker(
+ workerNodeKey,
+ task,
+ this.worker === WorkerTypes.thread && task.transferList != null
+ ? task.transferList
+ : undefined
+ )
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
import { EventEmitter } from 'node:events'
+import { type TransferListItem } from 'node:worker_threads'
import type {
ErrorHandler,
ExitHandler,
/**
* Executes the specified function in the worker constructor with the task data input parameter.
*
- * @param data - The task input data for the specified task function. This can only be structured-cloneable data.
- * @param name - The name of the task function to execute. If not specified, the default task function will be executed.
+ * @param data - The optional task input data for the specified task function. This can only be structured-cloneable data.
+ * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed.
+ * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the pool's worker_threads worker and they should not be used in the main thread afterwards.
* @returns Promise that will be fulfilled when the task is completed.
*/
- readonly execute: (data?: Data, name?: string) => Promise<Response>
+ readonly execute: (
+ data?: Data,
+ name?: string,
+ transferList?: TransferListItem[]
+ ) => Promise<Response>
/**
* Terminates all workers in this pool.
*/
type MessageChannel,
type MessagePort,
SHARE_ENV,
+ type TransferListItem,
Worker,
type WorkerOptions,
isMainThread
/** @inheritDoc */
protected sendToWorker (
workerNodeKey: number,
- message: MessageValue<Data>
+ message: MessageValue<Data>,
+ transferList?: TransferListItem[]
): void {
(
this.getWorkerInfo(workerNodeKey).messageChannel as MessageChannel
- ).port1.postMessage(message)
+ ).port1.postMessage(message, transferList)
}
/** @inheritDoc */
import type { EventLoopUtilization } from 'node:perf_hooks'
-import type { MessagePort } from 'node:worker_threads'
+import type { MessagePort, TransferListItem } from 'node:worker_threads'
import type { KillBehavior } from './worker/worker-options'
/**
* Task input data that will be passed to the worker.
*/
readonly data?: Data
+ /**
+ * Array of transferable objects.
+ */
+ readonly transferList?: TransferListItem[]
/**
* Timestamp.
*/