docs: add changelog entry
[poolifier.git] / tests / pools / abstract-pool.test.mjs
index 8c0319e7c04bbad87b5531e16d2b2a1f3726d28c..0f61dbd87d5e2911f9e1a648023ec1d6dade5005 100644 (file)
@@ -2,6 +2,7 @@ import { EventEmitterAsyncResource } from 'node:events'
 import { dirname, join } from 'node:path'
 import { readFileSync } from 'node:fs'
 import { fileURLToPath } from 'node:url'
+import { createHook, executionAsyncId } from 'node:async_hooks'
 import { expect } from 'expect'
 import { restore, stub } from 'sinon'
 import {
@@ -38,7 +39,16 @@ describe('Abstract pool test suite', () => {
     restore()
   })
 
-  it('Simulate pool creation from a non main thread/process', () => {
+  it('Verify that pool can be created and destroyed', async () => {
+    const pool = new FixedThreadPool(
+      numberOfWorkers,
+      './tests/worker-files/thread/testWorker.mjs'
+    )
+    expect(pool).toBeInstanceOf(FixedThreadPool)
+    await pool.destroy()
+  })
+
+  it('Verify that pool cannot be created from a non main thread/process', () => {
     expect(
       () =>
         new StubPoolWithIsMain(
@@ -60,14 +70,18 @@ describe('Abstract pool test suite', () => {
       numberOfWorkers,
       './tests/worker-files/thread/testWorker.mjs'
     )
-    expect(pool.starting).toBe(false)
     expect(pool.started).toBe(true)
+    expect(pool.starting).toBe(false)
+    expect(pool.destroying).toBe(false)
     await pool.destroy()
   })
 
   it('Verify that filePath is checked', () => {
     expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
-      new Error("Cannot find the worker file 'undefined'")
+      new TypeError('The worker file path must be specified')
+    )
+    expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
+      new TypeError('The worker file path must be a string')
     )
     expect(
       () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
@@ -250,7 +264,8 @@ describe('Abstract pool test suite', () => {
         concurrency: 2,
         size: Math.pow(numberOfWorkers, 2),
         taskStealing: true,
-        tasksStealingOnBackPressure: true
+        tasksStealingOnBackPressure: true,
+        tasksFinishedTimeout: 2000
       },
       workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
       workerChoiceStrategyOptions: {
@@ -634,41 +649,27 @@ describe('Abstract pool test suite', () => {
     )
     expect(pool.opts.enableTasksQueue).toBe(false)
     expect(pool.opts.tasksQueueOptions).toBeUndefined()
-    for (const workerNode of pool.workerNodes) {
-      expect(workerNode.onEmptyQueue).toBeUndefined()
-      expect(workerNode.onBackPressure).toBeUndefined()
-    }
     pool.enableTasksQueue(true)
     expect(pool.opts.enableTasksQueue).toBe(true)
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 1,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true
+      tasksStealingOnBackPressure: true,
+      tasksFinishedTimeout: 2000
     })
-    for (const workerNode of pool.workerNodes) {
-      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
-      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
-    }
     pool.enableTasksQueue(true, { concurrency: 2 })
     expect(pool.opts.enableTasksQueue).toBe(true)
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 2,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true
+      tasksStealingOnBackPressure: true,
+      tasksFinishedTimeout: 2000
     })
-    for (const workerNode of pool.workerNodes) {
-      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
-      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
-    }
     pool.enableTasksQueue(false)
     expect(pool.opts.enableTasksQueue).toBe(false)
     expect(pool.opts.tasksQueueOptions).toBeUndefined()
-    for (const workerNode of pool.workerNodes) {
-      expect(workerNode.onEmptyQueue).toBeUndefined()
-      expect(workerNode.onBackPressure).toBeUndefined()
-    }
     await pool.destroy()
   })
 
@@ -682,33 +683,32 @@ describe('Abstract pool test suite', () => {
       concurrency: 1,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true
+      tasksStealingOnBackPressure: true,
+      tasksFinishedTimeout: 2000
     })
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
         pool.opts.tasksQueueOptions.size
       )
-      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
-      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
     }
     pool.setTasksQueueOptions({
       concurrency: 2,
       size: 2,
       taskStealing: false,
-      tasksStealingOnBackPressure: false
+      tasksStealingOnBackPressure: false,
+      tasksFinishedTimeout: 3000
     })
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 2,
       size: 2,
       taskStealing: false,
-      tasksStealingOnBackPressure: false
+      tasksStealingOnBackPressure: false,
+      tasksFinishedTimeout: 3000
     })
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
         pool.opts.tasksQueueOptions.size
       )
