1 import { Worker as ClusterWorker } from 'node:cluster'
2 import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
4 import { expect } from 'expect'
6 import { CircularBuffer } from '../../lib/circular-buffer.cjs'
7 import { WorkerTypes } from '../../lib/index.cjs'
8 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
9 import { PriorityQueue } from '../../lib/priority-queue.cjs'
10 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
12 describe('Worker node test suite', () => {
// Shared fixtures for the whole suite: one worker node built from the thread
// test worker file and one built from the cluster test worker file. Both use a
// tasks queue back pressure size of 12, a bucket size of 6 and queue priority
// enabled.
// NOTE(review): these instances are reused and mutated by the tests below
// (e.g. info.taskFunctionsProperties is assigned on threadWorkerNode), so the
// tests depend on their execution order.
13 const threadWorkerNode = new WorkerNode(
15 './tests/worker-files/thread/testWorker.mjs',
17 tasksQueueBackPressureSize: 12,
18 tasksQueueBucketSize: 6,
19 tasksQueuePriority: true,
22 const clusterWorkerNode = new WorkerNode(
24 './tests/worker-files/cluster/testWorker.cjs',
26 tasksQueueBackPressureSize: 12,
27 tasksQueueBucketSize: 6,
28 tasksQueuePriority: true,
// Covers WorkerNode construction: first the error reported for each invalid
// argument combination, then the initial state of the shared thread and
// cluster worker nodes.
32 it('Worker node instantiation', () => {
// Argument validation: each case below pairs a constructor input with the
// error message that is expected for it. The cases run in order through the
// worker type, the options object, tasksQueueBackPressureSize,
// tasksQueueBucketSize and tasksQueuePriority.
33 expect(() => new WorkerNode()).toThrow(
34 new TypeError('Cannot construct a worker node without a worker type')
40 './tests/worker-files/thread/testWorker.mjs'
44 "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
51 './tests/worker-files/thread/testWorker.mjs'
55 'Cannot construct a worker node without worker node options'
62 './tests/worker-files/thread/testWorker.mjs',
67 'Cannot construct a worker node with invalid worker node options: must be a plain object'
74 './tests/worker-files/thread/testWorker.mjs',
79 'Cannot construct a worker node without a tasks queue back pressure size option'
86 './tests/worker-files/thread/testWorker.mjs',
87 { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
91 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
98 './tests/worker-files/thread/testWorker.mjs',
99 { tasksQueueBackPressureSize: 0.2 }
103 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
110 './tests/worker-files/thread/testWorker.mjs',
111 { tasksQueueBackPressureSize: 0 }
115 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
122 './tests/worker-files/thread/testWorker.mjs',
123 { tasksQueueBackPressureSize: -1 }
127 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
134 './tests/worker-files/thread/testWorker.mjs',
136 tasksQueueBackPressureSize: 12,
141 'Cannot construct a worker node without a tasks queue bucket size option'
148 './tests/worker-files/thread/testWorker.mjs',
150 tasksQueueBackPressureSize: 12,
151 tasksQueueBucketSize: 'invalidTasksQueueBucketSize',
156 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
163 './tests/worker-files/thread/testWorker.mjs',
164 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0.2 }
168 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
175 './tests/worker-files/thread/testWorker.mjs',
176 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0 }
180 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
187 './tests/worker-files/thread/testWorker.mjs',
188 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: -1 }
192 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
199 './tests/worker-files/thread/testWorker.mjs',
201 tasksQueueBackPressureSize: 12,
202 tasksQueueBucketSize: 6,
207 'Cannot construct a worker node without a tasks queue priority option'
214 './tests/worker-files/thread/testWorker.mjs',
216 tasksQueueBackPressureSize: 12,
217 tasksQueueBucketSize: 6,
218 tasksQueuePriority: 'invalidTasksQueuePriority',
223 'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
// Initial state of the shared thread worker node: it wraps a worker_threads
// Worker and its info carries the worker's threadId and the thread type.
226 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
227 expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
228 expect(threadWorkerNode.info).toStrictEqual({
229 id: threadWorkerNode.worker.threadId,
230 type: WorkerTypes.thread,
236 expect(threadWorkerNode.usage).toStrictEqual({
242 sequentiallyStolen: 0,
247 history: expect.any(CircularBuffer),
250 history: expect.any(CircularBuffer),
254 history: expect.any(CircularBuffer),
257 history: expect.any(CircularBuffer),
// A thread worker node is expected to own a MessageChannel; the queue settings
// must mirror the options passed to the fixture (12 / 6 / priority enabled).
261 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
262 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
263 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
264 expect(threadWorkerNode.tasksQueue.size).toBe(0)
265 expect(threadWorkerNode.tasksQueue.bucketSize).toBe(6)
266 expect(threadWorkerNode.tasksQueue.enablePriority).toBe(true)
267 expect(threadWorkerNode.tasksQueueSize()).toBe(
268 threadWorkerNode.tasksQueue.size
270 expect(threadWorkerNode.setBackPressureFlag).toBe(false)
271 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
// Initial state of the shared cluster worker node: it wraps a cluster Worker
// and its info carries the worker's id and the cluster type.
273 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
274 expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
275 expect(clusterWorkerNode.info).toStrictEqual({
276 id: clusterWorkerNode.worker.id,
277 type: WorkerTypes.cluster,
283 expect(clusterWorkerNode.usage).toStrictEqual({
289 sequentiallyStolen: 0,
294 history: expect.any(CircularBuffer),
297 history: expect.any(CircularBuffer),
301 history: expect.any(CircularBuffer),
304 history: expect.any(CircularBuffer),
// Unlike the thread worker node, the cluster worker node is expected to have
// no message channel; the queue settings are otherwise the same.
308 expect(clusterWorkerNode.messageChannel).toBeUndefined()
309 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
310 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
311 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
312 expect(clusterWorkerNode.tasksQueue.bucketSize).toBe(6)
313 expect(clusterWorkerNode.tasksQueue.enablePriority).toBe(true)
314 expect(clusterWorkerNode.tasksQueueSize()).toBe(
315 clusterWorkerNode.tasksQueue.size
317 expect(clusterWorkerNode.setBackPressureFlag).toBe(false)
318 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
// Covers getTaskFunctionWorkerUsage() on the shared thread worker node: the
// two error cases tied to info.taskFunctionsProperties, then the usage objects
// returned for the default task name, 'fn1' and 'fn2'.
// NOTE(review): this test mutates threadWorkerNode.info.taskFunctionsProperties
// and fills taskFunctionsUsage; the deleteTaskFunctionWorkerUsage() test relies
// on that state.
321 it('Worker node getTaskFunctionWorkerUsage()', () => {
323 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
326 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
// A properties list that is too short must be rejected with the
// "less than 3 elements" message below.
329 threadWorkerNode.info.taskFunctionsProperties = [
330 { name: DEFAULT_TASK_NAME },
334 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
337 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
// With the properties list reassigned, lookups are expected to return usage
// objects.
340 threadWorkerNode.info.taskFunctionsProperties = [
341 { name: DEFAULT_TASK_NAME },
346 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
352 sequentiallyStolen: 0,
357 history: expect.any(CircularBuffer),
360 history: expect.any(CircularBuffer),
364 history: expect.any(CircularBuffer),
367 history: expect.any(CircularBuffer),
371 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
376 sequentiallyStolen: 0,
381 history: expect.any(CircularBuffer),
384 history: expect.any(CircularBuffer),
388 history: expect.any(CircularBuffer),
391 history: expect.any(CircularBuffer),
395 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
400 sequentiallyStolen: 0,
405 history: expect.any(CircularBuffer),
408 history: expect.any(CircularBuffer),
412 history: expect.any(CircularBuffer),
415 history: expect.any(CircularBuffer),
// Three names were looked up but only two entries are expected in the map.
// NOTE(review): presumably the default task name resolves to one of the named
// task functions and shares its entry - confirm against WorkerNode.
419 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
// Covers deleteTaskFunctionWorkerUsage() on the shared thread worker node.
// NOTE(review): the first assertions check state left behind by the
// getTaskFunctionWorkerUsage() test (properties list set, two usage entries),
// so this test must run after it.
422 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
423 expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([
424 { name: DEFAULT_TASK_NAME },
428 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
// Deleting an unknown task function name must leave the usage map size at 2.
430 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
432 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
// Deleting 'fn1' must return true and shrink the usage map to one entry.
433 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
434 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)