fix: fix pool destroying with tasks queuing enabled
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 18 Dec 2023 21:20:41 +0000 (22:20 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 18 Dec 2023 21:20:41 +0000 (22:20 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
src/pools/abstract-pool.ts
tests/pools/abstract-pool.test.mjs

index d7568294382147e94da0c67d731609842b647b63..b229afa63a368f93302362628788070fd6e20633 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Fix pool destroying with tasks queuing enabled.
+
 ## [3.1.5] - 2023-12-18
 
 ### Added
index 92a29b27c3b1b35f5d205a337fd4420f5976b8ee..11cc9cc15ef419346827e5f627150f04e73ba158 100644 (file)
@@ -1005,6 +1005,10 @@ export abstract class AbstractPool<
     workerNodeKey: number
   ): Promise<void> {
     await new Promise<void>((resolve, reject) => {
+      if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) {
+        reject(new Error(`Invalid worker node key '${workerNodeKey}'`))
+        return
+      }
       const killMessageListener = (message: MessageValue<Response>): void => {
         this.checkMessageWorkerId(message)
         if (message.kill === 'success') {
@@ -1706,7 +1710,7 @@ export abstract class AbstractPool<
       this.afterTaskExecutionHook(workerNodeKey, message)
       this.promiseResponseMap.delete(taskId as string)
       workerNode?.emit('taskFinished', taskId)
-      if (this.opts.enableTasksQueue === true) {
+      if (this.opts.enableTasksQueue === true && !this.destroying) {
         const workerNodeTasksUsage = workerNode.usage.tasks
         if (
           this.tasksQueueSize(workerNodeKey) > 0 &&
index 1b80aa4cdeb92efd76ed0c5f4e56e7357342c8a0..84935385a42b21cf8439501e2ef293f45e212b21 100644 (file)
@@ -1278,6 +1278,63 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
+  it('Verify that destroy() waits for queued tasks to finish', async () => {
+    const tasksFinishedTimeout = 2500
+    const pool = new FixedThreadPool(
+      numberOfWorkers,
+      './tests/worker-files/thread/asyncWorker.mjs',
+      {
+        enableTasksQueue: true,
+        tasksQueueOptions: { tasksFinishedTimeout }
+      }
+    )
+    const maxMultiplier = 4
+    let tasksFinished = 0
+    for (const workerNode of pool.workerNodes) {
+      workerNode.on('taskFinished', () => {
+        ++tasksFinished
+      })
+    }
+    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+      pool.execute()
+    }
+    expect(pool.info.queuedTasks).toBeGreaterThan(0)
+    const startTime = performance.now()
+    await pool.destroy()
+    const elapsedTime = performance.now() - startTime
+    expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
+    expect(elapsedTime).toBeGreaterThanOrEqual(2000)
+    expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout)
+  })
+
+  it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
+    const tasksFinishedTimeout = 1000
+    const pool = new FixedThreadPool(
+      numberOfWorkers,
+      './tests/worker-files/thread/asyncWorker.mjs',
+      {
+        enableTasksQueue: true,
+        tasksQueueOptions: { tasksFinishedTimeout }
+      }
+    )
+    const maxMultiplier = 4
+    let tasksFinished = 0
+    for (const workerNode of pool.workerNodes) {
+      workerNode.on('taskFinished', () => {
+        ++tasksFinished
+      })
+    }
+    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+      pool.execute()
+    }
+    expect(pool.info.queuedTasks).toBeGreaterThan(0)
+    const startTime = performance.now()
+    await pool.destroy()
+    const elapsedTime = performance.now() - startTime
+    expect(tasksFinished).toBe(0)
+    expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
+  })
+
   it('Verify that pool asynchronous resource track tasks execution', async () => {
     let taskAsyncId
     let initCalls = 0
@@ -1613,6 +1670,11 @@ describe('Abstract pool test suite', () => {
     await expect(
       pool.sendKillMessageToWorker(workerNodeKey)
     ).resolves.toBeUndefined()
+    await expect(
+      pool.sendKillMessageToWorker(numberOfWorkers)
+    ).rejects.toStrictEqual(
+      new Error(`Invalid worker node key '${numberOfWorkers}'`)
+    )
     await pool.destroy()
   })