feat: add queued tasks end timeout support to worker node termination
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 18 Dec 2023 16:13:26 +0000 (17:13 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 18 Dec 2023 16:13:26 +0000 (17:13 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
docs/api.md
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/utils.ts
tests/pools/abstract-pool.test.mjs
tests/pools/utils.test.mjs

index 94ca6454774420bc909af3367ea4c7d7e53f34f9..22af662020e7a0442015c71f95cfd93d6b210df8 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- Add queued tasks end timeout support to worker node termination.
+
 ## [3.1.4] - 2023-12-18
 
 ### Fixed
@@ -23,7 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Fixed
 
-- Wait for queued tasks to end at worker termination.
+- Wait for queued tasks to end at worker node termination.
 
 ## [3.1.1] - 2023-12-16
 
index 29ae73799858356f87d10d197e20c9c67a7fe9b5..968d6ab5c31e2bcdaaaf82845f24981ac54f8122 100644 (file)
@@ -136,8 +136,9 @@ An object with these properties:
   - `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer.
   - `taskStealing` (optional) - Task stealing enablement on idle.
   - `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement under back pressure.
+  - `tasksFinishedTimeout` (optional) - Queued tasks finished timeout in milliseconds at worker termination.
 
-  Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true }`
+  Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true, tasksFinishedTimeout: 1000 }`
 
 - `workerOptions` (optional) - An object with the worker options to pass to worker. See [worker_threads](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for more details.
 
index 670d490daa18c0cd4e3e03217df69275aa35b824..92a29b27c3b1b35f5d205a337fd4420f5976b8ee 100644 (file)
@@ -55,6 +55,7 @@ import {
   checkFilePath,
   checkValidTasksQueueOptions,
   checkValidWorkerChoiceStrategy,
+  getDefaultTasksQueueOptions,
   updateEluWorkerUsage,
   updateRunTimeWorkerUsage,
   updateTaskStatisticsWorkerUsage,
@@ -519,18 +520,6 @@ export abstract class AbstractPool<
     }
   }
 
-  /**
-   * Gets the given worker its worker node key.
-   *
-   * @param worker - The worker.
-   * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
-   */
-  private getWorkerNodeKeyByWorker (worker: Worker): number {
-    return this.workerNodes.findIndex(
-      workerNode => workerNode.worker === worker
-    )
-  }
-
   /**
    * Gets the worker node key given its worker id.
    *
@@ -618,12 +607,7 @@ export abstract class AbstractPool<
     tasksQueueOptions: TasksQueueOptions
   ): TasksQueueOptions {
     return {
-      ...{
-        size: Math.pow(this.maxSize, 2),
-        concurrency: 1,
-        taskStealing: true,
-        tasksStealingOnBackPressure: true
-      },
+      ...getDefaultTasksQueueOptions(this.maxSize),
       ...tasksQueueOptions
     }
   }
@@ -1050,7 +1034,13 @@ export abstract class AbstractPool<
     this.flagWorkerNodeAsNotReady(workerNodeKey)
     const flushedTasks = this.flushTasksQueue(workerNodeKey)
     const workerNode = this.workerNodes[workerNodeKey]
-    await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks)
+    await waitWorkerNodeEvents(
+      workerNode,
+      'taskFinished',
+      flushedTasks,
+      this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
+        getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout
+    )
     await this.sendKillMessageToWorker(workerNodeKey)
     await workerNode.terminate()
   }
@@ -1257,7 +1247,7 @@ export abstract class AbstractPool<
       if (this.started && this.opts.enableTasksQueue === true) {
         this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
       }
-      workerNode.terminate().catch(error => {
+      workerNode?.terminate().catch(error => {
         this.emitter?.emit(PoolEvents.error, error)
       })
     })
@@ -1433,6 +1423,9 @@ export abstract class AbstractPool<
   }
 
   private redistributeQueuedTasks (workerNodeKey: number): void {
+    if (workerNodeKey === -1) {
+      return
+    }
     if (this.workerNodes.length <= 1) {
       return
     }
@@ -1782,7 +1775,8 @@ export abstract class AbstractPool<
         env: this.opts.env,
         workerOptions: this.opts.workerOptions,
         tasksQueueBackPressureSize:
-          this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
+          this.opts.tasksQueueOptions?.size ??
+          getDefaultTasksQueueOptions(this.maxSize).size
       }
     )
     // Flag the worker node as ready at pool startup.
index 7bc8cf850b105983907d7b74cbab618121f8560b..6293efb6edf8e89026242b9f6fce20dc9fe006d2 100644 (file)
@@ -122,6 +122,12 @@ export interface TasksQueueOptions {
    * @defaultValue true
    */
   readonly tasksStealingOnBackPressure?: boolean
+  /**
+   * Queued tasks finished timeout in milliseconds at worker node termination.
+   *
+   * @defaultValue 1000
+   */
+  readonly tasksFinishedTimeout?: number
 }
 
 /**
index ae09e8a9f5f7a5a0b2e22a1266c84534e4329e41..a6a52f0c663d246fcd4d2d5a1b560801eaa00921 100644 (file)
@@ -21,6 +21,18 @@ import {
 } from './worker'
 import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
 
+export const getDefaultTasksQueueOptions = (
+  poolMaxSize: number
+): Required<TasksQueueOptions> => {
+  return {
+    size: Math.pow(poolMaxSize, 2),
+    concurrency: 1,
+    taskStealing: true,
+    tasksStealingOnBackPressure: true,
+    tasksFinishedTimeout: 1000
+  }
+}
+
 export const checkFilePath = (filePath: string): void => {
   if (filePath == null) {
     throw new TypeError('The worker file path must be specified')
@@ -324,7 +336,8 @@ export const waitWorkerNodeEvents = async <
 >(
   workerNode: IWorkerNode<Worker, Data>,
   workerNodeEvent: string,
-  numberOfEventsToWait: number
+  numberOfEventsToWait: number,
+  timeout: number
 ): Promise<number> => {
   return await new Promise<number>(resolve => {
     let events = 0
@@ -338,5 +351,10 @@ export const waitWorkerNodeEvents = async <
         resolve(events)
       }
     })
+    if (timeout > 0) {
+      setTimeout(() => {
+        resolve(events)
+      }, timeout)
+    }
   })
 }
index 21cf10c8ebab4ae298e2efb98e0b0f9921f1791e..cc4b0f387993f1a414a8b380dfc69ea54700b670 100644 (file)
@@ -264,7 +264,8 @@ describe('Abstract pool test suite', () => {
         concurrency: 2,
         size: Math.pow(numberOfWorkers, 2),
         taskStealing: true,
-        tasksStealingOnBackPressure: true
+        tasksStealingOnBackPressure: true,
+        tasksFinishedTimeout: 1000
       },
       workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
       workerChoiceStrategyOptions: {
@@ -654,7 +655,8 @@ describe('Abstract pool test suite', () => {
       concurrency: 1,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true
+      tasksStealingOnBackPressure: true,
+      tasksFinishedTimeout: 1000
     })
     pool.enableTasksQueue(true, { concurrency: 2 })
     expect(pool.opts.enableTasksQueue).toBe(true)
@@ -662,7 +664,8 @@ describe('Abstract pool test suite', () => {
       concurrency: 2,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true
+      tasksStealingOnBackPressure: true,
+      tasksFinishedTimeout: 1000
     })
     pool.enableTasksQueue(false)
     expect(pool.opts.enableTasksQueue).toBe(false)
@@ -680,7 +683,8 @@ describe('Abstract pool test suite', () => {
       concurrency: 1,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true
+      tasksStealingOnBackPressure: true,
+      tasksFinishedTimeout: 1000
     })
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
@@ -691,13 +695,15 @@ describe('Abstract pool test suite', () => {
       concurrency: 2,
       size: 2,
       taskStealing: false,
-      tasksStealingOnBackPressure: false
+      tasksStealingOnBackPressure: false,
+      tasksFinishedTimeout: 2000
     })
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 2,
       size: 2,
       taskStealing: false,
-      tasksStealingOnBackPressure: false
+      tasksStealingOnBackPressure: false,
+      tasksFinishedTimeout: 2000
     })
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
@@ -713,7 +719,8 @@ describe('Abstract pool test suite', () => {
       concurrency: 1,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true
+      tasksStealingOnBackPressure: true,
+      tasksFinishedTimeout: 1000
     })
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
index 5feb6596296914252a94231e5de19132968a832f..8c0a2b6ccf2f0502b915f985512a1624b1c730e0 100644 (file)
@@ -7,11 +7,23 @@ import {
 } from '../../lib/circular-array.js'
 import {
   createWorker,
+  getDefaultTasksQueueOptions,
   updateMeasurementStatistics
 } from '../../lib/pools/utils.js'
 import { WorkerTypes } from '../../lib/index.js'
 
 describe('Pool utils test suite', () => {
+  it('Verify getDefaultTasksQueueOptions() behavior', () => {
+    const poolMaxSize = 4
+    expect(getDefaultTasksQueueOptions(poolMaxSize)).toStrictEqual({
+      concurrency: 1,
+      size: Math.pow(poolMaxSize, 2),
+      taskStealing: true,
+      tasksStealingOnBackPressure: true,
+      tasksFinishedTimeout: 1000
+    })
+  })
+
   it('Verify updateMeasurementStatistics() behavior', () => {
     const measurementStatistics = {
       history: new CircularArray()