549db4b6aa5ae82ac6d87ab3007c376f2e854853
[poolifier.git] / tests / pools / worker-node.test.mjs
1 import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
2 import { Worker as ClusterWorker } from 'node:cluster'
3 import { expect } from 'expect'
4 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
5 import { WorkerTypes } from '../../lib/index.cjs'
6 import { CircularArray } from '../../lib/circular-array.cjs'
7 import { Deque } from '../../lib/deque.cjs'
8 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
9
10 describe('Worker node test suite', () => {
11 const threadWorkerNode = new WorkerNode(
12 WorkerTypes.thread,
13 './tests/worker-files/thread/testWorker.mjs',
14 { tasksQueueBackPressureSize: 12 }
15 )
16 const clusterWorkerNode = new WorkerNode(
17 WorkerTypes.cluster,
18 './tests/worker-files/cluster/testWorker.cjs',
19 { tasksQueueBackPressureSize: 12 }
20 )
21
22 it('Worker node instantiation', () => {
23 expect(() => new WorkerNode()).toThrow(
24 new TypeError('Cannot construct a worker node without a worker type')
25 )
26 expect(
27 () =>
28 new WorkerNode(
29 'invalidWorkerType',
30 './tests/worker-files/thread/testWorker.mjs',
31 { tasksQueueBackPressureSize: 12 }
32 )
33 ).toThrow(
34 new TypeError(
35 "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
36 )
37 )
38 expect(
39 () =>
40 new WorkerNode(
41 WorkerTypes.thread,
42 './tests/worker-files/thread/testWorker.mjs'
43 )
44 ).toThrow(
45 new TypeError(
46 'Cannot construct a worker node without worker node options'
47 )
48 )
49 expect(
50 () =>
51 new WorkerNode(
52 WorkerTypes.thread,
53 './tests/worker-files/thread/testWorker.mjs',
54 ''
55 )
56 ).toThrow(
57 new TypeError(
58 'Cannot construct a worker node with invalid options: must be a plain object'
59 )
60 )
61 expect(
62 () =>
63 new WorkerNode(
64 WorkerTypes.thread,
65 './tests/worker-files/thread/testWorker.mjs',
66 {}
67 )
68 ).toThrow(
69 new TypeError(
70 'Cannot construct a worker node without a tasks queue back pressure size option'
71 )
72 )
73 expect(
74 () =>
75 new WorkerNode(
76 WorkerTypes.thread,
77 './tests/worker-files/thread/testWorker.mjs',
78 { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
79 )
80 ).toThrow(
81 new TypeError(
82 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
83 )
84 )
85 expect(
86 () =>
87 new WorkerNode(
88 WorkerTypes.thread,
89 './tests/worker-files/thread/testWorker.mjs',
90 { tasksQueueBackPressureSize: 0.2 }
91 )
92 ).toThrow(
93 new TypeError(
94 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
95 )
96 )
97 expect(
98 () =>
99 new WorkerNode(
100 WorkerTypes.thread,
101 './tests/worker-files/thread/testWorker.mjs',
102 { tasksQueueBackPressureSize: 0 }
103 )
104 ).toThrow(
105 new RangeError(
106 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
107 )
108 )
109 expect(
110 () =>
111 new WorkerNode(
112 WorkerTypes.thread,
113 './tests/worker-files/thread/testWorker.mjs',
114 { tasksQueueBackPressureSize: -1 }
115 )
116 ).toThrow(
117 new RangeError(
118 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
119 )
120 )
121 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
122 expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
123 expect(threadWorkerNode.info).toStrictEqual({
124 id: threadWorkerNode.worker.threadId,
125 type: WorkerTypes.thread,
126 dynamic: false,
127 ready: false,
128 stealing: false
129 })
130 expect(threadWorkerNode.usage).toStrictEqual({
131 tasks: {
132 executed: 0,
133 executing: 0,
134 queued: 0,
135 maxQueued: 0,
136 sequentiallyStolen: 0,
137 stolen: 0,
138 failed: 0
139 },
140 runTime: {
141 history: new CircularArray()
142 },
143 waitTime: {
144 history: new CircularArray()
145 },
146 elu: {
147 idle: {
148 history: new CircularArray()
149 },
150 active: {
151 history: new CircularArray()
152 }
153 }
154 })
155 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
156 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
157 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
158 expect(threadWorkerNode.tasksQueue.size).toBe(0)
159 expect(threadWorkerNode.tasksQueueSize()).toBe(
160 threadWorkerNode.tasksQueue.size
161 )
162 expect(threadWorkerNode.onBackPressureStarted).toBe(false)
163 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
164
165 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
166 expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
167 expect(clusterWorkerNode.info).toStrictEqual({
168 id: clusterWorkerNode.worker.id,
169 type: WorkerTypes.cluster,
170 dynamic: false,
171 ready: false,
172 stealing: false
173 })
174 expect(clusterWorkerNode.usage).toStrictEqual({
175 tasks: {
176 executed: 0,
177 executing: 0,
178 queued: 0,
179 maxQueued: 0,
180 sequentiallyStolen: 0,
181 stolen: 0,
182 failed: 0
183 },
184 runTime: {
185 history: new CircularArray()
186 },
187 waitTime: {
188 history: new CircularArray()
189 },
190 elu: {
191 idle: {
192 history: new CircularArray()
193 },
194 active: {
195 history: new CircularArray()
196 }
197 }
198 })
199 expect(clusterWorkerNode.messageChannel).toBeUndefined()
200 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
201 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
202 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
203 expect(clusterWorkerNode.tasksQueueSize()).toBe(
204 clusterWorkerNode.tasksQueue.size
205 )
206 expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
207 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
208 })
209
210 it('Worker node getTaskFunctionWorkerUsage()', () => {
211 expect(() =>
212 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
213 ).toThrow(
214 new TypeError(
215 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
216 )
217 )
218 threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1']
219 expect(() =>
220 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
221 ).toThrow(
222 new TypeError(
223 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
224 )
225 )
226 threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
227 expect(
228 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
229 ).toStrictEqual({
230 tasks: {
231 executed: 0,
232 executing: 0,
233 queued: 0,
234 sequentiallyStolen: 0,
235 stolen: 0,
236 failed: 0
237 },
238 runTime: {
239 history: new CircularArray()
240 },
241 waitTime: {
242 history: new CircularArray()
243 },
244 elu: {
245 idle: {
246 history: new CircularArray()
247 },
248 active: {
249 history: new CircularArray()
250 }
251 }
252 })
253 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
254 tasks: {
255 executed: 0,
256 executing: 0,
257 queued: 0,
258 sequentiallyStolen: 0,
259 stolen: 0,
260 failed: 0
261 },
262 runTime: {
263 history: new CircularArray()
264 },
265 waitTime: {
266 history: new CircularArray()
267 },
268 elu: {
269 idle: {
270 history: new CircularArray()
271 },
272 active: {
273 history: new CircularArray()
274 }
275 }
276 })
277 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
278 tasks: {
279 executed: 0,
280 executing: 0,
281 queued: 0,
282 sequentiallyStolen: 0,
283 stolen: 0,
284 failed: 0
285 },
286 runTime: {
287 history: new CircularArray()
288 },
289 waitTime: {
290 history: new CircularArray()
291 },
292 elu: {
293 idle: {
294 history: new CircularArray()
295 },
296 active: {
297 history: new CircularArray()
298 }
299 }
300 })
301 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
302 })
303
304 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
305 expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
306 DEFAULT_TASK_NAME,
307 'fn1',
308 'fn2'
309 ])
310 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
311 expect(
312 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
313 ).toBe(false)
314 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
315 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
316 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
317 })
318 })