From ff733df7ebd4813628b27a5d27d509163e12af84 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 9 Apr 2023 11:51:08 +0200 Subject: [PATCH] feat: add option to enable worker tasks queue MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 2 +- README.md | 1 + package.json | 2 +- pnpm-lock.yaml | 8 +- src/pools/abstract-pool.ts | 90 +++++++++++++------ src/pools/pool.ts | 7 ++ .../selection-strategies-types.ts | 7 ++ tests/pools/abstract/abstract-pool.test.js | 3 + 8 files changed, 87 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b515d21..f00889ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Use monotonic high resolution timer for worker tasks run time. - Add worker tasks median run time to statistics. -- Add worker tasks queue. +- Add worker tasks queue (experimental). ## [2.4.4] - 2023-04-07 diff --git a/README.md b/README.md index 49f24ef5..13c8c7fe 100644 --- a/README.md +++ b/README.md @@ -175,6 +175,7 @@ Node versions >= 16.x are supported. Default: `WorkerChoiceStrategies.ROUND_ROBIN` - `enableEvents` (optional) - Events emission enablement in this pool. Default: true +- `enableTasksQueue` (optional, experimental) - Tasks queue per worker enablement in this pool. Default: false ### `pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)` diff --git a/package.json b/package.json index c7942807..3529879b 100644 --- a/package.json +++ b/package.json @@ -111,7 +111,7 @@ "eslint-define-config": "^1.17.0", "eslint-import-resolver-typescript": "^3.5.5", "eslint-plugin-import": "^2.27.5", - "eslint-plugin-jsdoc": "^40.1.1", + "eslint-plugin-jsdoc": "^40.1.2", "eslint-plugin-n": "^15.7.0", "eslint-plugin-promise": "^6.1.1", "eslint-plugin-spellcheck": "^0.0.20", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 160db83f..21fb6776 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -53,8 +53,8 @@ devDependencies: specifier: ^2.27.5 version: 2.27.5(@typescript-eslint/parser@5.57.1)(eslint-import-resolver-typescript@3.5.5)(eslint@8.38.0) eslint-plugin-jsdoc: - specifier: ^40.1.1 - version: 40.1.1(eslint@8.38.0) + specifier: ^40.1.2 + version: 40.1.2(eslint@8.38.0) eslint-plugin-n: specifier: ^15.7.0 version: 15.7.0(eslint@8.38.0) @@ -2311,8 +2311,8 @@ packages: - supports-color dev: true - /eslint-plugin-jsdoc@40.1.1(eslint@8.38.0): - resolution: {integrity: sha512-KxrQCq9pPt7LNeDBlLlnuJMpDFZnEQTs4e25NrT4u5cWmPw2P7F03F2qwPz0GMdlRZTyMOofuPAdiWytvPubvA==} + /eslint-plugin-jsdoc@40.1.2(eslint@8.38.0): + resolution: {integrity: sha512-U4Kt42OVjF0EXOWPEc8pjanT8O1ULvILwgA5k87CnhrCKG4xaJ8Sjsb6CWgDtaemOywN06u86duKU1yMaBp7IQ==} engines: {node: ^14 || ^16 || ^17 || ^18 || ^19} peerDependencies: eslint: ^7.0.0 || ^8.0.0 diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index fd862810..16688133 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -77,8 +77,7 @@ export abstract class AbstractPool< this.chooseWorkerNode.bind(this) this.internalExecute.bind(this) - this.checkAndEmitFull.bind(this) - this.checkAndEmitBusy.bind(this) + this.checkAndEmitEvents.bind(this) this.sendToWorker.bind(this) this.setupHook() @@ -129,6 +128,7 @@ export abstract class AbstractPool< opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) this.opts.enableEvents = opts.enableEvents ?? true + this.opts.enableTasksQueue = opts.enableTasksQueue ?? false } private checkValidWorkerChoiceStrategy ( @@ -145,10 +145,26 @@ export abstract class AbstractPool< public abstract get type (): PoolType /** - * Number of tasks concurrently running in the pool. + * Number of tasks running in the pool. */ private get numberOfRunningTasks (): number { - return this.promiseResponseMap.size + return this.workerNodes.reduce( + (accumulator, workerNode) => accumulator + workerNode.tasksUsage.running, + 0 + ) + } + + /** + * Number of tasks queued in the pool. + */ + private get numberOfQueuedTasks (): number { + if (this.opts.enableTasksQueue === false) { + return 0 + } + return this.workerNodes.reduce( + (accumulator, workerNode) => accumulator + workerNode.tasksQueue.length, + 0 + ) } /** @@ -219,17 +235,16 @@ export abstract class AbstractPool< id: crypto.randomUUID() } const res = this.internalExecute(workerNodeKey, workerNode, submittedTask) - let currentTask: Task - // FIXME: Add sensible conditions to start tasks queuing on the worker node - if (this.tasksQueueLength(workerNodeKey) > 0) { - currentTask = this.dequeueTask(workerNodeKey) as Task + let currentTask: Task = submittedTask + if ( + this.opts.enableTasksQueue === true && + (this.busy || this.tasksQueueLength(workerNodeKey) > 0) + ) { this.enqueueTask(workerNodeKey, submittedTask) - } else { - currentTask = submittedTask + currentTask = this.dequeueTask(workerNodeKey) as Task } this.sendToWorker(workerNode.worker, currentTask) - this.checkAndEmitFull() - this.checkAndEmitBusy() + this.checkAndEmitEvents() // eslint-disable-next-line @typescript-eslint/return-await return res } @@ -238,6 +253,7 @@ export abstract class AbstractPool< public async destroy (): Promise { await Promise.all( this.workerNodes.map(async workerNode => { + this.flushTasksQueueByWorker(workerNode.worker) await this.destroyWorker(workerNode.worker) }) ) @@ -329,7 +345,8 @@ export abstract class AbstractPool< (message.kill != null && this.getWorkerTasksUsage(workerCreated)?.running === 0) ) { - // Kill message received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) + // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) + this.flushTasksQueueByWorker(workerCreated) void this.destroyWorker(workerCreated) } }) @@ -399,7 +416,7 @@ export abstract class AbstractPool< } /** - * This function is the listener registered for each worker. + * This function is the listener registered for each worker message. * * @returns The listener function to execute when a message is received from a worker. */ @@ -416,6 +433,16 @@ export abstract class AbstractPool< } this.afterPromiseResponseHook(promiseResponse.worker, message) this.promiseResponseMap.delete(message.id) + const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker) + if ( + this.opts.enableTasksQueue === true && + this.tasksQueueLength(workerNodeKey) > 0 + ) { + this.sendToWorker( + promiseResponse.worker, + this.dequeueTask(workerNodeKey) as Task + ) + } } } } @@ -436,19 +463,14 @@ export abstract class AbstractPool< }) } - private checkAndEmitBusy (): void { - if (this.opts.enableEvents === true && this.busy) { - this.emitter?.emit(PoolEvents.busy) - } - } - - private checkAndEmitFull (): void { - if ( - this.type === PoolType.DYNAMIC && - this.opts.enableEvents === true && - this.full - ) { - this.emitter?.emit(PoolEvents.full) + private checkAndEmitEvents (): void { + if (this.opts.enableEvents === true) { + if (this.busy) { + this.emitter?.emit(PoolEvents.busy) + } + if (this.type === PoolType.DYNAMIC && this.full) { + this.emitter?.emit(PoolEvents.full) + } } } @@ -531,4 +553,18 @@ export abstract class AbstractPool< protected tasksQueueLength (workerNodeKey: number): number { return this.workerNodes[workerNodeKey].tasksQueue.length } + + protected flushTasksQueue (workerNodeKey: number): void { + if (this.tasksQueueLength(workerNodeKey) > 0) { + for (const task of this.workerNodes[workerNodeKey].tasksQueue) { + this.sendToWorker(this.workerNodes[workerNodeKey].worker, task) + } + this.workerNodes[workerNodeKey].tasksQueue = [] + } + } + + protected flushTasksQueueByWorker (worker: Worker): void { + const workerNodeKey = this.getWorkerNodeKey(worker) + this.flushTasksQueue(workerNodeKey) + } } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 60aceb78..9d18ed70 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -55,6 +55,13 @@ export interface PoolOptions { * @defaultValue true */ enableEvents?: boolean + /** + * Pool worker tasks queue. + * + * @experimental + * @defaultValue false + */ + enableTasksQueue?: boolean } /** diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index f5f06a6e..0e1cf641 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -29,6 +29,13 @@ export const WorkerChoiceStrategies = Object.freeze({ */ export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies +/** + * Worker choice strategy options. + */ +export interface WorkerChoiceStrategyOptions { + medRunTime?: boolean +} + /** * Pool worker tasks usage statistics requirements. */ diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 54646e3a..5f3617db 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -87,6 +87,7 @@ describe('Abstract pool test suite', () => { ) expect(pool.opts.enableEvents).toBe(true) expect(pool.emitter).toBeDefined() + expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.workerChoiceStrategy).toBe( WorkerChoiceStrategies.ROUND_ROBIN ) @@ -102,6 +103,7 @@ describe('Abstract pool test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED, enableEvents: false, + enableTasksQueue: true, messageHandler: testHandler, errorHandler: testHandler, onlineHandler: testHandler, @@ -110,6 +112,7 @@ describe('Abstract pool test suite', () => { ) expect(pool.opts.enableEvents).toBe(false) expect(pool.emitter).toBeUndefined() + expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.workerChoiceStrategy).toBe( WorkerChoiceStrategies.LESS_USED ) -- 2.34.1