fix: fix back pressure detection
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
index 8652287769af1d63267dde6cc4873e1284fa15e2..ec552e986003d859f38fe0557d6d8727c23d7ceb 100644 (file)
@@ -1,4 +1,3 @@
-const { MessageChannel } = require('worker_threads')
 const { expect } = require('expect')
 const {
   DynamicClusterPool,
@@ -34,7 +33,7 @@ describe('Abstract pool test suite', () => {
           }
         )
     ).toThrowError(
-      'Cannot start a pool from the same worker type as the current pool one'
+      'Cannot start a pool from a worker with the same type as the pool'
     )
   })
 
@@ -168,6 +167,13 @@ describe('Abstract pool test suite', () => {
       WorkerChoiceStrategies.ROUND_ROBIN
     )
     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+      choiceRetries: 6,
+      runTime: { median: false },
+      waitTime: { median: false },
+      elu: { median: false }
+    })
+    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+      choiceRetries: 6,
       runTime: { median: false },
       waitTime: { median: false },
       elu: { median: false }
@@ -206,7 +212,17 @@ describe('Abstract pool test suite', () => {
       WorkerChoiceStrategies.LEAST_USED
     )
     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+      choiceRetries: 6,
+      runTime: { median: true },
+      waitTime: { median: false },
+      elu: { median: false },
+      weights: { 0: 300, 1: 200 }
+    })
+    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+      choiceRetries: 6,
       runTime: { median: true },
+      waitTime: { median: false },
+      elu: { median: false },
       weights: { 0: 300, 1: 200 }
     })
     expect(pool.opts.messageHandler).toStrictEqual(testHandler)
@@ -227,18 +243,6 @@ describe('Abstract pool test suite', () => {
           }
         )
     ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
-    expect(
-      () =>
-        new FixedThreadPool(
-          numberOfWorkers,
-          './tests/worker-files/thread/testWorker.js',
-          {
-            workerChoiceStrategyOptions: 'invalidOptions'
-          }
-        )
-    ).toThrowError(
-      'Invalid worker choice strategy options: must be a plain object'
-    )
     expect(
       () =>
         new FixedThreadPool(
@@ -305,6 +309,13 @@ describe('Abstract pool test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
     )
     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+      choiceRetries: 6,
+      runTime: { median: false },
+      waitTime: { median: false },
+      elu: { median: false }
+    })
+    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+      choiceRetries: 6,
       runTime: { median: false },
       waitTime: { median: false },
       elu: { median: false }
@@ -312,6 +323,7 @@ describe('Abstract pool test suite', () => {
     for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
+        choiceRetries: 6,
         runTime: { median: false },
         waitTime: { median: false },
         elu: { median: false }
@@ -341,13 +353,23 @@ describe('Abstract pool test suite', () => {
       elu: { median: true }
     })
     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+      choiceRetries: 6,
       runTime: { median: true },
+      waitTime: { median: false },
+      elu: { median: true }
+    })
+    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+      choiceRetries: 6,
+      runTime: { median: true },
+      waitTime: { median: false },
       elu: { median: true }
     })
     for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
+        choiceRetries: 6,
         runTime: { median: true },
+        waitTime: { median: false },
         elu: { median: true }
       })
     }
@@ -375,13 +397,23 @@ describe('Abstract pool test suite', () => {
       elu: { median: false }
     })
     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+      choiceRetries: 6,
       runTime: { median: false },
+      waitTime: { median: false },
+      elu: { median: false }
+    })
+    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+      choiceRetries: 6,
+      runTime: { median: false },
+      waitTime: { median: false },
       elu: { median: false }
     })
     for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
+        choiceRetries: 6,
         runTime: { median: false },
+        waitTime: { median: false },
         elu: { median: false }
       })
     }
@@ -588,8 +620,7 @@ describe('Abstract pool test suite', () => {
         id: expect.any(Number),
         type: WorkerTypes.thread,
         dynamic: false,
-        ready: true,
-        messageChannel: expect.any(MessageChannel)
+        ready: true
       })
     }
   })
