1 import { Worker as ClusterWorker } from 'node:cluster'
2 import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
4 import { expect } from 'expect'
6 import { CircularArray } from '../../lib/circular-array.cjs'
7 import { Deque } from '../../lib/deque.cjs'
8 import { WorkerTypes } from '../../lib/index.cjs'
9 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
10 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
// Test suite for the WorkerNode class.
// Two worker nodes are created once and shared by every test in the suite:
// one built from the thread test worker file and one built from the cluster
// test worker file, each with a tasks queue back pressure size of 12.
// NOTE: the tests below assign to and delete from these shared fixtures, so
// later tests depend on the state left behind by earlier ones.
12 describe('Worker node test suite', () => {
13 const threadWorkerNode = new WorkerNode(
15 './tests/worker-files/thread/testWorker.mjs',
16 { tasksQueueBackPressureSize: 12 }
18 const clusterWorkerNode = new WorkerNode(
20 './tests/worker-files/cluster/testWorker.cjs',
21 { tasksQueueBackPressureSize: 12 }
// Checks the error messages produced for invalid constructor arguments, then
// the initial state of the two shared worker node fixtures.
24 it('Worker node instantiation', () => {
// No arguments at all: a TypeError about the missing worker type is expected.
25 expect(() => new WorkerNode()).toThrow(
26 new TypeError('Cannot construct a worker node without a worker type')
// Remaining cases each pair a constructor input with the exact message expected:
// invalid worker type, missing options, non plain object options, and a
// tasksQueueBackPressureSize that is missing, not an integer (string, 0.2) or
// not a positive integer (0, -1).
32 './tests/worker-files/thread/testWorker.mjs',
33 { tasksQueueBackPressureSize: 12 }
37 "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
44 './tests/worker-files/thread/testWorker.mjs'
48 'Cannot construct a worker node without worker node options'
55 './tests/worker-files/thread/testWorker.mjs',
60 'Cannot construct a worker node with invalid options: must be a plain object'
67 './tests/worker-files/thread/testWorker.mjs',
72 'Cannot construct a worker node without a tasks queue back pressure size option'
79 './tests/worker-files/thread/testWorker.mjs',
80 { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
84 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
91 './tests/worker-files/thread/testWorker.mjs',
92 { tasksQueueBackPressureSize: 0.2 }
96 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
103 './tests/worker-files/thread/testWorker.mjs',
104 { tasksQueueBackPressureSize: 0 }
108 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
115 './tests/worker-files/thread/testWorker.mjs',
116 { tasksQueueBackPressureSize: -1 }
120 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
// Initial state of the thread fixture: wraps a worker_threads Worker and its
// info id is the worker threadId.
123 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
124 expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
125 expect(threadWorkerNode.info).toStrictEqual({
126 id: threadWorkerNode.worker.threadId,
127 type: WorkerTypes.thread,
// Usage statistics are compared against fresh, empty CircularArray histories.
132 expect(threadWorkerNode.usage).toStrictEqual({
138 sequentiallyStolen: 0,
143 history: new CircularArray()
146 history: new CircularArray()
150 history: new CircularArray()
153 history: new CircularArray()
// The thread fixture is expected to own a MessageChannel.
157 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
158 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
// The tasks queue is a Deque that starts empty; tasksQueueSize() must agree
// with the size of the underlying queue.
159 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
160 expect(threadWorkerNode.tasksQueue.size).toBe(0)
161 expect(threadWorkerNode.tasksQueueSize()).toBe(
162 threadWorkerNode.tasksQueue.size
164 expect(threadWorkerNode.onBackPressureStarted).toBe(false)
165 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
// Initial state of the cluster fixture: wraps a cluster Worker and its info id
// is the cluster worker id.
167 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
168 expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
169 expect(clusterWorkerNode.info).toStrictEqual({
170 id: clusterWorkerNode.worker.id,
171 type: WorkerTypes.cluster,
176 expect(clusterWorkerNode.usage).toStrictEqual({
182 sequentiallyStolen: 0,
187 history: new CircularArray()
190 history: new CircularArray()
194 history: new CircularArray()
197 history: new CircularArray()
// Unlike the thread fixture, the cluster fixture has no message channel.
201 expect(clusterWorkerNode.messageChannel).toBeUndefined()
202 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
203 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
204 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
205 expect(clusterWorkerNode.tasksQueueSize()).toBe(
206 clusterWorkerNode.tasksQueue.size
208 expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
209 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
// Exercises getTaskFunctionWorkerUsage() on the shared thread fixture: first
// the error messages expected while the task function names list is unset or
// too short, then the usage returned for each registered name.
212 it('Worker node getTaskFunctionWorkerUsage()', () => {
// Task function names list not defined yet.
214 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
217 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
// Two names are still too few: the expected message requires at least 3 elements.
220 threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1']
222 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
225 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
// With three names registered, usage is requested for each of them in turn and
// compared against fresh, empty CircularArray histories.
228 threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
230 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
236 sequentiallyStolen: 0,
241 history: new CircularArray()
244 history: new CircularArray()
248 history: new CircularArray()
251 history: new CircularArray()
255 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
260 sequentiallyStolen: 0,
265 history: new CircularArray()
268 history: new CircularArray()
272 history: new CircularArray()
275 history: new CircularArray()
279 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
284 sequentiallyStolen: 0,
289 history: new CircularArray()
292 history: new CircularArray()
296 history: new CircularArray()
299 history: new CircularArray()
// Three names were looked up above, yet only two usage entries are expected.
// NOTE(review): presumably DEFAULT_TASK_NAME shares an entry with another
// listed name - confirm against the WorkerNode implementation.
303 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
// Exercises deleteTaskFunctionWorkerUsage() on the shared thread fixture.
// Depends on state left by the previous test: the task function names list is
// already set and the usage map already holds two entries.
306 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
307 expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
312 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
// Deleting an unknown task function name leaves the usage map size unchanged.
314 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
316 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
// Deleting fn1 returns true and removes exactly one entry.
317 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
318 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)