1 import { expect } from 'expect'
2 import { Worker as ClusterWorker } from 'node:cluster'
3 import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
5 import { CircularBuffer } from '../../lib/circular-buffer.cjs'
6 import { WorkerTypes } from '../../lib/index.cjs'
7 import { MeasurementHistorySize } from '../../lib/pools/worker.cjs'
8 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
9 import { PriorityQueue } from '../../lib/queues/priority-queue.cjs'
10 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
// Test suite for WorkerNode.
// The two variables declared here are assigned in the 'Create worker nodes'
// hook and then shared by every test case in the suite.
12 describe('Worker node test suite', () => {
13 let clusterWorkerNode, threadWorkerNode
// Hook that builds the two shared WorkerNode fixtures: one from the thread
// test worker file and one from the cluster test worker file. Both receive
// the same queue options (back pressure size 12, bucket size 6, priority
// enabled), which the instantiation test asserts on later.
15 before('Create worker nodes', () => {
16 threadWorkerNode = new WorkerNode(
18 './tests/worker-files/thread/testWorker.mjs',
20 tasksQueueBackPressureSize: 12,
21 tasksQueueBucketSize: 6,
22 tasksQueuePriority: true,
25 clusterWorkerNode = new WorkerNode(
27 './tests/worker-files/cluster/testWorker.cjs',
29 tasksQueueBackPressureSize: 12,
30 tasksQueueBucketSize: 6,
31 tasksQueuePriority: true,
// Hook that terminates both shared worker nodes, awaiting the thread node's
// terminate() before the cluster node's.
36 after('Terminate worker nodes', async () => {
37 await threadWorkerNode.terminate()
38 await clusterWorkerNode.terminate()
// Checks WorkerNode construction: first the argument validation errors, then
// the initial state of the shared thread and cluster worker nodes.
41 it('Worker node instantiation', () => {
// Constructing with no arguments must throw a TypeError about the missing
// worker type.
42 expect(() => new WorkerNode()).toThrow(
43 new TypeError('Cannot construct a worker node without a worker type')
// NOTE(review): presumably each message string below is the expected error
// for the constructor arguments listed just before it — confirm against the
// full expect(...).toThrow(...) calls.
49 './tests/worker-files/thread/testWorker.mjs'
53 "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
60 './tests/worker-files/thread/testWorker.mjs'
64 'Cannot construct a worker node without worker node options'
71 './tests/worker-files/thread/testWorker.mjs',
76 'Cannot construct a worker node with invalid worker node options: must be a plain object'
83 './tests/worker-files/thread/testWorker.mjs',
88 'Cannot construct a worker node without a tasks queue back pressure size option'
// tasksQueueBackPressureSize cases: a string, 0.2, 0 and -1.
95 './tests/worker-files/thread/testWorker.mjs',
96 { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
100 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
107 './tests/worker-files/thread/testWorker.mjs',
108 { tasksQueueBackPressureSize: 0.2 }
112 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
119 './tests/worker-files/thread/testWorker.mjs',
120 { tasksQueueBackPressureSize: 0 }
124 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
131 './tests/worker-files/thread/testWorker.mjs',
132 { tasksQueueBackPressureSize: -1 }
136 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
// tasksQueueBucketSize cases, each with a back pressure size of 12: a string,
// 0.2, 0 and -1.
143 './tests/worker-files/thread/testWorker.mjs',
145 tasksQueueBackPressureSize: 12,
150 'Cannot construct a worker node without a tasks queue bucket size option'
157 './tests/worker-files/thread/testWorker.mjs',
159 tasksQueueBackPressureSize: 12,
160 tasksQueueBucketSize: 'invalidTasksQueueBucketSize',
165 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
172 './tests/worker-files/thread/testWorker.mjs',
173 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0.2 }
177 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
184 './tests/worker-files/thread/testWorker.mjs',
185 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0 }
189 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
196 './tests/worker-files/thread/testWorker.mjs',
197 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: -1 }
201 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
// tasksQueuePriority cases, each with the other two options set: a string
// value is listed explicitly.
208 './tests/worker-files/thread/testWorker.mjs',
210 tasksQueueBackPressureSize: 12,
211 tasksQueueBucketSize: 6,
216 'Cannot construct a worker node without a tasks queue priority option'
223 './tests/worker-files/thread/testWorker.mjs',
225 tasksQueueBackPressureSize: 12,
226 tasksQueueBucketSize: 6,
227 tasksQueuePriority: 'invalidTasksQueuePriority',
232 'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
// Initial state of the thread worker node: it wraps a worker_threads Worker
// and its info id is the worker's threadId.
235 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
236 expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
237 expect(threadWorkerNode.info).toStrictEqual({
239 backPressureStealing: false,
240 continuousStealing: false,
242 id: threadWorkerNode.worker.threadId,
246 type: WorkerTypes.thread,
248 expect(threadWorkerNode.usage).toStrictEqual({
251 history: expect.any(CircularBuffer),
254 history: expect.any(CircularBuffer),
258 history: expect.any(CircularBuffer),
266 sequentiallyStolen: 0,
270 history: expect.any(CircularBuffer),
// Each measurement history's items array must have MeasurementHistorySize
// entries.
273 expect(threadWorkerNode.usage.runTime.history.items.length).toBe(
274 MeasurementHistorySize
276 expect(threadWorkerNode.usage.waitTime.history.items.length).toBe(
277 MeasurementHistorySize
279 expect(threadWorkerNode.usage.elu.idle.history.items.length).toBe(
280 MeasurementHistorySize
282 expect(threadWorkerNode.usage.elu.active.history.items.length).toBe(
283 MeasurementHistorySize
// The thread node has a MessageChannel, and its queue settings match the
// options passed in the 'Create worker nodes' hook (12 / 6 / true).
285 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
286 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
287 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
288 expect(threadWorkerNode.tasksQueue.size).toBe(0)
289 expect(threadWorkerNode.tasksQueue.bucketSize).toBe(6)
290 expect(threadWorkerNode.tasksQueue.enablePriority).toBe(true)
291 expect(threadWorkerNode.tasksQueueSize()).toBe(
292 threadWorkerNode.tasksQueue.size
294 expect(threadWorkerNode.setBackPressureFlag).toBe(false)
295 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
// Same checks for the cluster worker node: it wraps a cluster Worker and its
// info id is the worker's id.
297 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
298 expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
299 expect(clusterWorkerNode.info).toStrictEqual({
301 backPressureStealing: false,
302 continuousStealing: false,
304 id: clusterWorkerNode.worker.id,
308 type: WorkerTypes.cluster,
310 expect(clusterWorkerNode.usage).toStrictEqual({
313 history: expect.any(CircularBuffer),
316 history: expect.any(CircularBuffer),
320 history: expect.any(CircularBuffer),
328 sequentiallyStolen: 0,
332 history: expect.any(CircularBuffer),
335 expect(clusterWorkerNode.usage.runTime.history.items.length).toBe(
336 MeasurementHistorySize
338 expect(clusterWorkerNode.usage.waitTime.history.items.length).toBe(
339 MeasurementHistorySize
341 expect(clusterWorkerNode.usage.elu.idle.history.items.length).toBe(
342 MeasurementHistorySize
344 expect(clusterWorkerNode.usage.elu.active.history.items.length).toBe(
345 MeasurementHistorySize
// Unlike the thread node, the cluster node has no message channel.
347 expect(clusterWorkerNode.messageChannel).toBeUndefined()
348 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
349 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
350 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
351 expect(clusterWorkerNode.tasksQueue.bucketSize).toBe(6)
352 expect(clusterWorkerNode.tasksQueue.enablePriority).toBe(true)
353 expect(clusterWorkerNode.tasksQueueSize()).toBe(
354 clusterWorkerNode.tasksQueue.size
356 expect(clusterWorkerNode.setBackPressureFlag).toBe(false)
357 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
// Exercises getTaskFunctionWorkerUsage() on the shared thread worker node.
// It mutates threadWorkerNode.info.taskFunctionsProperties, so the state it
// leaves behind is visible to later tests using the same node.
360 it('Worker node getTaskFunctionWorkerUsage()', () => {
// NOTE(review): presumably each call with 'invalidTaskFunction' is expected
// to throw the message that follows it — confirm against the full
// expect(...).toThrow(...) calls.
362 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
365 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
368 threadWorkerNode.info.taskFunctionsProperties = [
369 { name: DEFAULT_TASK_NAME },
373 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
376 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
// The properties list is reassigned before the lookups below.
379 threadWorkerNode.info.taskFunctionsProperties = [
380 { name: DEFAULT_TASK_NAME },
// Usage lookups for DEFAULT_TASK_NAME, 'fn1' and 'fn2'; the visible parts of
// each expected object are CircularBuffer histories and a zero
// sequentiallyStolen counter.
385 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
389 history: expect.any(CircularBuffer),
392 history: expect.any(CircularBuffer),
396 history: expect.any(CircularBuffer),
403 sequentiallyStolen: 0,
407 history: expect.any(CircularBuffer),
410 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
413 history: expect.any(CircularBuffer),
416 history: expect.any(CircularBuffer),
420 history: expect.any(CircularBuffer),
427 sequentiallyStolen: 0,
431 history: expect.any(CircularBuffer),
434 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
437 history: expect.any(CircularBuffer),
440 history: expect.any(CircularBuffer),
444 history: expect.any(CircularBuffer),
451 sequentiallyStolen: 0,
455 history: expect.any(CircularBuffer),
// After the three lookups the usage map is asserted to hold two entries.
458 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
// Exercises deleteTaskFunctionWorkerUsage() on the shared thread worker node.
// It starts from a usage map of size 2 and a populated properties list,
// which matches the state at the end of the getTaskFunctionWorkerUsage() test.
461 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
462 expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([
463 { name: DEFAULT_TASK_NAME },
467 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
// Deleting 'invalidTaskFunction' leaves the map size at 2.
469 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
471 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
// Deleting 'fn1' returns true and shrinks the map to 1.
472 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
473 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)