docs: add changelog entry
[poolifier.git] / tests / pools / abstract-pool.test.mjs
index cc4b0f387993f1a414a8b380dfc69ea54700b670..0f61dbd87d5e2911f9e1a648023ec1d6dade5005 100644 (file)
@@ -265,7 +265,7 @@ describe('Abstract pool test suite', () => {
         size: Math.pow(numberOfWorkers, 2),
         taskStealing: true,
         tasksStealingOnBackPressure: true,
-        tasksFinishedTimeout: 1000
+        tasksFinishedTimeout: 2000
       },
       workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
       workerChoiceStrategyOptions: {
@@ -656,7 +656,7 @@ describe('Abstract pool test suite', () => {
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
       tasksStealingOnBackPressure: true,
-      tasksFinishedTimeout: 1000
+      tasksFinishedTimeout: 2000
     })
     pool.enableTasksQueue(true, { concurrency: 2 })
     expect(pool.opts.enableTasksQueue).toBe(true)
@@ -665,7 +665,7 @@ describe('Abstract pool test suite', () => {
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
       tasksStealingOnBackPressure: true,
-      tasksFinishedTimeout: 1000
+      tasksFinishedTimeout: 2000
     })
     pool.enableTasksQueue(false)
     expect(pool.opts.enableTasksQueue).toBe(false)
@@ -684,7 +684,7 @@ describe('Abstract pool test suite', () => {
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
       tasksStealingOnBackPressure: true,
-      tasksFinishedTimeout: 1000
+      tasksFinishedTimeout: 2000
     })
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
@@ -696,14 +696,14 @@ describe('Abstract pool test suite', () => {
       size: 2,
       taskStealing: false,
       tasksStealingOnBackPressure: false,
-      tasksFinishedTimeout: 2000
+      tasksFinishedTimeout: 3000
     })
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 2,
       size: 2,
       taskStealing: false,
       tasksStealingOnBackPressure: false,
-      tasksFinishedTimeout: 2000
+      tasksFinishedTimeout: 3000
     })
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
@@ -720,7 +720,7 @@ describe('Abstract pool test suite', () => {
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
       tasksStealingOnBackPressure: true,
-      tasksFinishedTimeout: 1000
+      tasksFinishedTimeout: 2000
     })
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
@@ -1278,6 +1278,63 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
+  it('Verify that destroy() waits for queued tasks to finish', async () => {
+    const tasksFinishedTimeout = 2500
+    const pool = new FixedThreadPool(
+      numberOfWorkers,
+      './tests/worker-files/thread/asyncWorker.mjs',
+      {
+        enableTasksQueue: true,
+        tasksQueueOptions: { tasksFinishedTimeout }
+      }
+    )
+    const maxMultiplier = 4
+    let tasksFinished = 0
+    for (const workerNode of pool.workerNodes) {
+      workerNode.on('taskFinished', () => {
+        ++tasksFinished
+      })
+    }
+    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+      pool.execute()
+    }
+    expect(pool.info.queuedTasks).toBeGreaterThan(0)
+    const startTime = performance.now()
+    await pool.destroy()
+    const elapsedTime = performance.now() - startTime
+    expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
+    expect(elapsedTime).toBeGreaterThanOrEqual(2000)
+    expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
+  })
+
+  it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
+    const tasksFinishedTimeout = 1000
+    const pool = new FixedThreadPool(
+      numberOfWorkers,
+      './tests/worker-files/thread/asyncWorker.mjs',
+      {
+        enableTasksQueue: true,
+        tasksQueueOptions: { tasksFinishedTimeout }
+      }
+    )
+    const maxMultiplier = 4
+    let tasksFinished = 0
+    for (const workerNode of pool.workerNodes) {
+      workerNode.on('taskFinished', () => {
+        ++tasksFinished
+      })
+    }
+    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+      pool.execute()
+    }
+    expect(pool.info.queuedTasks).toBeGreaterThan(0)
+    const startTime = performance.now()
+    await pool.destroy()
+    const elapsedTime = performance.now() - startTime
+    expect(tasksFinished).toBe(0)
+    expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 300)
+  })
+
   it('Verify that pool asynchronous resource track tasks execution', async () => {
     let taskAsyncId
     let initCalls = 0
@@ -1613,6 +1670,11 @@ describe('Abstract pool test suite', () => {
     await expect(
       pool.sendKillMessageToWorker(workerNodeKey)
     ).resolves.toBeUndefined()
+    await expect(
+      pool.sendKillMessageToWorker(numberOfWorkers)
+    ).rejects.toStrictEqual(
+      new Error(`Invalid worker node key '${numberOfWorkers}'`)
+    )
     await pool.destroy()
   })