From d07983740c0d4bec954b7ec29f7f7f11e6549658 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Tue, 11 Jun 2024 16:49:55 +0200 Subject: [PATCH] feat: add mapExecute() helper for bulk tasks execution MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit reference #2364 Signed-off-by: Jérôme Benoit --- docs/api.md | 8 ++++++++ .../fastify-worker_threads/src/main.ts | 1 - src/pools/abstract-pool.ts | 12 ++++++++++++ src/pools/pool.ts | 14 +++++++++++++- 4 files changed, 33 insertions(+), 2 deletions(-) diff --git a/docs/api.md b/docs/api.md index 28e6aad2..513f62c5 100644 --- a/docs/api.md +++ b/docs/api.md @@ -45,6 +45,14 @@ This method is available on both pool implementations and returns a promise with the task function execution response. +### `pool.mapExecute(data, name, transferList)` + +`data` Iterable objects that you want to pass to your worker task function implementation. +`name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'` +`transferList` (optional) An array of transferable objects that you want to transfer to your [`ThreadWorker`](#class-yourworker-extends-threadworkerclusterworker) worker implementation. + +This method is available on both pool implementations and returns a promise with the task function execution responses array. + ### `pool.start()` This method is available on both pool implementations and will start the minimum number of workers. diff --git a/examples/typescript/http-server-pool/fastify-worker_threads/src/main.ts b/examples/typescript/http-server-pool/fastify-worker_threads/src/main.ts index b9aba792..d30e913c 100644 --- a/examples/typescript/http-server-pool/fastify-worker_threads/src/main.ts +++ b/examples/typescript/http-server-pool/fastify-worker_threads/src/main.ts @@ -17,7 +17,6 @@ const fastify = Fastify({ const workerFile = join( dirname(fileURLToPath(import.meta.url)), - // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `worker${extname(fileURLToPath(import.meta.url))}` ) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 3b676faa..8dedf205 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1225,6 +1225,18 @@ export abstract class AbstractPool< }) } + + /** @inheritDoc */ + public mapExecute ( + data: Iterable, + name?: string, + transferList?: readonly TransferListItem[] + ): Promise { + return Promise.all( + [...data].map(data => this.execute(data, name, transferList)) + ) + } + /** * Starts the minimum number of workers. * @param initWorkerNodeUsage - Whether to initialize the worker node usage or not. @defaultValue false diff --git a/src/pools/pool.ts b/src/pools/pool.ts index c2d54987..0df7549a 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -279,13 +279,25 @@ export interface IPool< * @param data - The optional task input data for the specified task function. This can only be structured-cloneable data. * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed. * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards. - * @returns Promise that will be fulfilled when the task is completed. + * @returns Promise with a task function response that will be fulfilled when the task is completed. */ readonly execute: ( data?: Data, name?: string, transferList?: readonly TransferListItem[] ) => Promise + /** + * Executes the specified function in the worker constructor with the tasks data iterable input parameter. + * @param data - The tasks iterable input data for the specified task function. This can only be an iterable of structured-cloneable data. + * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed. + * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards. + * @returns Promise with an array of task function responses that will be fulfilled when the tasks are completed. + */ + readonly mapExecute: ( + data: Iterable, + name?: string, + transferList?: readonly TransferListItem[] + ) => Promise /** * Starts the minimum number of workers in this pool. */ -- 2.34.1