Merge branch 'master' into combined-prs-branch
[poolifier.git] / tests / pools / abstract-pool.test.mjs
index 9c1e49a41e0bddb2298f1bcd9a5d038c8d46645c..cf4b55afc26a6a1278f886a0d4c2762b1c230f07 100644 (file)
@@ -2,6 +2,7 @@ import { EventEmitterAsyncResource } from 'node:events'
 import { dirname, join } from 'node:path'
 import { readFileSync } from 'node:fs'
 import { fileURLToPath } from 'node:url'
+import { createHook, executionAsyncId } from 'node:async_hooks'
 import { expect } from 'expect'
 import { restore, stub } from 'sinon'
 import {
@@ -77,7 +78,10 @@ describe('Abstract pool test suite', () => {
 
   it('Verify that filePath is checked', () => {
     expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
-      new Error("Cannot find the worker file 'undefined'")
+      new TypeError('The worker file path must be specified')
+    )
+    expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
+      new TypeError('The worker file path must be a string')
     )
     expect(
       () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
@@ -120,6 +124,22 @@ describe('Abstract pool test suite', () => {
     )
   })
 
+  it('Verify that pool arguments number and pool type are checked', () => {
+    expect(
+      () =>
+        new FixedThreadPool(
+          numberOfWorkers,
+          './tests/worker-files/thread/testWorker.mjs',
+          undefined,
+          numberOfWorkers * 2
+        )
+    ).toThrow(
+      new Error(
+        'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
+      )
+    )
+  })
+
   it('Verify that dynamic pool sizing is checked', () => {
     expect(
       () =>
@@ -206,27 +226,33 @@ describe('Abstract pool test suite', () => {
       enableEvents: true,
       restartWorkerOnError: true,
       enableTasksQueue: false,
-      workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
-      workerChoiceStrategyOptions: {
-        retries: 6,
-        runTime: { median: false },
-        waitTime: { median: false },
-        elu: { median: false }
-      }
+      workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
     })
     expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
-      retries: 6,
+      retries:
+        pool.info.maxSize +
+        Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
       runTime: { median: false },
       waitTime: { median: false },
-      elu: { median: false }
+      elu: { median: false },
+      weights: expect.objectContaining({
+        0: expect.any(Number),
+        [pool.info.maxSize - 1]: expect.any(Number)
+      })
     })
     for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
-        retries: 6,
+        retries:
+          pool.info.maxSize +
+          Object.keys(workerChoiceStrategy.opts.weights).length,
         runTime: { median: false },
         waitTime: { median: false },
-        elu: { median: false }
+        elu: { median: false },
+        weights: expect.objectContaining({
+          0: expect.any(Number),
+          [pool.info.maxSize - 1]: expect.any(Number)
+        })
       })
     }
     await pool.destroy()
@@ -260,14 +286,12 @@ describe('Abstract pool test suite', () => {
         concurrency: 2,
         size: Math.pow(numberOfWorkers, 2),
         taskStealing: true,
-        tasksStealingOnBackPressure: true
+        tasksStealingOnBackPressure: true,
+        tasksFinishedTimeout: 2000
       },
       workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
       workerChoiceStrategyOptions: {
-        retries: 6,
         runTime: { median: true },
-        waitTime: { median: false },
-        elu: { median: false },
         weights: { 0: 300, 1: 200 }
       },
       onlineHandler: testHandler,
@@ -276,7 +300,9 @@ describe('Abstract pool test suite', () => {
       exitHandler: testHandler
     })
     expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
-      retries: 6,
+      retries:
+        pool.info.maxSize +
+        Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
       runTime: { median: true },
       waitTime: { median: false },
       elu: { median: false },
@@ -285,7 +311,9 @@ describe('Abstract pool test suite', () => {
     for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
-        retries: 6,
+        retries:
+          pool.info.maxSize +
+          Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
         runTime: { median: true },
         waitTime: { median: false },
         elu: { median: false },
@@ -306,38 +334,6 @@ describe('Abstract pool test suite', () => {
           }
         )
     ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
-    expect(
-      () =>
-        new FixedThreadPool(
-          numberOfWorkers,
-          './tests/worker-files/thread/testWorker.mjs',
-          {
-            workerChoiceStrategyOptions: {
-              retries: 'invalidChoiceRetries'
-            }
-          }
-        )
-    ).toThrow(
-      new TypeError(
-        'Invalid worker choice strategy options: retries must be an integer'
-      )
-    )
-    expect(
-      () =>
-        new FixedThreadPool(
-          numberOfWorkers,
-          './tests/worker-files/thread/testWorker.mjs',
-          {
-            workerChoiceStrategyOptions: {
-              retries: -1
-            }
-          }
-        )
-    ).toThrow(
-      new RangeError(
-        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
-      )
-    )
     expect(
       () =>
         new FixedThreadPool(
@@ -473,25 +469,32 @@ describe('Abstract pool test suite', () => {
       './tests/worker-files/thread/testWorker.mjs',
       { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
     )
-    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
-      retries: 6,
-      runTime: { median: false },
-      waitTime: { median: false },
-      elu: { median: false }
-    })
+    expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
     expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
-      retries: 6,
+      retries:
+        pool.info.maxSize +
+        Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
       runTime: { median: false },
       waitTime: { median: false },
-      elu: { median: false }
+      elu: { median: false },
+      weights: expect.objectContaining({
+        0: expect.any(Number),
+        [pool.info.maxSize - 1]: expect.any(Number)
+      })
     })
     for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
-        retries: 6,
+        retries:
+          pool.info.maxSize +
+          Object.keys(workerChoiceStrategy.opts.weights).length,
         runTime: { median: false },
         waitTime: { median: false },
-        elu: { median: false }
+        elu: { median: false },
+        weights: expect.objectContaining({
+          0: expect.any(Number),
+          [pool.info.maxSize - 1]: expect.any(Number)
+        })
       })
     }
     expect(
@@ -518,24 +521,34 @@ describe('Abstract pool test suite', () => {
       elu: { median: true }
     })
     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
-      retries: 6,
       runTime: { median: true },
-      waitTime: { median: false },
       elu: { median: true }
     })
     expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
