1 import { Worker as ClusterWorker } from 'node:cluster'
2 import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
4 import { expect } from 'expect'
6 import { CircularArray } from '../../lib/circular-array.cjs'
7 import { WorkerTypes } from '../../lib/index.cjs'
8 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
9 import { PriorityQueue } from '../../lib/priority-queue.cjs'
10 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
12 describe('Worker node test suite', () => {
// Shared fixtures for the whole suite: one worker node built from the thread
// test worker file and one built from the cluster test worker file. Both are
// given tasksQueueBackPressureSize 12 and tasksQueueBucketSize 6; the tests
// below assert on those exact values.
// NOTE(review): the leading constructor argument (presumably the worker type)
// and the closing parenthesis of each call are not visible in this excerpt.
13 const threadWorkerNode = new WorkerNode(
15 './tests/worker-files/thread/testWorker.mjs',
16 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 }
18 const clusterWorkerNode = new WorkerNode(
20 './tests/worker-files/cluster/testWorker.cjs',
21 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 }
// Covers two things:
//  1. Constructor argument validation: each listed error message corresponds
//     to one rejected input (missing or invalid worker type, missing options,
//     options that are not a plain object, and a tasksQueueBackPressureSize /
//     tasksQueueBucketSize that is missing, not an integer, or not positive).
//  2. The initial state of the shared thread and cluster worker nodes.
// NOTE(review): most of the `expect(() => new WorkerNode(...)).toThrow(...)`
// scaffolding is not visible in this excerpt; only the first case shows it.
24 it('Worker node instantiation', () => {
// No arguments at all: the worker type check is expected to fail with a TypeError.
25 expect(() => new WorkerNode()).toThrow(
26 new TypeError('Cannot construct a worker node without a worker type')
32 './tests/worker-files/thread/testWorker.mjs',
33 { tasksQueueBackPressureSize: 12 }
37 "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
44 './tests/worker-files/thread/testWorker.mjs'
48 'Cannot construct a worker node without worker node options'
55 './tests/worker-files/thread/testWorker.mjs',
60 'Cannot construct a worker node with invalid options: must be a plain object'
67 './tests/worker-files/thread/testWorker.mjs',
72 'Cannot construct a worker node without a tasks queue back pressure size option'
// tasksQueueBackPressureSize: a string and 0.2 share the "not an integer"
// message; 0 and -1 share the "not a positive integer" message.
79 './tests/worker-files/thread/testWorker.mjs',
80 { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
84 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
91 './tests/worker-files/thread/testWorker.mjs',
92 { tasksQueueBackPressureSize: 0.2 }
96 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
103 './tests/worker-files/thread/testWorker.mjs',
104 { tasksQueueBackPressureSize: 0 }
108 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
115 './tests/worker-files/thread/testWorker.mjs',
116 { tasksQueueBackPressureSize: -1 }
120 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
// tasksQueueBucketSize: same progression (missing, string, 0.2, 0, -1), each
// time paired with a valid tasksQueueBackPressureSize of 12.
127 './tests/worker-files/thread/testWorker.mjs',
129 tasksQueueBackPressureSize: 12
134 'Cannot construct a worker node without a tasks queue bucket size option'
141 './tests/worker-files/thread/testWorker.mjs',
143 tasksQueueBackPressureSize: 12,
144 tasksQueueBucketSize: 'invalidTasksQueueBucketSize'
149 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
156 './tests/worker-files/thread/testWorker.mjs',
157 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0.2 }
161 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
168 './tests/worker-files/thread/testWorker.mjs',
169 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0 }
173 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
180 './tests/worker-files/thread/testWorker.mjs',
181 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: -1 }
185 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
// Initial state of the thread worker node: wraps a worker_threads Worker,
// info.id is the worker threadId, and a MessageChannel is present.
188 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
189 expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
190 expect(threadWorkerNode.info).toStrictEqual({
191 id: threadWorkerNode.worker.threadId,
192 type: WorkerTypes.thread,
198 expect(threadWorkerNode.usage).toStrictEqual({
204 sequentiallyStolen: 0,
209 history: new CircularArray()
212 history: new CircularArray()
216 history: new CircularArray()
219 history: new CircularArray()
223 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
224 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
225 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
226 expect(threadWorkerNode.tasksQueue.size).toBe(0)
// The queue property `k` is expected to equal the tasksQueueBucketSize (6)
// passed to the fixture.
227 expect(threadWorkerNode.tasksQueue.k).toBe(6)
228 expect(threadWorkerNode.tasksQueueSize()).toBe(
229 threadWorkerNode.tasksQueue.size
231 expect(threadWorkerNode.setBackPressureFlag).toBe(false)
232 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
// Initial state of the cluster worker node: same checks, except that the
// worker is a cluster Worker, info.id is the worker id, and messageChannel
// is expected to be undefined.
234 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
235 expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
236 expect(clusterWorkerNode.info).toStrictEqual({
237 id: clusterWorkerNode.worker.id,
238 type: WorkerTypes.cluster,
244 expect(clusterWorkerNode.usage).toStrictEqual({
250 sequentiallyStolen: 0,
255 history: new CircularArray()
258 history: new CircularArray()
262 history: new CircularArray()
265 history: new CircularArray()
269 expect(clusterWorkerNode.messageChannel).toBeUndefined()
270 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
271 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
272 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
273 expect(clusterWorkerNode.tasksQueue.k).toBe(6)
274 expect(clusterWorkerNode.tasksQueueSize()).toBe(
275 clusterWorkerNode.tasksQueue.size
277 expect(clusterWorkerNode.setBackPressureFlag).toBe(false)
278 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
// Exercises getTaskFunctionWorkerUsage() on the shared thread worker node in
// three stages: before info.taskFunctionsProperties is assigned, after it is
// assigned a list that the error message describes as having less than 3
// elements, and after it is assigned a longer list.
// NOTE(review): the `expect(() => ...).toThrow(...)` wrappers and the full
// contents of both taskFunctionsProperties lists are not visible in this
// excerpt.
281 it('Worker node getTaskFunctionWorkerUsage()', () => {
283 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
286 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
// Mutates the shared fixture; later tests see this state.
289 threadWorkerNode.info.taskFunctionsProperties = [
290 { name: DEFAULT_TASK_NAME },
294 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
297 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
300 threadWorkerNode.info.taskFunctionsProperties = [
301 { name: DEFAULT_TASK_NAME },
// With the longer list in place, usage is requested for DEFAULT_TASK_NAME,
// 'fn1' and 'fn2', and each result is compared against fresh statistics
// (sequentiallyStolen 0, empty CircularArray histories).
306 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
312 sequentiallyStolen: 0,
317 history: new CircularArray()
320 history: new CircularArray()
324 history: new CircularArray()
327 history: new CircularArray()
331 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
336 sequentiallyStolen: 0,
341 history: new CircularArray()
344 history: new CircularArray()
348 history: new CircularArray()
351 history: new CircularArray()
355 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
360 sequentiallyStolen: 0,
365 history: new CircularArray()
368 history: new CircularArray()
372 history: new CircularArray()
375 history: new CircularArray()
// NOTE(review): three names were queried but only 2 usage entries are
// expected; presumably the default task name resolves to an existing
// entry. Confirm against the WorkerNode implementation.
379 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
// Exercises deleteTaskFunctionWorkerUsage() on the shared thread worker node.
// The opening assertions expect taskFunctionsProperties to be populated and
// taskFunctionsUsage to hold 2 entries, so this test presumably depends on
// state left behind by an earlier test in the suite (order-dependent).
// NOTE(review): the end of this block is not visible in this excerpt.
382 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
383 expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([
384 { name: DEFAULT_TASK_NAME },
388 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
// Deleting an unknown task function name must leave the map size unchanged.
390 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
392 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
// Deleting a known name returns true and removes exactly one entry.
393 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
394 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)