chore: v2.6.24
[poolifier.git] / src / pools / abstract-pool.ts
index aec731a5c189903b46230ceceeb831768fa62425..b48608372574bc5c5af41332e9d85601a8224c95 100644 (file)
@@ -1,6 +1,7 @@
 import { randomUUID } from 'node:crypto'
 import { performance } from 'node:perf_hooks'
 import { existsSync } from 'node:fs'
+import { type TransferListItem } from 'node:worker_threads'
 import type {
   MessageValue,
   PromiseResponseWrapper,
@@ -615,28 +616,50 @@ export abstract class AbstractPool<
   protected abstract get busy (): boolean
 
   /**
-   * Whether worker nodes are executing at least one task.
+   * Whether worker nodes are executing concurrently their tasks quota or not.
    *
    * @returns Worker nodes busyness boolean status.
    */
   protected internalBusy (): boolean {
-    return (
-      this.workerNodes.findIndex(
-        workerNode =>
-          workerNode.info.ready && workerNode.usage.tasks.executing === 0
-      ) === -1
-    )
+    if (this.opts.enableTasksQueue === true) {
+      return (
+        this.workerNodes.findIndex(
+          workerNode =>
+            workerNode.info.ready &&
+            workerNode.usage.tasks.executing <
+              (this.opts.tasksQueueOptions?.concurrency as number)
+        ) === -1
+      )
+    } else {
+      return (
+        this.workerNodes.findIndex(
+          workerNode =>
+            workerNode.info.ready && workerNode.usage.tasks.executing === 0
+        ) === -1
+      )
+    }
   }
 
   /** @inheritDoc */
-  public async execute (data?: Data, name?: string): Promise<Response> {
+  public async execute (
+    data?: Data,
+    name?: string,
+    transferList?: TransferListItem[]
+  ): Promise<Response> {
     return await new Promise<Response>((resolve, reject) => {
+      if (name != null && typeof name !== 'string') {
+        reject(new TypeError('name argument must be a string'))
+      }
+      if (transferList != null && !Array.isArray(transferList)) {
+        reject(new TypeError('transferList argument must be an array'))
+      }
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode()
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
+        transferList,
         timestamp,
         workerId: this.getWorkerInfo(workerNodeKey).id as number,
         taskId: randomUUID()
@@ -839,10 +862,12 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    * @param message - The message.
+   * @param transferList - The optional array of transferable objects.
    */
   protected abstract sendToWorker (
     workerNodeKey: number,
-    message: MessageValue<Data>
+    message: MessageValue<Data>,
+    transferList?: TransferListItem[]
   ): void
 
   /**
@@ -1145,7 +1170,7 @@ export abstract class AbstractPool<
    */
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
     this.beforeTaskExecutionHook(workerNodeKey, task)
-    this.sendToWorker(workerNodeKey, task)
+    this.sendToWorker(workerNodeKey, task, task.transferList)
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {