From: Jérôme Benoit Date: Mon, 18 Dec 2023 21:20:41 +0000 (+0100) Subject: fix: fix pool destroying with tasks queuing enabled X-Git-Tag: v3.1.6~1 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=85b2561d52f5fd8a5f6d47e21f9f63fbeeaa8e6b;p=poolifier.git fix: fix pool destroying with tasks queuing enabled Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index d7568294..b229afa6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fix pool destroying with tasks queuing enabled. + ## [3.1.5] - 2023-12-18 ### Added diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 92a29b27..11cc9cc1 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1005,6 +1005,10 @@ export abstract class AbstractPool< workerNodeKey: number ): Promise { await new Promise((resolve, reject) => { + if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) { + reject(new Error(`Invalid worker node key '${workerNodeKey}'`)) + return + } const killMessageListener = (message: MessageValue): void => { this.checkMessageWorkerId(message) if (message.kill === 'success') { @@ -1706,7 +1710,7 @@ export abstract class AbstractPool< this.afterTaskExecutionHook(workerNodeKey, message) this.promiseResponseMap.delete(taskId as string) workerNode?.emit('taskFinished', taskId) - if (this.opts.enableTasksQueue === true) { + if (this.opts.enableTasksQueue === true && !this.destroying) { const workerNodeTasksUsage = workerNode.usage.tasks if ( this.tasksQueueSize(workerNodeKey) > 0 && diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 1b80aa4c..84935385 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1278,6 +1278,63 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) + it('Verify that destroy() waits for queued tasks to finish', async () => { + const tasksFinishedTimeout = 2500 + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/asyncWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksFinishedTimeout } + } + ) + const maxMultiplier = 4 + let tasksFinished = 0 + for (const workerNode of pool.workerNodes) { + workerNode.on('taskFinished', () => { + ++tasksFinished + }) + } + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + pool.execute() + } + expect(pool.info.queuedTasks).toBeGreaterThan(0) + const startTime = performance.now() + await pool.destroy() + const elapsedTime = performance.now() - startTime + expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier) + expect(elapsedTime).toBeGreaterThanOrEqual(2000) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout) + }) + + it('Verify that destroy() waits until the tasks finished timeout is reached', async () => { + const tasksFinishedTimeout = 1000 + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/asyncWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksFinishedTimeout } + } + ) + const maxMultiplier = 4 + let tasksFinished = 0 + for (const workerNode of pool.workerNodes) { + workerNode.on('taskFinished', () => { + ++tasksFinished + }) + } + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + pool.execute() + } + expect(pool.info.queuedTasks).toBeGreaterThan(0) + const startTime = performance.now() + await pool.destroy() + const elapsedTime = performance.now() - startTime + expect(tasksFinished).toBe(0) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100) + }) + it('Verify that pool asynchronous resource track tasks execution', async () => { let taskAsyncId let initCalls = 0 @@ -1613,6 +1670,11 @@ describe('Abstract pool test suite', () => { await expect( pool.sendKillMessageToWorker(workerNodeKey) ).resolves.toBeUndefined() + await expect( + pool.sendKillMessageToWorker(numberOfWorkers) + ).rejects.toStrictEqual( + new Error(`Invalid worker node key '${numberOfWorkers}'`) + ) await pool.destroy() })