-      retries: 6,
+      retries:
+        pool.info.maxSize +
+        Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
       runTime: { median: true },
       waitTime: { median: false },
-      elu: { median: true }
+      elu: { median: true },
+      weights: expect.objectContaining({
+        0: expect.any(Number),
+        [pool.info.maxSize - 1]: expect.any(Number)
+      })
     })
     for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
-        retries: 6,
+        retries:
+          pool.info.maxSize +
+          Object.keys(workerChoiceStrategy.opts.weights).length,
         runTime: { median: true },
         waitTime: { median: false },
-        elu: { median: true }
+        elu: { median: true },
+        weights: expect.objectContaining({
+          0: expect.any(Number),
+          [pool.info.maxSize - 1]: expect.any(Number)
+        })
       })
     }
     expect(
@@ -562,24 +575,34 @@ describe('Abstract pool test suite', () => {
       elu: { median: false }
     })
     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
-      retries: 6,
       runTime: { median: false },
-      waitTime: { median: false },
       elu: { median: false }
     })
     expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
-      retries: 6,
+      retries:
+        pool.info.maxSize +
+        Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
       runTime: { median: false },
       waitTime: { median: false },
-      elu: { median: false }
+      elu: { median: false },
+      weights: expect.objectContaining({
+        0: expect.any(Number),
+        [pool.info.maxSize - 1]: expect.any(Number)
+      })
     })
     for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
-        retries: 6,
+        retries:
+          pool.info.maxSize +
+          Object.keys(workerChoiceStrategy.opts.weights).length,
         runTime: { median: false },
         waitTime: { median: false },
-        elu: { median: false }
+        elu: { median: false },
+        weights: expect.objectContaining({
+          0: expect.any(Number),
+          [pool.info.maxSize - 1]: expect.any(Number)
+        })
       })
     }
     expect(
@@ -608,20 +631,6 @@ describe('Abstract pool test suite', () => {
         'Invalid worker choice strategy options: must be a plain object'
       )
     )
-    expect(() =>
-      pool.setWorkerChoiceStrategyOptions({
-        retries: 'invalidChoiceRetries'
-      })
-    ).toThrow(
-      new TypeError(
-        'Invalid worker choice strategy options: retries must be an integer'
-      )
-    )
-    expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
-      new RangeError(
-        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
-      )
-    )
     expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
       new Error(
         'Invalid worker choice strategy options: must have a weight for each worker node'
@@ -650,7 +659,8 @@ describe('Abstract pool test suite', () => {
       concurrency: 1,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true
+      tasksStealingOnBackPressure: true,
+      tasksFinishedTimeout: 2000
     })
     pool.enableTasksQueue(true, { concurrency: 2 })
     expect(pool.opts.enableTasksQueue).toBe(true)
@@ -658,7 +668,8 @@ describe('Abstract pool test suite', () => {
       concurrency: 2,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true
+      tasksStealingOnBackPressure: true,
+      tasksFinishedTimeout: 2000
     })
     pool.enableTasksQueue(false)
     expect(pool.opts.enableTasksQueue).toBe(false)
@@ -676,7 +687,8 @@ describe('Abstract pool test suite', () => {
       concurrency: 1,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true
+      tasksStealingOnBackPressure: true,
+      tasksFinishedTimeout: 2000
     })
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
@@ -687,13 +699,15 @@ describe('Abstract pool test suite', () => {
       concurrency: 2,
       size: 2,
       taskStealing: false,
-      tasksStealingOnBackPressure: false
+      tasksStealingOnBackPressure: false,
+      tasksFinishedTimeout: 3000
     })
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 2,
       size: 2,
       taskStealing: false,
-      tasksStealingOnBackPressure: false
+      tasksStealingOnBackPressure: false,
+      tasksFinishedTimeout: 3000
     })
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
@@ -709,7 +723,8 @@ describe('Abstract pool test suite', () => {
       concurrency: 1,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true
+      tasksStealingOnBackPressure: true,
+      tasksFinishedTimeout: 2000
     })
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
@@ -867,7 +882,8 @@ describe('Abstract pool test suite', () => {
         id: expect.any(Number),
         type: WorkerTypes.cluster,
         dynamic: false,
-        ready: true
+        ready: true,
+        stealing: false
       })
     }
     await pool.destroy()
