From 32b141fddfba99a275b6e18b5abd97c7a66513be Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 18 Dec 2023 17:13:26 +0100 Subject: [PATCH] feat: add queued tasks end timeout support to worker node termination MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 6 ++++- docs/api.md | 3 ++- src/pools/abstract-pool.ts | 36 +++++++++++++----------------- src/pools/pool.ts | 6 +++++ src/pools/utils.ts | 20 ++++++++++++++++- tests/pools/abstract-pool.test.mjs | 21 +++++++++++------ tests/pools/utils.test.mjs | 12 ++++++++++ 7 files changed, 73 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 94ca6454..22af6620 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add queued tasks end timeout support to worker node termination. + ## [3.1.4] - 2023-12-18 ### Fixed @@ -23,7 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- Wait for queued tasks to end at worker termination. +- Wait for queued tasks to end at worker node termination. ## [3.1.1] - 2023-12-16 diff --git a/docs/api.md b/docs/api.md index 29ae7379..968d6ab5 100644 --- a/docs/api.md +++ b/docs/api.md @@ -136,8 +136,9 @@ An object with these properties: - `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer. - `taskStealing` (optional) - Task stealing enablement on idle. - `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement under back pressure. + - `tasksFinishedTimeout` (optional) - Queued tasks finished timeout in milliseconds at worker termination. - Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true }` + Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true, tasksFinishedTimeout: 1000 }` - `workerOptions` (optional) - An object with the worker options to pass to worker. See [worker_threads](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for more details. diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 670d490d..92a29b27 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -55,6 +55,7 @@ import { checkFilePath, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, + getDefaultTasksQueueOptions, updateEluWorkerUsage, updateRunTimeWorkerUsage, updateTaskStatisticsWorkerUsage, @@ -519,18 +520,6 @@ export abstract class AbstractPool< } } - /** - * Gets the given worker its worker node key. - * - * @param worker - The worker. - * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. - */ - private getWorkerNodeKeyByWorker (worker: Worker): number { - return this.workerNodes.findIndex( - workerNode => workerNode.worker === worker - ) - } - /** * Gets the worker node key given its worker id. * @@ -618,12 +607,7 @@ export abstract class AbstractPool< tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { return { - ...{ - size: Math.pow(this.maxSize, 2), - concurrency: 1, - taskStealing: true, - tasksStealingOnBackPressure: true - }, + ...getDefaultTasksQueueOptions(this.maxSize), ...tasksQueueOptions } } @@ -1050,7 +1034,13 @@ export abstract class AbstractPool< this.flagWorkerNodeAsNotReady(workerNodeKey) const flushedTasks = this.flushTasksQueue(workerNodeKey) const workerNode = this.workerNodes[workerNodeKey] - await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks) + await waitWorkerNodeEvents( + workerNode, + 'taskFinished', + flushedTasks, + this.opts.tasksQueueOptions?.tasksFinishedTimeout ?? + getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout + ) await this.sendKillMessageToWorker(workerNodeKey) await workerNode.terminate() } @@ -1257,7 +1247,7 @@ export abstract class AbstractPool< if (this.started && this.opts.enableTasksQueue === true) { this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } - workerNode.terminate().catch(error => { + workerNode?.terminate().catch(error => { this.emitter?.emit(PoolEvents.error, error) }) }) @@ -1433,6 +1423,9 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { + if (workerNodeKey === -1) { + return + } if (this.workerNodes.length <= 1) { return } @@ -1782,7 +1775,8 @@ export abstract class AbstractPool< env: this.opts.env, workerOptions: this.opts.workerOptions, tasksQueueBackPressureSize: - this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) + this.opts.tasksQueueOptions?.size ?? + getDefaultTasksQueueOptions(this.maxSize).size } ) // Flag the worker node as ready at pool startup. diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 7bc8cf85..6293efb6 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -122,6 +122,12 @@ export interface TasksQueueOptions { * @defaultValue true */ readonly tasksStealingOnBackPressure?: boolean + /** + * Queued tasks finished timeout in milliseconds at worker node termination. + * + * @defaultValue 1000 + */ + readonly tasksFinishedTimeout?: number } /** diff --git a/src/pools/utils.ts b/src/pools/utils.ts index ae09e8a9..a6a52f0c 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -21,6 +21,18 @@ import { } from './worker' import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' +export const getDefaultTasksQueueOptions = ( + poolMaxSize: number +): Required => { + return { + size: Math.pow(poolMaxSize, 2), + concurrency: 1, + taskStealing: true, + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 1000 + } +} + export const checkFilePath = (filePath: string): void => { if (filePath == null) { throw new TypeError('The worker file path must be specified') @@ -324,7 +336,8 @@ export const waitWorkerNodeEvents = async < >( workerNode: IWorkerNode, workerNodeEvent: string, - numberOfEventsToWait: number + numberOfEventsToWait: number, + timeout: number ): Promise => { return await new Promise(resolve => { let events = 0 @@ -338,5 +351,10 @@ export const waitWorkerNodeEvents = async < resolve(events) } }) + if (timeout > 0) { + setTimeout(() => { + resolve(events) + }, timeout) + } }) } diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 21cf10c8..cc4b0f38 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -264,7 +264,8 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 1000 }, workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, workerChoiceStrategyOptions: { @@ -654,7 +655,8 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 1000 }) pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) @@ -662,7 +664,8 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 1000 }) pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) @@ -680,7 +683,8 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 1000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -691,13 +695,15 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: 2, taskStealing: false, - tasksStealingOnBackPressure: false + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 2000 }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, size: 2, taskStealing: false, - tasksStealingOnBackPressure: false + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -713,7 +719,8 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 1000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( diff --git a/tests/pools/utils.test.mjs b/tests/pools/utils.test.mjs index 5feb6596..8c0a2b6c 100644 --- a/tests/pools/utils.test.mjs +++ b/tests/pools/utils.test.mjs @@ -7,11 +7,23 @@ import { } from '../../lib/circular-array.js' import { createWorker, + getDefaultTasksQueueOptions, updateMeasurementStatistics } from '../../lib/pools/utils.js' import { WorkerTypes } from '../../lib/index.js' describe('Pool utils test suite', () => { + it('Verify getDefaultTasksQueueOptions() behavior', () => { + const poolMaxSize = 4 + expect(getDefaultTasksQueueOptions(poolMaxSize)).toStrictEqual({ + concurrency: 1, + size: Math.pow(poolMaxSize, 2), + taskStealing: true, + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 1000 + }) + }) + it('Verify updateMeasurementStatistics() behavior', () => { const measurementStatistics = { history: new CircularArray() -- 2.34.1