1 import { Worker as ClusterWorker } from 'node:cluster'
2 import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
4 import { expect } from 'expect'
6 import { CircularArray } from '../../lib/circular-array.cjs'
7 import { Deque } from '../../lib/deque.cjs'
8 import { WorkerTypes } from '../../lib/index.cjs'
9 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
10 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
12 describe('Worker node test suite', () => {
13 const threadWorkerNode = new WorkerNode(
15 './tests/worker-files/thread/testWorker.mjs',
16 { tasksQueueBackPressureSize: 12 }
18 const clusterWorkerNode = new WorkerNode(
20 './tests/worker-files/cluster/testWorker.cjs',
21 { tasksQueueBackPressureSize: 12 }
24 it('Worker node instantiation', () => {
25 expect(() => new WorkerNode()).toThrow(
26 new TypeError('Cannot construct a worker node without a worker type')
32 './tests/worker-files/thread/testWorker.mjs',
33 { tasksQueueBackPressureSize: 12 }
37 "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
44 './tests/worker-files/thread/testWorker.mjs'
48 'Cannot construct a worker node without worker node options'
55 './tests/worker-files/thread/testWorker.mjs',
60 'Cannot construct a worker node with invalid options: must be a plain object'
67 './tests/worker-files/thread/testWorker.mjs',
72 'Cannot construct a worker node without a tasks queue back pressure size option'
79 './tests/worker-files/thread/testWorker.mjs',
80 { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
84 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
91 './tests/worker-files/thread/testWorker.mjs',
92 { tasksQueueBackPressureSize: 0.2 }
96 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
103 './tests/worker-files/thread/testWorker.mjs',
104 { tasksQueueBackPressureSize: 0 }
108 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
115 './tests/worker-files/thread/testWorker.mjs',
116 { tasksQueueBackPressureSize: -1 }
120 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
123 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
124 expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
125 expect(threadWorkerNode.info).toStrictEqual({
126 id: threadWorkerNode.worker.threadId,
127 type: WorkerTypes.thread,
132 expect(threadWorkerNode.usage).toStrictEqual({
138 sequentiallyStolen: 0,
143 history: new CircularArray()
146 history: new CircularArray()
150 history: new CircularArray()
153 history: new CircularArray()
157 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
158 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
159 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
160 expect(threadWorkerNode.tasksQueue.size).toBe(0)
161 expect(threadWorkerNode.tasksQueueSize()).toBe(
162 threadWorkerNode.tasksQueue.size
164 expect(threadWorkerNode.onBackPressureStarted).toBe(false)
165 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
167 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
168 expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
169 expect(clusterWorkerNode.info).toStrictEqual({
170 id: clusterWorkerNode.worker.id,
171 type: WorkerTypes.cluster,
176 expect(clusterWorkerNode.usage).toStrictEqual({
182 sequentiallyStolen: 0,
187 history: new CircularArray()
190 history: new CircularArray()
194 history: new CircularArray()
197 history: new CircularArray()
201 expect(clusterWorkerNode.messageChannel).toBeUndefined()
202 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
203 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
204 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
205 expect(clusterWorkerNode.tasksQueueSize()).toBe(
206 clusterWorkerNode.tasksQueue.size
208 expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
209 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
212 it('Worker node getTaskFunctionWorkerUsage()', () => {
214 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
217 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
220 threadWorkerNode.info.taskFunctionsProperties = [
221 { name: DEFAULT_TASK_NAME },
225 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
228 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
231 threadWorkerNode.info.taskFunctionsProperties = [
232 { name: DEFAULT_TASK_NAME },
237 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
243 sequentiallyStolen: 0,
248 history: new CircularArray()
251 history: new CircularArray()
255 history: new CircularArray()
258 history: new CircularArray()
262 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
267 sequentiallyStolen: 0,
272 history: new CircularArray()
275 history: new CircularArray()
279 history: new CircularArray()
282 history: new CircularArray()
286 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
291 sequentiallyStolen: 0,
296 history: new CircularArray()
299 history: new CircularArray()
303 history: new CircularArray()
306 history: new CircularArray()
310 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
313 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
314 expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([
315 { name: DEFAULT_TASK_NAME },
319 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
321 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
323 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
324 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
325 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)