@@ -882,7 +898,8 @@ describe('Abstract pool test suite', () => {
         id: expect.any(Number),
         type: WorkerTypes.thread,
         dynamic: false,
-        ready: true
+        ready: true,
+        stealing: false
       })
     }
     await pool.destroy()
@@ -1254,6 +1271,7 @@ describe('Abstract pool test suite', () => {
       maxSize: expect.any(Number),
       workerNodes: expect.any(Number),
       idleWorkerNodes: expect.any(Number),
+      stealingWorkerNodes: expect.any(Number),
       busyWorkerNodes: expect.any(Number),
       executedTasks: expect.any(Number),
       executingTasks: expect.any(Number),
@@ -1263,7 +1281,101 @@ describe('Abstract pool test suite', () => {
       stolenTasks: expect.any(Number),
       failedTasks: expect.any(Number)
     })
-    expect(pool.hasBackPressure.called).toBe(true)
+    expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
+    await pool.destroy()
+  })
+
+  it('Verify that destroy() waits for queued tasks to finish', async () => {
+    const tasksFinishedTimeout = 2500
+    const pool = new FixedThreadPool(
+      numberOfWorkers,
+      './tests/worker-files/thread/asyncWorker.mjs',
+      {
+        enableTasksQueue: true,
+        tasksQueueOptions: { tasksFinishedTimeout }
+      }
+    )
+    const maxMultiplier = 4
+    let tasksFinished = 0
+    for (const workerNode of pool.workerNodes) {
+      workerNode.on('taskFinished', () => {
+        ++tasksFinished
+      })
+    }
+    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+      pool.execute()
+    }
+    expect(pool.info.queuedTasks).toBeGreaterThan(0)
+    const startTime = performance.now()
+    await pool.destroy()
+    const elapsedTime = performance.now() - startTime
+    expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
+    expect(elapsedTime).toBeGreaterThanOrEqual(2000)
+    expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 400)
+  })
+
+  it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
+    const tasksFinishedTimeout = 1000
+    const pool = new FixedThreadPool(
+      numberOfWorkers,
+      './tests/worker-files/thread/asyncWorker.mjs',
+      {
+        enableTasksQueue: true,
+        tasksQueueOptions: { tasksFinishedTimeout }
+      }
+    )
+    const maxMultiplier = 4
+    let tasksFinished = 0
+    for (const workerNode of pool.workerNodes) {
+      workerNode.on('taskFinished', () => {
+        ++tasksFinished
+      })
+    }
+    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+      pool.execute()
+    }
+    expect(pool.info.queuedTasks).toBeGreaterThan(0)
+    const startTime = performance.now()
+    await pool.destroy()
+    const elapsedTime = performance.now() - startTime
+    expect(tasksFinished).toBe(0)
+    expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
+  })
+
+  it('Verify that pool asynchronous resource track tasks execution', async () => {
+    let taskAsyncId
+    let initCalls = 0
+    let beforeCalls = 0
+    let afterCalls = 0
+    let resolveCalls = 0
+    const hook = createHook({
+      init (asyncId, type) {
+        if (type === 'poolifier:task') {
+          initCalls++
+          taskAsyncId = asyncId
+        }
+      },
+      before (asyncId) {
+        if (asyncId === taskAsyncId) beforeCalls++
+      },
+      after (asyncId) {
+        if (asyncId === taskAsyncId) afterCalls++
+      },
+      promiseResolve () {
+        if (executionAsyncId() === taskAsyncId) resolveCalls++
+      }
+    })
+    const pool = new FixedThreadPool(
+      numberOfWorkers,
+      './tests/worker-files/thread/testWorker.mjs'
+    )
+    hook.enable()
+    await pool.execute()
+    hook.disable()
+    expect(initCalls).toBe(1)
+    expect(beforeCalls).toBe(1)
+    expect(afterCalls).toBe(1)
+    expect(resolveCalls).toBe(1)
     await pool.destroy()
   })