Fix busy event emission on fixed pool: (#332)
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 3 May 2021 07:48:11 +0000 (09:48 +0200)
committerGitHub <noreply@github.com>
Mon, 3 May 2021 07:48:11 +0000 (09:48 +0200)
Co-authored-by: Shinigami <chrissi92@hotmail.de>
CHANGELOG.md
src/pools/abstract-pool.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/dynamic.test.js
tests/pools/cluster/fixed.test.js
tests/pools/thread/dynamic.test.js
tests/pools/thread/fixed.test.js

index 257e4255bd94cb9c93a5979937b87e99e94be51a..582788a3bb757ea68ced26d1d12481af2d02a6ed 100644 (file)
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [2.0.2] - 2021-dd-05
+
+### Bug fixes
+
+- Fix `busy` event emission on fixed pool type
+
 ## [2.0.1] - 2021-16-03
 
 ### Bug fixes
index c3f293e78bc693fba6aea17e36c880968ee58443..dc49dc8f6edada76925ddb66ae27b0224c9cce8c 100644 (file)
@@ -173,14 +173,14 @@ export abstract class AbstractPool<
       this,
       () => {
         const workerCreated = this.createAndSetupWorker()
-        this.registerWorkerMessageListener(workerCreated, message => {
+        this.registerWorkerMessageListener(workerCreated, async message => {
           const tasksInProgress = this.tasks.get(workerCreated)
           if (
             isKillBehavior(KillBehaviors.HARD, message.kill) ||
             tasksInProgress === 0
           ) {
             // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-            void this.destroyWorker(workerCreated)
+            await this.destroyWorker(workerCreated)
           }
         })
         return workerCreated
@@ -262,10 +262,9 @@ export abstract class AbstractPool<
   public execute (data: Data): Promise<Response> {
     // Configure worker to handle message with the specified task
     const worker = this.chooseWorker()
-    this.increaseWorkersTask(worker)
-    this.checkAndEmitBusy()
     const messageId = ++this.nextMessageId
     const res = this.internalExecute(worker, messageId)
+    this.checkAndEmitBusy()
     this.sendToWorker(worker, { data: data || ({} as Data), id: messageId })
     return res
   }
@@ -376,6 +375,7 @@ export abstract class AbstractPool<
     worker: Worker,
     messageId: number
   ): Promise<Response> {
+    this.increaseWorkersTask(worker)
     return new Promise<Response>((resolve, reject) => {
       this.promiseMap.set(messageId, { resolve, reject, worker })
     })
index bb6a2e4a89037e84bc99a3bccd14e675894a9a14..df36568c9d3cfbd99689661334f6e26bddf0ae2b 100644 (file)
@@ -137,7 +137,9 @@ describe('Abstract pool test suite', () => {
     for (let i = 0; i < numberOfWorkers * 2; i++) {
       promises.push(pool.execute({ test: 'test' }))
     }
-    expect(poolBusy).toBe(numberOfWorkers)
+    // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
+    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
+    expect(poolBusy).toBe(numberOfWorkers + 1)
     pool.destroy()
   })
 })
index 9a39c1d62925251e2b0d7727047ac8a3d6982003..62d28ed650c53d39cd86a722285068dafdb77bcc 100644 (file)
@@ -28,6 +28,8 @@ describe('Dynamic cluster pool test suite', () => {
     }
     expect(pool.workers.length).toBeLessThanOrEqual(max)
     expect(pool.workers.length).toBeGreaterThan(min)
+    // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
+    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
     expect(poolBusy).toBe(max + 1)
     const numberOfExitEvents = await TestUtils.waitExits(pool, max - min)
     expect(numberOfExitEvents).toBe(max - min)
index 7048bb7937946108777aa8faa65fe569031bc98e..4f375593a8c2c1160fb598fbc74d9be28ef4fee7 100644 (file)
@@ -74,7 +74,9 @@ describe('Fixed cluster pool test suite', () => {
     for (let i = 0; i < numberOfWorkers * 2; i++) {
       promises.push(pool.execute({ test: 'test' }))
     }
-    expect(poolBusy).toBe(numberOfWorkers)
+    // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
+    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
+    expect(poolBusy).toBe(numberOfWorkers + 1)
   })
 
   it('Verify that is possible to have a worker that return undefined', async () => {
index 9cba566f916abccdd380b38a0795a5502992a933..241a1b60ce312a77dd265e3d1390ae6e25ab5786 100644 (file)
@@ -27,6 +27,8 @@ describe('Dynamic thread pool test suite', () => {
       promises.push(pool.execute({ test: 'test' }))
     }
     expect(pool.workers.length).toBe(max)
+    // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
+    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
     expect(poolBusy).toBe(max + 1)
     const res = await TestUtils.waitExits(pool, max - min)
     expect(res).toBe(max - min)
index 81ef967ec5e0540abdd94a697b642b3e08c69b3e..6030866fb70181e0c229baa3a7bbacebc570eeea 100644 (file)
@@ -74,7 +74,9 @@ describe('Fixed thread pool test suite', () => {
     for (let i = 0; i < numberOfThreads * 2; i++) {
       promises.push(pool.execute({ test: 'test' }))
     }
-    expect(poolBusy).toBe(numberOfThreads)
+    // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
+    // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
+    expect(poolBusy).toBe(numberOfThreads + 1)
   })
 
   it('Verify that is possible to have a worker that return undefined', async () => {