fix: disable `tasksStealingOnBackPressure` by default
[poolifier.git] / tests / pools / abstract-pool.test.mjs
index e79795188b272fe1842f539673f3f23841dff90f..8cbbc0a5dc59e522f3328271dbfd8cd812586388 100644 (file)
@@ -9,7 +9,6 @@ import { expect } from 'expect'
 import { restore, stub } from 'sinon'
 
 import { CircularArray } from '../../lib/circular-array.cjs'
-import { Deque } from '../../lib/deque.cjs'
 import {
   DynamicClusterPool,
   DynamicThreadPool,
@@ -21,6 +20,7 @@ import {
   WorkerTypes
 } from '../../lib/index.cjs'
 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
+import { PriorityQueue } from '../../lib/priority-queue.cjs'
 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
 import { waitPoolEvents } from '../test-utils.cjs'
 
@@ -275,7 +275,7 @@ describe('Abstract pool test suite', () => {
         concurrency: 2,
         size: Math.pow(numberOfWorkers, 2),
         taskStealing: true,
-        tasksStealingOnBackPressure: true,
+        tasksStealingOnBackPressure: false,
         tasksFinishedTimeout: 2000
       },
       workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
@@ -468,8 +468,8 @@ describe('Abstract pool test suite', () => {
         median: false
       },
       waitTime: {
-        aggregate: false,
-        average: false,
+        aggregate: true,
+        average: true,
         median: false
       },
       elu: {
@@ -507,8 +507,8 @@ describe('Abstract pool test suite', () => {
         median: true
       },
       waitTime: {
-        aggregate: false,
-        average: false,
+        aggregate: true,
+        average: true,
         median: false
       },
       elu: {
@@ -546,8 +546,8 @@ describe('Abstract pool test suite', () => {
         median: false
       },
       waitTime: {
-        aggregate: false,
-        average: false,
+        aggregate: true,
+        average: true,
         median: false
       },
       elu: {
@@ -591,7 +591,7 @@ describe('Abstract pool test suite', () => {
       concurrency: 1,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true,
+      tasksStealingOnBackPressure: false,
       tasksFinishedTimeout: 2000
     })
     pool.enableTasksQueue(true, { concurrency: 2 })
@@ -600,7 +600,7 @@ describe('Abstract pool test suite', () => {
       concurrency: 2,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true,
+      tasksStealingOnBackPressure: false,
       tasksFinishedTimeout: 2000
     })
     pool.enableTasksQueue(false)
@@ -619,7 +619,7 @@ describe('Abstract pool test suite', () => {
       concurrency: 1,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true,
+      tasksStealingOnBackPressure: false,
       tasksFinishedTimeout: 2000
     })
     for (const workerNode of pool.workerNodes) {
@@ -786,9 +786,10 @@ describe('Abstract pool test suite', () => {
     )
     for (const workerNode of pool.workerNodes) {
       expect(workerNode).toBeInstanceOf(WorkerNode)
-      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+      expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
       expect(workerNode.tasksQueue.size).toBe(0)
       expect(workerNode.tasksQueue.maxSize).toBe(0)
+      expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2)
     }
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -798,9 +799,10 @@ describe('Abstract pool test suite', () => {
     )
     for (const workerNode of pool.workerNodes) {
       expect(workerNode).toBeInstanceOf(WorkerNode)
-      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+      expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
       expect(workerNode.tasksQueue.size).toBe(0)
       expect(workerNode.tasksQueue.maxSize).toBe(0)
+      expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2)
     }
     await pool.destroy()
   })
@@ -817,6 +819,7 @@ describe('Abstract pool test suite', () => {
         type: WorkerTypes.cluster,
         dynamic: false,
         ready: true,
+        backPressure: false,
         stealing: false
       })
     }
@@ -833,6 +836,7 @@ describe('Abstract pool test suite', () => {
         type: WorkerTypes.thread,
         dynamic: false,
         ready: true,
+        backPressure: false,
         stealing: false
       })
     }
@@ -1372,6 +1376,36 @@ describe('Abstract pool test suite', () => {
     await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
       new TypeError('taskFunction property must be a function')
     )
+    await expect(
+      dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 })
+    ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
+    await expect(
+      dynamicThreadPool.addTaskFunction('test', { taskFunction: '' })
+    ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
+    await expect(
+      dynamicThreadPool.addTaskFunction('test', {
+        taskFunction: () => {},
+        priority: -21
+      })
+    ).rejects.toThrow(
+      new RangeError("Property 'priority' must be between -20 and 19")
+    )
+    await expect(
+      dynamicThreadPool.addTaskFunction('test', {
+        taskFunction: () => {},
+        priority: 20
+      })
+    ).rejects.toThrow(
+      new RangeError("Property 'priority' must be between -20 and 19")
+    )
+    await expect(
+      dynamicThreadPool.addTaskFunction('test', {
+        taskFunction: () => {},
+        strategy: 'invalidStrategy'
+      })
+    ).rejects.toThrow(
+      new Error("Invalid worker choice strategy 'invalidStrategy'")
+    )
     expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
       { name: DEFAULT_TASK_NAME },
       { name: 'test' }
@@ -1423,21 +1457,55 @@ describe('Abstract pool test suite', () => {
         waitTime: {
           history: new CircularArray()
         },
-        elu: {
-          idle: {
-            aggregate: 0,
-            maximum: 0,
-            minimum: 0,
-            history: new CircularArray()
-          },
-          active: {
-            aggregate: 0,
-            maximum: 0,
-            minimum: 0,
-            history: new CircularArray()
-          }
-        }
+        elu: expect.objectContaining({
+          idle: expect.objectContaining({
+            history: expect.any(CircularArray)
+          }),
+          active: expect.objectContaining({
+            history: expect.any(CircularArray)
+          })
+        })
       })
+      expect(
+        workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed
+      ).toBeGreaterThan(0)
+      if (
+        workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate ==
+        null
+      ) {
+        expect(
+          workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
+        ).toBeUndefined()
+      } else {
+        expect(
+          workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
+        ).toBeGreaterThan(0)
+      }
+      if (
+        workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null
+      ) {
+        expect(
+          workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
+        ).toBeUndefined()
+      } else {
+        expect(
+          workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
+        ).toBeGreaterThanOrEqual(0)
+      }
+      if (
+        workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null
+      ) {
+        expect(
+          workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+        ).toBeUndefined()
+      } else {
+        expect(
+          workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+        ).toBeGreaterThanOrEqual(0)
+        expect(
+          workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+        ).toBeLessThanOrEqual(1)
+      }
     }
     await dynamicThreadPool.destroy()
   })