perf: use optimized circular buffer implementation to store measurements history
[poolifier.git] / tests / pools / worker-node.test.mjs
index b72c28872dc65b04f55354403cd423ea1b369916..54a69ca458a8566067c2c8af8160e6605214a6cf 100644 (file)
-import { MessageChannel, Worker } from 'node:worker_threads'
-import cluster from 'node:cluster'
+import { Worker as ClusterWorker } from 'node:cluster'
+import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
+
 import { expect } from 'expect'
-import { WorkerNode } from '../../lib/pools/worker-node.js'
-import { WorkerTypes } from '../../lib/index.js'
-import { CircularArray } from '../../lib/circular-array.js'
-import { Deque } from '../../lib/deque.js'
-import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
+
+import { CircularBuffer } from '../../lib/circular-buffer.cjs'
+import { WorkerTypes } from '../../lib/index.cjs'
+import { WorkerNode } from '../../lib/pools/worker-node.cjs'
+import { PriorityQueue } from '../../lib/priority-queue.cjs'
+import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
 
 describe('Worker node test suite', () => {
-  const threadWorker = new Worker('./tests/worker-files/thread/testWorker.mjs')
-  const clusterWorker = cluster.fork()
-  const threadWorkerNode = new WorkerNode(threadWorker, 12)
-  const clusterWorkerNode = new WorkerNode(clusterWorker, 12)
+  const threadWorkerNode = new WorkerNode(
+    WorkerTypes.thread,
+    './tests/worker-files/thread/testWorker.mjs',
+    { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 }
+  )
+  const clusterWorkerNode = new WorkerNode(
+    WorkerTypes.cluster,
+    './tests/worker-files/cluster/testWorker.cjs',
+    { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 }
+  )
 
   it('Worker node instantiation', () => {
     expect(() => new WorkerNode()).toThrow(
-      new TypeError('Cannot construct a worker node without a worker')
+      new TypeError('Cannot construct a worker node without a worker type')
+    )
+    expect(
+      () =>
+        new WorkerNode(
+          'invalidWorkerType',
+          './tests/worker-files/thread/testWorker.mjs',
+          { tasksQueueBackPressureSize: 12 }
+        )
+    ).toThrow(
+      new TypeError(
+        "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
+      )
+    )
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs'
+        )
+    ).toThrow(
+      new TypeError(
+        'Cannot construct a worker node without worker node options'
+      )
+    )
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          ''
+        )
+    ).toThrow(
+      new TypeError(
+        'Cannot construct a worker node with invalid options: must be a plain object'
+      )
+    )
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          {}
+        )
+    ).toThrow(
+      new TypeError(
+        'Cannot construct a worker node without a tasks queue back pressure size option'
+      )
+    )
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
+        )
+    ).toThrow(
+      new TypeError(
+        'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
+      )
     )
-    expect(() => new WorkerNode(threadWorker)).toThrow(
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          { tasksQueueBackPressureSize: 0.2 }
+        )
+    ).toThrow(
       new TypeError(
-        'Cannot construct a worker node without a tasks queue back pressure size'
+        'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
       )
     )
     expect(
-      () => new WorkerNode(threadWorker, 'invalidTasksQueueBackPressureSize')
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          { tasksQueueBackPressureSize: 0 }
+        )
+    ).toThrow(
+      new RangeError(
+        'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
+      )
+    )
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          { tasksQueueBackPressureSize: -1 }
+        )
+    ).toThrow(
+      new RangeError(
+        'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
+      )
+    )
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          {
+            tasksQueueBackPressureSize: 12
+          }
+        )
+    ).toThrow(
+      new TypeError(
+        'Cannot construct a worker node without a tasks queue bucket size option'
+      )
+    )
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          {
+            tasksQueueBackPressureSize: 12,
+            tasksQueueBucketSize: 'invalidTasksQueueBucketSize'
+          }
+        )
     ).toThrow(
       new TypeError(
-        'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
+        'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
       )
     )
-    expect(() => new WorkerNode(threadWorker, 0.2)).toThrow(
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0.2 }
+        )
+    ).toThrow(
       new TypeError(
-        'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
+        'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
       )
     )
-    expect(() => new WorkerNode(threadWorker, 0)).toThrow(
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0 }
+        )
+    ).toThrow(
       new RangeError(
-        'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
+        'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
       )
     )
-    expect(() => new WorkerNode(threadWorker, -1)).toThrow(
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: -1 }
+        )
+    ).toThrow(
       new RangeError(
-        'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
+        'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
       )
     )
     expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