@@ -734,31 +765,25 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it("Verify that pool event emitter 'full' event can register a callback", async () => {
-    const pool = new DynamicThreadPool(
+  it("Verify that pool event emitter 'ready' event can register a callback", async () => {
+    const pool = new DynamicClusterPool(
       Math.floor(numberOfWorkers / 2),
       numberOfWorkers,
-      './tests/worker-files/thread/testWorker.js'
+      './tests/worker-files/cluster/testWorker.js'
     )
-    const promises = new Set()
-    let poolFull = 0
     let poolInfo
-    pool.emitter.on(PoolEvents.full, (info) => {
-      ++poolFull
+    let poolReady = 0
+    pool.emitter.on(PoolEvents.ready, (info) => {
+      ++poolReady
       poolInfo = info
     })
-    for (let i = 0; i < numberOfWorkers * 2; i++) {
-      promises.add(pool.execute())
-    }
-    await Promise.all(promises)
-    // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
-    // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
-    expect(poolFull).toBe(numberOfWorkers * 2 - 1)
+    await waitPoolEvents(pool, PoolEvents.ready, 1)
+    expect(poolReady).toBe(1)
     expect(poolInfo).toStrictEqual({
       version,
       type: PoolTypes.dynamic,
-      worker: WorkerTypes.thread,
-      ready: expect.any(Boolean),
+      worker: WorkerTypes.cluster,
+      ready: true,
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
       minSize: expect.any(Number),
       maxSize: expect.any(Number),
@@ -772,25 +797,30 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it("Verify that pool event emitter 'ready' event can register a callback", async () => {
-    const pool = new DynamicClusterPool(
-      Math.floor(numberOfWorkers / 2),
+  it("Verify that pool event emitter 'busy' event can register a callback", async () => {
+    const pool = new FixedThreadPool(
       numberOfWorkers,
-      './tests/worker-files/cluster/testWorker.js'
+      './tests/worker-files/thread/testWorker.js'
     )
+    const promises = new Set()
+    let poolBusy = 0
     let poolInfo
-    let poolReady = 0
-    pool.emitter.on(PoolEvents.ready, (info) => {
-      ++poolReady
+    pool.emitter.on(PoolEvents.busy, (info) => {
+      ++poolBusy
       poolInfo = info
     })
-    await waitPoolEvents(pool, PoolEvents.ready, 1)
-    expect(poolReady).toBe(1)
+    for (let i = 0; i < numberOfWorkers * 2; i++) {
+      promises.add(pool.execute())
+    }
+    await Promise.all(promises)
+    // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
+    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
+    expect(poolBusy).toBe(numberOfWorkers + 1)
     expect(poolInfo).toStrictEqual({
       version,
-      type: PoolTypes.dynamic,
-      worker: WorkerTypes.cluster,
-      ready: true,
+      type: PoolTypes.fixed,
+      worker: WorkerTypes.thread,
+      ready: expect.any(Boolean),
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
       minSize: expect.any(Number),
       maxSize: expect.any(Number),
@@ -804,28 +834,29 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it("Verify that pool event emitter 'busy' event can register a callback", async () => {
-    const pool = new FixedThreadPool(
+  it("Verify that pool event emitter 'full' event can register a callback", async () => {
+    const pool = new DynamicThreadPool(
+      Math.floor(numberOfWorkers / 2),
       numberOfWorkers,
       './tests/worker-files/thread/testWorker.js'
     )
     const promises = new Set()
-    let poolBusy = 0
+    let poolFull = 0
     let poolInfo
-    pool.emitter.on(PoolEvents.busy, (info) => {
-      ++poolBusy
+    pool.emitter.on(PoolEvents.full, (info) => {
+      ++poolFull
       poolInfo = info
     })
     for (let i = 0; i < numberOfWorkers * 2; i++) {
       promises.add(pool.execute())
     }
     await Promise.all(promises)
-    // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
-    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
-    expect(poolBusy).toBe(numberOfWorkers + 1)
+    // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
+    // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
+    expect(poolFull).toBe(numberOfWorkers * 2 - 1)
     expect(poolInfo).toStrictEqual({
       version,
-      type: PoolTypes.fixed,
+      type: PoolTypes.dynamic,
       worker: WorkerTypes.thread,
       ready: expect.any(Boolean),
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
@@ -841,11 +872,37 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify that multiple tasks worker is working', async () => {
+  it('Verify that listTaskFunctions() is working', async () => {
+    const dynamicThreadPool = new DynamicThreadPool(
+      Math.floor(numberOfWorkers / 2),
+      numberOfWorkers,
+      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
+    )
+    await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+    expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([
+      'default',
+      'jsonIntegerSerialization',
+      'factorial',
+      'fibonacci'
+    ])
+    const fixedClusterPool = new FixedClusterPool(
+      numberOfWorkers,
+      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
+    )
+    await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
+    expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([
+      'default',
+      'jsonIntegerSerialization',
+      'factorial',
+      'fibonacci'
+    ])
+  })
+
+  it('Verify that multiple task functions worker is working', async () => {
     const pool = new DynamicClusterPool(
       Math.floor(numberOfWorkers / 2),
       numberOfWorkers,
-      './tests/worker-files/cluster/testMultiTasksWorker.js'
+      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
     )
     const data = { n: 10 }
     const result0 = await pool.execute(data)
@@ -856,5 +913,43 @@ describe('Abstract pool test suite', () => {
     expect(result2).toBe(3628800)
     const result3 = await pool.execute(data, 'fibonacci')
     expect(result3).toBe(55)
+    expect(pool.info.executingTasks).toBe(0)
+    expect(pool.info.executedTasks).toBe(4)
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.info.taskFunctions).toStrictEqual([
+        'default',
+        'jsonIntegerSerialization',
+        'factorial',
+        'fibonacci'
+      ])
+      expect(workerNode.taskFunctionsUsage.size).toBe(3)
+      for (const name of pool.listTaskFunctions()) {
+        expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
+          tasks: {
+            executed: expect.any(Number),
+            executing: expect.any(Number),
+            failed: 0,
+            queued: 0
+          },
+          runTime: {
+            history: expect.any(CircularArray)
+          },
+          waitTime: {
+            history: expect.any(CircularArray)
+          },
+          elu: {
+            idle: {
+              history: expect.any(CircularArray)
+            },
+            active: {
+              history: expect.any(CircularArray)
+            }
+          }
+        })
+        expect(
+          workerNode.getTaskFunctionWorkerUsage(name).tasks.executing
+        ).toBeGreaterThanOrEqual(0)
+      }
+    }
   })
 })