-      expect(workerNode.onEmptyQueue).toBeUndefined()
-      expect(workerNode.onBackPressure).toBeUndefined()
     }
     pool.setTasksQueueOptions({
       concurrency: 1,
@@ -719,14 +719,13 @@ describe('Abstract pool test suite', () => {
       concurrency: 1,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true
+      tasksStealingOnBackPressure: true,
+      tasksFinishedTimeout: 2000
     })
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
         pool.opts.tasksQueueOptions.size
       )
-      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
-      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
     }
     expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
       new TypeError('Invalid tasks queue options: must be a plain object')
@@ -819,6 +818,7 @@ describe('Abstract pool test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          sequentiallyStolen: 0,
           stolen: 0,
           failed: 0
         },
@@ -899,6 +899,24 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
+  it('Verify that pool statuses are checked at start or destroy', async () => {
+    const pool = new FixedThreadPool(
+      numberOfWorkers,
+      './tests/worker-files/thread/testWorker.mjs'
+    )
+    expect(pool.info.started).toBe(true)
+    expect(pool.info.ready).toBe(true)
+    expect(() => pool.start()).toThrow(
+      new Error('Cannot start an already started pool')
+    )
+    await pool.destroy()
+    expect(pool.info.started).toBe(false)
+    expect(pool.info.ready).toBe(false)
+    await expect(pool.destroy()).rejects.toThrow(
+      new Error('Cannot destroy an already destroyed pool')
+    )
+  })
+
   it('Verify that pool can be started after initialization', async () => {
     const pool = new FixedClusterPool(
       numberOfWorkers,
@@ -909,6 +927,7 @@ describe('Abstract pool test suite', () => {
     )
     expect(pool.info.started).toBe(false)
     expect(pool.info.ready).toBe(false)
+    expect(pool.readyEventEmitted).toBe(false)
     expect(pool.workerNodes).toStrictEqual([])
     await expect(pool.execute()).rejects.toThrow(
       new Error('Cannot execute a task on not started pool')
@@ -916,6 +935,8 @@ describe('Abstract pool test suite', () => {
     pool.start()
     expect(pool.info.started).toBe(true)
     expect(pool.info.ready).toBe(true)
+    await waitPoolEvents(pool, PoolEvents.ready, 1)
+    expect(pool.readyEventEmitted).toBe(true)
     expect(pool.workerNodes.length).toBe(numberOfWorkers)
     for (const workerNode of pool.workerNodes) {
       expect(workerNode).toBeInstanceOf(WorkerNode)
@@ -963,6 +984,7 @@ describe('Abstract pool test suite', () => {
           executing: maxMultiplier,
           queued: 0,
           maxQueued: 0,
+          sequentiallyStolen: 0,
           stolen: 0,
           failed: 0
         },
@@ -990,6 +1012,7 @@ describe('Abstract pool test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          sequentiallyStolen: 0,
           stolen: 0,
           failed: 0
         },
@@ -1031,6 +1054,7 @@ describe('Abstract pool test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          sequentiallyStolen: 0,
           stolen: 0,
           failed: 0
         },
@@ -1066,6 +1090,7 @@ describe('Abstract pool test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          sequentiallyStolen: 0,
           stolen: 0,
           failed: 0
         },
@@ -1249,7 +1274,101 @@ describe('Abstract pool test suite', () => {
       stolenTasks: expect.any(Number),
       failedTasks: expect.any(Number)
     })
-    expect(pool.hasBackPressure.called).toBe(true)
+    expect(pool.hasBackPressure.callCount).toBe(5)
+    await pool.destroy()
+  })
+
+  it('Verify that destroy() waits for queued tasks to finish', async () => {
+    const tasksFinishedTimeout = 2500
+    const pool = new FixedThreadPool(
+      numberOfWorkers,
+      './tests/worker-files/thread/asyncWorker.mjs',
+      {
+        enableTasksQueue: true,
+        tasksQueueOptions: { tasksFinishedTimeout }
+      }
+    )
+    const maxMultiplier = 4
+    let tasksFinished = 0
+    for (const workerNode of pool.workerNodes) {
+      workerNode.on('taskFinished', () => {
+        ++tasksFinished
+      })
+    }
+    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+      pool.execute()
+    }
+    expect(pool.info.queuedTasks).toBeGreaterThan(0)
+    const startTime = performance.now()
+    await pool.destroy()
+    const elapsedTime = performance.now() - startTime
+    expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
+    expect(elapsedTime).toBeGreaterThanOrEqual(2000)
+    expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
+  })
+
+  it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
+    const tasksFinishedTimeout = 1000
+    const pool = new FixedThreadPool(
+      numberOfWorkers,
+      './tests/worker-files/thread/asyncWorker.mjs',
+      {
+        enableTasksQueue: true,
+        tasksQueueOptions: { tasksFinishedTimeout }
+      }
+    )
+    const maxMultiplier = 4
+    let tasksFinished = 0
+    for (const workerNode of pool.workerNodes) {
+      workerNode.on('taskFinished', () => {
+        ++tasksFinished
+      })
+    }
+    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+      pool.execute()
+    }
+    expect(pool.info.queuedTasks).toBeGreaterThan(0)
+    const startTime = performance.now()
+    await pool.destroy()
+    const elapsedTime = performance.now() - startTime
+    expect(tasksFinished).toBe(0)
+    expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 300)
+  })
+
+  it('Verify that pool asynchronous resource track tasks execution', async () => {
+    let taskAsyncId
+    let initCalls = 0
+    let beforeCalls = 0
+    let afterCalls = 0
+    let resolveCalls = 0
+    const hook = createHook({
+      init (asyncId, type) {
+        if (type === 'poolifier:task') {
+          initCalls++
+          taskAsyncId = asyncId
+        }
+      },
+      before (asyncId) {
+        if (asyncId === taskAsyncId) beforeCalls++
+      },
+      after (asyncId) {
+        if (asyncId === taskAsyncId) afterCalls++
+      },
+      promiseResolve () {
+        if (executionAsyncId() === taskAsyncId) resolveCalls++
+      }
+    })
+    const pool = new FixedThreadPool(
+      numberOfWorkers,
+      './tests/worker-files/thread/testWorker.mjs'
+    )
+    hook.enable()
+    await pool.execute()
+    hook.disable()
+    expect(initCalls).toBe(1)
+    expect(beforeCalls).toBe(1)
+    expect(afterCalls).toBe(1)
+    expect(resolveCalls).toBe(1)
     await pool.destroy()
   })
 
@@ -1332,6 +1451,7 @@ describe('Abstract pool test suite', () => {
           executed: expect.any(Number),
           executing: 0,
           queued: 0,
+          sequentiallyStolen: 0,
           stolen: 0,
           failed: 0
         },
@@ -1428,23 +1548,24 @@ describe('Abstract pool test suite', () => {
       './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
     )
     await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+    const workerId = dynamicThreadPool.workerNodes[0].info.id
     await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
       new Error(
-        "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
+        `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
       )
     )
     await expect(
       dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
     ).rejects.toThrow(
       new Error(
-        "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
+        `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
       )
     )
     await expect(
       dynamicThreadPool.setDefaultTaskFunction('unknown')
     ).rejects.toThrow(
       new Error(
-        "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
+        `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
       )
     )
     expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
@@ -1471,6 +1592,7 @@ describe('Abstract pool test suite', () => {
       'jsonIntegerSerialization',
       'factorial'
     ])
+    await dynamicThreadPool.destroy()
   })
 
   it('Verify that multiple task functions worker is working', async () => {
@@ -1505,6 +1627,7 @@ describe('Abstract pool test suite', () => {
             executing: 0,
             failed: 0,
             queued: 0,
+            sequentiallyStolen: 0,
             stolen: 0
           },
           runTime: {
@@ -1547,6 +1670,11 @@ describe('Abstract pool test suite', () => {
     await expect(
       pool.sendKillMessageToWorker(workerNodeKey)
     ).resolves.toBeUndefined()
+    await expect(
+      pool.sendKillMessageToWorker(numberOfWorkers)
+    ).rejects.toStrictEqual(
+      new Error(`Invalid worker node key '${numberOfWorkers}'`)
+    )
     await pool.destroy()
   })