From: Jérôme Benoit Date: Mon, 3 May 2021 07:48:11 +0000 (+0200) Subject: Fix busy event emission on fixed pool: (#332) X-Git-Tag: v2.0.2~6 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=14916bf9ec9e7e60b1c03e9b6d876fc23990afad;p=poolifier.git Fix busy event emission on fixed pool: (#332) Co-authored-by: Shinigami --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 257e4255..582788a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [2.0.2] - 2021-dd-05 + +### Bug fixes + +- Fix `busy` event emission on fixed pool type + ## [2.0.1] - 2021-16-03 ### Bug fixes diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index c3f293e7..dc49dc8f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -173,14 +173,14 @@ export abstract class AbstractPool< this, () => { const workerCreated = this.createAndSetupWorker() - this.registerWorkerMessageListener(workerCreated, message => { + this.registerWorkerMessageListener(workerCreated, async message => { const tasksInProgress = this.tasks.get(workerCreated) if ( isKillBehavior(KillBehaviors.HARD, message.kill) || tasksInProgress === 0 ) { // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void this.destroyWorker(workerCreated) + await this.destroyWorker(workerCreated) } }) return workerCreated @@ -262,10 +262,9 @@ export abstract class AbstractPool< public execute (data: Data): Promise { // Configure worker to handle message with the specified task const worker = this.chooseWorker() - this.increaseWorkersTask(worker) - this.checkAndEmitBusy() const messageId = ++this.nextMessageId const res = this.internalExecute(worker, messageId) + this.checkAndEmitBusy() this.sendToWorker(worker, { data: data || ({} as Data), id: messageId }) return res } @@ -376,6 +375,7 @@ export abstract class AbstractPool< worker: Worker, messageId: number ): Promise { + this.increaseWorkersTask(worker) return new Promise((resolve, reject) => { this.promiseMap.set(messageId, { resolve, reject, worker }) }) diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index bb6a2e4a..df36568c 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -137,7 +137,9 @@ describe('Abstract pool test suite', () => { for (let i = 0; i < numberOfWorkers * 2; i++) { promises.push(pool.execute({ test: 'test' })) } - expect(poolBusy).toBe(numberOfWorkers) + // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers. + // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool. + expect(poolBusy).toBe(numberOfWorkers + 1) pool.destroy() }) }) diff --git a/tests/pools/cluster/dynamic.test.js b/tests/pools/cluster/dynamic.test.js index 9a39c1d6..62d28ed6 100644 --- a/tests/pools/cluster/dynamic.test.js +++ b/tests/pools/cluster/dynamic.test.js @@ -28,6 +28,8 @@ describe('Dynamic cluster pool test suite', () => { } expect(pool.workers.length).toBeLessThanOrEqual(max) expect(pool.workers.length).toBeGreaterThan(min) + // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool. + // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool. expect(poolBusy).toBe(max + 1) const numberOfExitEvents = await TestUtils.waitExits(pool, max - min) expect(numberOfExitEvents).toBe(max - min) diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index 7048bb79..4f375593 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -74,7 +74,9 @@ describe('Fixed cluster pool test suite', () => { for (let i = 0; i < numberOfWorkers * 2; i++) { promises.push(pool.execute({ test: 'test' })) } - expect(poolBusy).toBe(numberOfWorkers) + // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers. + // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool. + expect(poolBusy).toBe(numberOfWorkers + 1) }) it('Verify that is possible to have a worker that return undefined', async () => { diff --git a/tests/pools/thread/dynamic.test.js b/tests/pools/thread/dynamic.test.js index 9cba566f..241a1b60 100644 --- a/tests/pools/thread/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -27,6 +27,8 @@ describe('Dynamic thread pool test suite', () => { promises.push(pool.execute({ test: 'test' })) } expect(pool.workers.length).toBe(max) + // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool. + // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool. expect(poolBusy).toBe(max + 1) const res = await TestUtils.waitExits(pool, max - min) expect(res).toBe(max - min) diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index 81ef967e..6030866f 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -74,7 +74,9 @@ describe('Fixed thread pool test suite', () => { for (let i = 0; i < numberOfThreads * 2; i++) { promises.push(pool.execute({ test: 'test' })) } - expect(poolBusy).toBe(numberOfThreads) + // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers. + // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool. + expect(poolBusy).toBe(numberOfThreads + 1) }) it('Verify that is possible to have a worker that return undefined', async () => {