-    expect(threadWorkerNode.worker).toBe(threadWorker)
+    expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
     expect(threadWorkerNode.info).toStrictEqual({
-      id: threadWorker.threadId,
+      id: threadWorkerNode.worker.threadId,
       type: WorkerTypes.thread,
       dynamic: false,
-      ready: false
+      ready: false,
+      stealing: false,
+      backPressure: false
     })
     expect(threadWorkerNode.usage).toStrictEqual({
       tasks: {
@@ -58,42 +201,45 @@ describe('Worker node test suite', () => {
         executing: 0,
         queued: 0,
         maxQueued: 0,
+        sequentiallyStolen: 0,
         stolen: 0,
         failed: 0
       },
       runTime: {
-        history: new CircularArray()
+        history: expect.any(CircularBuffer)
       },
       waitTime: {
-        history: new CircularArray()
+        history: expect.any(CircularBuffer)
       },
       elu: {
         idle: {
-          history: new CircularArray()
+          history: expect.any(CircularBuffer)
         },
         active: {
-          history: new CircularArray()
+          history: expect.any(CircularBuffer)
         }
       }
     })
     expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
     expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
-    expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
+    expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
     expect(threadWorkerNode.tasksQueue.size).toBe(0)
+    expect(threadWorkerNode.tasksQueue.bucketSize).toBe(6)
     expect(threadWorkerNode.tasksQueueSize()).toBe(
       threadWorkerNode.tasksQueue.size
     )
-    expect(threadWorkerNode.onBackPressureStarted).toBe(false)
-    expect(threadWorkerNode.onIdleWorkerNodeCount).toBe(0)
+    expect(threadWorkerNode.setBackPressureFlag).toBe(false)
     expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
 
     expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
-    expect(clusterWorkerNode.worker).toBe(clusterWorker)
+    expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
     expect(clusterWorkerNode.info).toStrictEqual({
-      id: clusterWorker.id,
+      id: clusterWorkerNode.worker.id,
       type: WorkerTypes.cluster,
       dynamic: false,
-      ready: false
+      ready: false,
+      stealing: false,
+      backPressure: false
     })
     expect(clusterWorkerNode.usage).toStrictEqual({
       tasks: {
@@ -101,33 +247,34 @@ describe('Worker node test suite', () => {
         executing: 0,
         queued: 0,
         maxQueued: 0,
+        sequentiallyStolen: 0,
         stolen: 0,
         failed: 0
       },
       runTime: {
-        history: new CircularArray()
+        history: expect.any(CircularBuffer)
       },
       waitTime: {
-        history: new CircularArray()
+        history: expect.any(CircularBuffer)
       },
       elu: {
         idle: {
-          history: new CircularArray()
+          history: expect.any(CircularBuffer)
         },
         active: {
-          history: new CircularArray()
+          history: expect.any(CircularBuffer)
         }
       }
     })
     expect(clusterWorkerNode.messageChannel).toBeUndefined()
     expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
-    expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
+    expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
     expect(clusterWorkerNode.tasksQueue.size).toBe(0)
+    expect(clusterWorkerNode.tasksQueue.bucketSize).toBe(6)
     expect(clusterWorkerNode.tasksQueueSize()).toBe(
       clusterWorkerNode.tasksQueue.size
     )
-    expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
-    expect(clusterWorkerNode.onIdleWorkerNodeCount).toBe(0)
+    expect(clusterWorkerNode.setBackPressureFlag).toBe(false)
     expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
   })
 
@@ -136,18 +283,25 @@ describe('Worker node test suite', () => {
       threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
     ).toThrow(
       new TypeError(
-        "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
+        "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
       )
     )
-    threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1']
+    threadWorkerNode.info.taskFunctionsProperties = [
+      { name: DEFAULT_TASK_NAME },
+      { name: 'fn1' }
+    ]
     expect(() =>
       threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
     ).toThrow(
       new TypeError(
-        "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
+        "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
       )
     )
-    threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
+    threadWorkerNode.info.taskFunctionsProperties = [
+      { name: DEFAULT_TASK_NAME },
+      { name: 'fn1' },
+      { name: 'fn2' }
+    ]
     expect(
       threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
     ).toStrictEqual({
@@ -155,21 +309,22 @@ describe('Worker node test suite', () => {
         executed: 0,
         executing: 0,
         queued: 0,
+        sequentiallyStolen: 0,
         stolen: 0,
         failed: 0
       },
       runTime: {
-        history: new CircularArray()
+        history: expect.any(CircularBuffer)
       },
       waitTime: {
-        history: new CircularArray()
+        history: expect.any(CircularBuffer)
       },
       elu: {
         idle: {
-          history: new CircularArray()
+          history: expect.any(CircularBuffer)
         },
         active: {
-          history: new CircularArray()
+          history: expect.any(CircularBuffer)
         }
       }
     })
@@ -178,21 +333,22 @@ describe('Worker node test suite', () => {
         executed: 0,
         executing: 0,
         queued: 0,
+        sequentiallyStolen: 0,
         stolen: 0,
         failed: 0
       },
       runTime: {
-        history: new CircularArray()
+        history: expect.any(CircularBuffer)
       },
       waitTime: {
-        history: new CircularArray()
+        history: expect.any(CircularBuffer)
       },
       elu: {
         idle: {
-          history: new CircularArray()
+          history: expect.any(CircularBuffer)
         },
         active: {
-          history: new CircularArray()
+          history: expect.any(CircularBuffer)
         }
       }
     })
@@ -201,21 +357,22 @@ describe('Worker node test suite', () => {
         executed: 0,
         executing: 0,
         queued: 0,
+        sequentiallyStolen: 0,
         stolen: 0,
         failed: 0
       },
       runTime: {
-        history: new CircularArray()
+        history: expect.any(CircularBuffer)
       },
       waitTime: {
-        history: new CircularArray()
+        history: expect.any(CircularBuffer)
       },
       elu: {
         idle: {
-          history: new CircularArray()
+          history: expect.any(CircularBuffer)
         },
         active: {
-          history: new CircularArray()
+          history: expect.any(CircularBuffer)
         }
       }
     })
@@ -223,10 +380,10 @@ describe('Worker node test suite', () => {
   })
 
   it('Worker node deleteTaskFunctionWorkerUsage()', () => {
-    expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
-      DEFAULT_TASK_NAME,
-      'fn1',
-      'fn2'
+    expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([
+      { name: DEFAULT_TASK_NAME },
+      { name: 'fn1' },
+      { name: 'fn2' }
     ])
     expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
     expect(