feat: add mapExecute() helper for bulk tasks execution
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 11 Jun 2024 14:49:55 +0000 (16:49 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 11 Jun 2024 14:49:55 +0000 (16:49 +0200)
reference #2364

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
docs/api.md
examples/typescript/http-server-pool/fastify-worker_threads/src/main.ts
src/pools/abstract-pool.ts
src/pools/pool.ts

index 28e6aad2c3b56bf529890aad889c9029e4b69777..513f62c53a450dc6279e7807fcbd855fc63b525a 100644 (file)
 
 This method is available on both pool implementations and returns a promise with the task function execution response.
 
+### `pool.mapExecute(data, name, transferList)`
+
+`data` Iterable objects that you want to pass to your worker task function implementation.  
+`name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'`  
+`transferList` (optional) An array of transferable objects that you want to transfer to your [`ThreadWorker`](#class-yourworker-extends-threadworkerclusterworker) worker implementation.
+
+This method is available on both pool implementations and returns a promise with the task function execution responses array.
+
 ### `pool.start()`
 
 This method is available on both pool implementations and will start the minimum number of workers.
index b9aba792c05a8e2459eecfac5a93457a4185da9a..d30e913cb8b7ca47cb6f8993cefc28ee6a177803 100644 (file)
@@ -17,7 +17,6 @@ const fastify = Fastify({
 
 const workerFile = join(
   dirname(fileURLToPath(import.meta.url)),
-  // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
   `worker${extname(fileURLToPath(import.meta.url))}`
 )
 
index 3b676faa7a4f28309743d38328ab0e35ab80818a..8dedf2052baa90603be809d488639f0e9cb256fd 100644 (file)
@@ -1225,6 +1225,18 @@ export abstract class AbstractPool<
     })
   }
 
+
+  /** @inheritDoc */
+  public mapExecute (
+    data: Iterable<Data>,
+    name?: string,
+    transferList?: readonly TransferListItem[]
+  ): Promise<Response[]> {
+    return Promise.all(
+      [...data].map(data => this.execute(data, name, transferList))
+    )
+  }
+
   /**
    * Starts the minimum number of workers.
    * @param initWorkerNodeUsage - Whether to initialize the worker node usage or not. @defaultValue false
index c2d5498753c61a385fa43bb2a68163b305ad03a2..0df7549ace8ba8589e69fae51b3e9fd4d237736f 100644 (file)
@@ -279,13 +279,25 @@ export interface IPool<
    * @param data - The optional task input data for the specified task function. This can only be structured-cloneable data.
    * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed.
    * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards.
-   * @returns Promise that will be fulfilled when the task is completed.
+   * @returns Promise with a task function response that will be fulfilled when the task is completed.
    */
   readonly execute: (
     data?: Data,
     name?: string,
     transferList?: readonly TransferListItem[]
   ) => Promise<Response>
+  /**
+   * Executes the specified function in the worker constructor with the tasks data iterable input parameter.
+   * @param data - The tasks iterable input data for the specified task function. This can only be an iterable of structured-cloneable data.
+   * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed.
+   * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards.
+   * @returns Promise with an array of task function responses that will be fulfilled when the tasks are completed.
+   */
+  readonly mapExecute: (
+    data: Iterable<Data>,
+    name?: string,
+    transferList?: readonly TransferListItem[]
+  ) => Promise<Response[]>
   /**
    * Starts the minimum number of workers in this pool.
    */