test: improve task function ops
[poolifier.git] / tests / pools / abstract-pool.test.mjs
index ccd2423102427a40878faf2ee347aa20f3f58964..e79795188b272fe1842f539673f3f23841dff90f 100644 (file)
@@ -1,10 +1,15 @@
+// eslint-disable-next-line n/no-unsupported-features/node-builtins
+import { createHook, executionAsyncId } from 'node:async_hooks'
 import { EventEmitterAsyncResource } from 'node:events'
-import { dirname, join } from 'node:path'
 import { readFileSync } from 'node:fs'
+import { dirname, join } from 'node:path'
 import { fileURLToPath } from 'node:url'
-import { createHook, executionAsyncId } from 'node:async_hooks'
+
 import { expect } from 'expect'
 import { restore, stub } from 'sinon'
+
+import { CircularArray } from '../../lib/circular-array.cjs'
+import { Deque } from '../../lib/deque.cjs'
 import {
   DynamicClusterPool,
   DynamicThreadPool,
@@ -15,11 +20,9 @@ import {
   WorkerChoiceStrategies,
   WorkerTypes
 } from '../../lib/index.cjs'
-import { CircularArray } from '../../lib/circular-array.cjs'
-import { Deque } from '../../lib/deque.cjs'
+import { WorkerNode } from '../../lib/pools/worker-node.cjs'
 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
 import { waitPoolEvents } from '../test-utils.cjs'
-import { WorkerNode } from '../../lib/pools/worker-node.cjs'
 
 describe('Abstract pool test suite', () => {
   const version = JSON.parse(
@@ -221,6 +224,7 @@ describe('Abstract pool test suite', () => {
       './tests/worker-files/thread/testWorker.mjs'
     )
     expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
+    expect(pool.emitter.eventNames()).toStrictEqual([])
     expect(pool.opts).toStrictEqual({
       startWorkers: true,
       enableEvents: true,
@@ -228,7 +232,7 @@ describe('Abstract pool test suite', () => {
       enableTasksQueue: false,
       workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
     })
-    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
         runTime: { median: false },
@@ -284,7 +288,7 @@ describe('Abstract pool test suite', () => {
       errorHandler: testHandler,
       exitHandler: testHandler
     })
-    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
         runTime: { median: true },
@@ -443,7 +447,7 @@ describe('Abstract pool test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
     )
     expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
-    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
         runTime: { median: false },
@@ -456,7 +460,7 @@ describe('Abstract pool test suite', () => {
       })
     }
     expect(
-      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+      pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
     ).toStrictEqual({
       runTime: {
         aggregate: true,
@@ -482,7 +486,7 @@ describe('Abstract pool test suite', () => {
       runTime: { median: true },
       elu: { median: true }
     })
-    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
         runTime: { median: true },
@@ -495,7 +499,7 @@ describe('Abstract pool test suite', () => {
       })
     }
     expect(
-      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+      pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
     ).toStrictEqual({
       runTime: {
         aggregate: true,
@@ -521,7 +525,7 @@ describe('Abstract pool test suite', () => {
       runTime: { median: false },
       elu: { median: false }
     })
-    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
         runTime: { median: false },
@@ -534,7 +538,7 @@ describe('Abstract pool test suite', () => {
       })
     }
     expect(
-      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+      pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
     ).toStrictEqual({
       runTime: {
         aggregate: true,
@@ -702,7 +706,8 @@ describe('Abstract pool test suite', () => {
       worker: WorkerTypes.thread,
       started: true,
       ready: true,
-      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      strategyRetries: 0,
       minSize: numberOfWorkers,
       maxSize: numberOfWorkers,
       workerNodes: numberOfWorkers,
@@ -724,7 +729,8 @@ describe('Abstract pool test suite', () => {
       worker: WorkerTypes.cluster,
       started: true,
       ready: true,
-      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      strategyRetries: 0,
       minSize: Math.floor(numberOfWorkers / 2),
       maxSize: numberOfWorkers,
       workerNodes: Math.floor(numberOfWorkers / 2),
@@ -861,8 +867,8 @@ describe('Abstract pool test suite', () => {
     )
     expect(pool.info.started).toBe(false)
     expect(pool.info.ready).toBe(false)
-    expect(pool.readyEventEmitted).toBe(false)
     expect(pool.workerNodes).toStrictEqual([])
+    expect(pool.readyEventEmitted).toBe(false)
     await expect(pool.execute()).rejects.toThrow(
       new Error('Cannot execute a task on not started pool')
     )
@@ -969,7 +975,7 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
+  it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
     const pool = new DynamicThreadPool(
       Math.floor(numberOfWorkers / 2),
       numberOfWorkers,
@@ -1020,7 +1026,7 @@ describe('Abstract pool test suite', () => {
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.usage).toStrictEqual({
         tasks: {
-          executed: 0,
+          executed: expect.any(Number),
           executing: 0,
           queued: 0,
           maxQueued: 0,
@@ -1043,6 +1049,10 @@ describe('Abstract pool test suite', () => {
           }
         }
       })
+      expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+        numberOfWorkers * maxMultiplier
+      )
       expect(workerNode.usage.runTime.history.length).toBe(0)
       expect(workerNode.usage.waitTime.history.length).toBe(0)
       expect(workerNode.usage.elu.idle.history.length).toBe(0)
@@ -1073,7 +1083,8 @@ describe('Abstract pool test suite', () => {
       worker: WorkerTypes.cluster,
       started: true,
       ready: true,
-      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      strategyRetries: expect.any(Number),
       minSize: expect.any(Number),
       maxSize: expect.any(Number),
       workerNodes: expect.any(Number),
@@ -1113,7 +1124,8 @@ describe('Abstract pool test suite', () => {
       worker: WorkerTypes.thread,
       started: true,
       ready: true,
-      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      strategyRetries: expect.any(Number),
       minSize: expect.any(Number),
       maxSize: expect.any(Number),
       workerNodes: expect.any(Number),
@@ -1152,7 +1164,8 @@ describe('Abstract pool test suite', () => {
       worker: WorkerTypes.thread,
       started: true,
       ready: true,
-      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      strategyRetries: expect.any(Number),
       minSize: expect.any(Number),
       maxSize: expect.any(Number),
       workerNodes: expect.any(Number),
@@ -1194,7 +1207,8 @@ describe('Abstract pool test suite', () => {
       worker: WorkerTypes.thread,
       started: true,
       ready: true,
-      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      strategyRetries: expect.any(Number),
       minSize: expect.any(Number),
       maxSize: expect.any(Number),
       workerNodes: expect.any(Number),
@@ -1239,7 +1253,7 @@ describe('Abstract pool test suite', () => {
     const elapsedTime = performance.now() - startTime
     expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
     expect(elapsedTime).toBeGreaterThanOrEqual(2000)
-    expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 400)
+    expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
   })
 
   it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
@@ -1353,29 +1367,42 @@ describe('Abstract pool test suite', () => {
       new TypeError('name argument must not be an empty string')
     )
     await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
-      new TypeError('fn argument must be a function')
+      new TypeError('taskFunction property must be a function')
     )
     await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
-      new TypeError('fn argument must be a function')
+      new TypeError('taskFunction property must be a function')
     )
-    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
-      DEFAULT_TASK_NAME,
-      'test'
+    expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+      { name: DEFAULT_TASK_NAME },
+      { name: 'test' }
     ])
+    expect([
+      ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+    ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
     const echoTaskFunction = data => {
       return data
     }
     await expect(
-      dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
+      dynamicThreadPool.addTaskFunction('echo', {
+        taskFunction: echoTaskFunction,
+        strategy: WorkerChoiceStrategies.LEAST_ELU
+      })
     ).resolves.toBe(true)
     expect(dynamicThreadPool.taskFunctions.size).toBe(1)
-    expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
-      echoTaskFunction
-    )
-    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
-      DEFAULT_TASK_NAME,
-      'test',
-      'echo'
+    expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
+      taskFunction: echoTaskFunction,
+      strategy: WorkerChoiceStrategies.LEAST_ELU
+    })
+    expect([
+      ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+    ]).toStrictEqual([
+      WorkerChoiceStrategies.ROUND_ROBIN,
+      WorkerChoiceStrategies.LEAST_ELU
+    ])
+    expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+      { name: DEFAULT_TASK_NAME },
+      { name: 'test' },
+      { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
     ])
     const taskFunctionData = { test: 'test' }
     const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
@@ -1398,9 +1425,15 @@ describe('Abstract pool test suite', () => {
         },
         elu: {
           idle: {
+            aggregate: 0,
+            maximum: 0,
+            minimum: 0,
             history: new CircularArray()
           },
           active: {
+            aggregate: 0,
+            maximum: 0,
+            minimum: 0,
             history: new CircularArray()
           }
         }
@@ -1416,9 +1449,9 @@ describe('Abstract pool test suite', () => {
       './tests/worker-files/thread/testWorker.mjs'
     )
     await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
-    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
-      DEFAULT_TASK_NAME,
-      'test'
+    expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+      { name: DEFAULT_TASK_NAME },
+      { name: 'test' }
     ])
     await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
       new Error('Cannot remove a task function not handled on the pool side')
@@ -1426,40 +1459,53 @@ describe('Abstract pool test suite', () => {
     const echoTaskFunction = data => {
       return data
     }
-    await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
+    await dynamicThreadPool.addTaskFunction('echo', {
+      taskFunction: echoTaskFunction,
+      strategy: WorkerChoiceStrategies.LEAST_ELU
+    })
     expect(dynamicThreadPool.taskFunctions.size).toBe(1)
-    expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
-      echoTaskFunction
-    )
-    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
-      DEFAULT_TASK_NAME,
-      'test',
-      'echo'
+    expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
+      taskFunction: echoTaskFunction,
+      strategy: WorkerChoiceStrategies.LEAST_ELU
+    })
+    expect([
+      ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+    ]).toStrictEqual([
+      WorkerChoiceStrategies.ROUND_ROBIN,
+      WorkerChoiceStrategies.LEAST_ELU
+    ])
+    expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+      { name: DEFAULT_TASK_NAME },
+      { name: 'test' },
+      { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
     ])
     await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
       true
     )
     expect(dynamicThreadPool.taskFunctions.size).toBe(0)
     expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
-    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
-      DEFAULT_TASK_NAME,
-      'test'
+    expect([
+      ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+    ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
+    expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+      { name: DEFAULT_TASK_NAME },
+      { name: 'test' }
     ])
     await dynamicThreadPool.destroy()
   })
 
-  it('Verify that listTaskFunctionNames() is working', async () => {
+  it('Verify that listTaskFunctionsProperties() is working', async () => {
     const dynamicThreadPool = new DynamicThreadPool(
       Math.floor(numberOfWorkers / 2),
       numberOfWorkers,
       './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
     )
     await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
-    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
-      DEFAULT_TASK_NAME,
-      'jsonIntegerSerialization',
-      'factorial',
-      'fibonacci'
+    expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+      { name: DEFAULT_TASK_NAME },
+      { name: 'jsonIntegerSerialization' },
+      { name: 'factorial' },
+      { name: 'fibonacci' }
     ])
     await dynamicThreadPool.destroy()
     const fixedClusterPool = new FixedClusterPool(
@@ -1467,11 +1513,11 @@ describe('Abstract pool test suite', () => {
       './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
     )
     await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
-    expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
-      DEFAULT_TASK_NAME,
-      'jsonIntegerSerialization',
-      'factorial',
-      'fibonacci'
+    expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
+      { name: DEFAULT_TASK_NAME },
+      { name: 'jsonIntegerSerialization' },
+      { name: 'factorial' },
+      { name: 'fibonacci' }
     ])
     await fixedClusterPool.destroy()
   })
@@ -1503,29 +1549,29 @@ describe('Abstract pool test suite', () => {
         `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
       )
     )
-    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
-      DEFAULT_TASK_NAME,
-      'jsonIntegerSerialization',
-      'factorial',
-      'fibonacci'
+    expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+      { name: DEFAULT_TASK_NAME },
+      { name: 'jsonIntegerSerialization' },
+      { name: 'factorial' },
+      { name: 'fibonacci' }
     ])
     await expect(
       dynamicThreadPool.setDefaultTaskFunction('factorial')
     ).resolves.toBe(true)
-    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
-      DEFAULT_TASK_NAME,
-      'factorial',
-      'jsonIntegerSerialization',
-      'fibonacci'
+    expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+      { name: DEFAULT_TASK_NAME },
+      { name: 'factorial' },
+      { name: 'jsonIntegerSerialization' },
+      { name: 'fibonacci' }
     ])
     await expect(
       dynamicThreadPool.setDefaultTaskFunction('fibonacci')
     ).resolves.toBe(true)
-    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
-      DEFAULT_TASK_NAME,
-      'fibonacci',
-      'jsonIntegerSerialization',
-      'factorial'
+    expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+      { name: DEFAULT_TASK_NAME },
+      { name: 'fibonacci' },
+      { name: 'jsonIntegerSerialization' },
+      { name: 'factorial' }
     ])
     await dynamicThreadPool.destroy()
   })
@@ -1548,15 +1594,17 @@ describe('Abstract pool test suite', () => {
     expect(pool.info.executingTasks).toBe(0)
     expect(pool.info.executedTasks).toBe(4)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.info.taskFunctionNames).toStrictEqual([
-        DEFAULT_TASK_NAME,
-        'jsonIntegerSerialization',
-        'factorial',
-        'fibonacci'
+      expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
+        { name: DEFAULT_TASK_NAME },
+        { name: 'jsonIntegerSerialization' },
+        { name: 'factorial' },
+        { name: 'fibonacci' }
       ])
       expect(workerNode.taskFunctionsUsage.size).toBe(3)
-      for (const name of pool.listTaskFunctionNames()) {
-        expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
+      for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
+        expect(
+          workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
+        ).toStrictEqual({
           tasks: {
             executed: expect.any(Number),
             executing: 0,
@@ -1581,14 +1629,15 @@ describe('Abstract pool test suite', () => {
           }
         })
         expect(
-          workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
+          workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
+            .tasks.executed
         ).toBeGreaterThan(0)
       }
       expect(
         workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
       ).toStrictEqual(
         workerNode.getTaskFunctionWorkerUsage(
-          workerNode.info.taskFunctionNames[1]
+          workerNode.info.taskFunctionsProperties[1].name
         )
       )
     }
@@ -1618,13 +1667,17 @@ describe('Abstract pool test suite', () => {
     await expect(
       pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
         taskFunctionOperation: 'add',
-        taskFunctionName: 'empty',
+        taskFunctionProperties: { name: 'empty' },
         taskFunction: (() => {}).toString()
       })
     ).resolves.toBe(true)
     expect(
-      pool.workerNodes[workerNodeKey].info.taskFunctionNames
-    ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
+      pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
+    ).toStrictEqual([
+      { name: DEFAULT_TASK_NAME },
+      { name: 'test' },
+      { name: 'empty' }
+    ])
     await pool.destroy()
   })
 
@@ -1637,15 +1690,15 @@ describe('Abstract pool test suite', () => {
     await expect(
       pool.sendTaskFunctionOperationToWorkers({
         taskFunctionOperation: 'add',
-        taskFunctionName: 'empty',
+        taskFunctionProperties: { name: 'empty' },
         taskFunction: (() => {}).toString()
       })
     ).resolves.toBe(true)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.info.taskFunctionNames).toStrictEqual([
-        DEFAULT_TASK_NAME,
-        'test',
-        'empty'
+      expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
+        { name: DEFAULT_TASK_NAME },
+        { name: 'test' },
+        { name: 'empty' }
       ])
     }
     await pool.destroy()