docs: add changelog entry
[poolifier.git] / src / pools / abstract-pool.ts
index 670d490daa18c0cd4e3e03217df69275aa35b824..11cc9cc15ef419346827e5f627150f04e73ba158 100644 (file)
@@ -55,6 +55,7 @@ import {
   checkFilePath,
   checkValidTasksQueueOptions,
   checkValidWorkerChoiceStrategy,
+  getDefaultTasksQueueOptions,
   updateEluWorkerUsage,
   updateRunTimeWorkerUsage,
   updateTaskStatisticsWorkerUsage,
@@ -519,18 +520,6 @@ export abstract class AbstractPool<
     }
   }
 
-  /**
-   * Gets the given worker its worker node key.
-   *
-   * @param worker - The worker.
-   * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
-   */
-  private getWorkerNodeKeyByWorker (worker: Worker): number {
-    return this.workerNodes.findIndex(
-      workerNode => workerNode.worker === worker
-    )
-  }
-
   /**
    * Gets the worker node key given its worker id.
    *
@@ -618,12 +607,7 @@ export abstract class AbstractPool<
     tasksQueueOptions: TasksQueueOptions
   ): TasksQueueOptions {
     return {
-      ...{
-        size: Math.pow(this.maxSize, 2),
-        concurrency: 1,
-        taskStealing: true,
-        tasksStealingOnBackPressure: true
-      },
+      ...getDefaultTasksQueueOptions(this.maxSize),
       ...tasksQueueOptions
     }
   }
@@ -1021,6 +1005,10 @@ export abstract class AbstractPool<
     workerNodeKey: number
   ): Promise<void> {
     await new Promise<void>((resolve, reject) => {
+      if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) {
+        reject(new Error(`Invalid worker node key '${workerNodeKey}'`))
+        return
+      }
       const killMessageListener = (message: MessageValue<Response>): void => {
         this.checkMessageWorkerId(message)
         if (message.kill === 'success') {
@@ -1050,7 +1038,13 @@ export abstract class AbstractPool<
     this.flagWorkerNodeAsNotReady(workerNodeKey)
     const flushedTasks = this.flushTasksQueue(workerNodeKey)
     const workerNode = this.workerNodes[workerNodeKey]
-    await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks)
+    await waitWorkerNodeEvents(
+      workerNode,
+      'taskFinished',
+      flushedTasks,
+      this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
+        getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout
+    )
     await this.sendKillMessageToWorker(workerNodeKey)
     await workerNode.terminate()
   }
@@ -1257,7 +1251,7 @@ export abstract class AbstractPool<
       if (this.started && this.opts.enableTasksQueue === true) {
         this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
       }
-      workerNode.terminate().catch(error => {
+      workerNode?.terminate().catch(error => {
         this.emitter?.emit(PoolEvents.error, error)
       })
     })
@@ -1433,6 +1427,9 @@ export abstract class AbstractPool<
   }
 
   private redistributeQueuedTasks (workerNodeKey: number): void {
+    if (workerNodeKey === -1) {
+      return
+    }
     if (this.workerNodes.length <= 1) {
       return
     }
@@ -1713,7 +1710,7 @@ export abstract class AbstractPool<
       this.afterTaskExecutionHook(workerNodeKey, message)
       this.promiseResponseMap.delete(taskId as string)
       workerNode?.emit('taskFinished', taskId)
-      if (this.opts.enableTasksQueue === true) {
+      if (this.opts.enableTasksQueue === true && !this.destroying) {
         const workerNodeTasksUsage = workerNode.usage.tasks
         if (
           this.tasksQueueSize(workerNodeKey) > 0 &&
@@ -1782,7 +1779,8 @@ export abstract class AbstractPool<
         env: this.opts.env,
         workerOptions: this.opts.workerOptions,
         tasksQueueBackPressureSize:
-          this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
+          this.opts.tasksQueueOptions?.size ??
+          getDefaultTasksQueueOptions(this.maxSize).size
       }
     )
     // Flag the worker node as ready at pool startup.