1 import { MessageChannel, Worker } from 'node:worker_threads'
2 import cluster from 'node:cluster'
3 import { expect } from 'expect'
4 import { WorkerNode } from '../../lib/pools/worker-node.js'
5 import { WorkerTypes } from '../../lib/index.js'
6 import { CircularArray } from '../../lib/circular-array.js'
7 import { Deque } from '../../lib/deque.js'
8 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
10 describe('Worker node test suite', () => {
11 const threadWorker = new Worker('./tests/worker-files/thread/testWorker.mjs')
12 const clusterWorker = cluster.fork()
13 const threadWorkerNode = new WorkerNode(threadWorker, 12)
14 const clusterWorkerNode = new WorkerNode(clusterWorker, 12)
16 it('Worker node instantiation', () => {
17 expect(() => new WorkerNode()).toThrow(
18 new TypeError('Cannot construct a worker node without a worker')
20 expect(() => new WorkerNode(threadWorker)).toThrow(
22 'Cannot construct a worker node without a tasks queue back pressure size'
26 () => new WorkerNode(threadWorker, 'invalidTasksQueueBackPressureSize')
29 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
32 expect(() => new WorkerNode(threadWorker, 0.2)).toThrow(
34 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
37 expect(() => new WorkerNode(threadWorker, 0)).toThrow(
39 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
42 expect(() => new WorkerNode(threadWorker, -1)).toThrow(
44 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
47 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
48 expect(threadWorkerNode.worker).toBe(threadWorker)
49 expect(threadWorkerNode.info).toStrictEqual({
50 id: threadWorker.threadId,
51 type: WorkerTypes.thread,
55 expect(threadWorkerNode.usage).toStrictEqual({
61 sequentiallyStolen: 0,
66 history: new CircularArray()
69 history: new CircularArray()
73 history: new CircularArray()
76 history: new CircularArray()
80 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
81 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
82 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
83 expect(threadWorkerNode.tasksQueue.size).toBe(0)
84 expect(threadWorkerNode.tasksQueueSize()).toBe(
85 threadWorkerNode.tasksQueue.size
87 expect(threadWorkerNode.onBackPressureStarted).toBe(false)
88 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
90 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
91 expect(clusterWorkerNode.worker).toBe(clusterWorker)
92 expect(clusterWorkerNode.info).toStrictEqual({
94 type: WorkerTypes.cluster,
98 expect(clusterWorkerNode.usage).toStrictEqual({
104 sequentiallyStolen: 0,
109 history: new CircularArray()
112 history: new CircularArray()
116 history: new CircularArray()
119 history: new CircularArray()
123 expect(clusterWorkerNode.messageChannel).toBeUndefined()
124 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
125 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
126 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
127 expect(clusterWorkerNode.tasksQueueSize()).toBe(
128 clusterWorkerNode.tasksQueue.size
130 expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
131 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
134 it('Worker node getTaskFunctionWorkerUsage()', () => {
136 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
139 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
142 threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1']
144 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
147 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
150 threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
152 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
158 sequentiallyStolen: 0,
163 history: new CircularArray()
166 history: new CircularArray()
170 history: new CircularArray()
173 history: new CircularArray()
177 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
182 sequentiallyStolen: 0,
187 history: new CircularArray()
190 history: new CircularArray()
194 history: new CircularArray()
197 history: new CircularArray()
201 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
206 sequentiallyStolen: 0,
211 history: new CircularArray()
214 history: new CircularArray()
218 history: new CircularArray()
221 history: new CircularArray()
225 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
228 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
229 expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
234 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
236 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
238 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
239 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
240 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)