docs(api.md): add missing ToC entry
[poolifier.git] / tests / pools / worker-node.test.mjs
Commit Line Data
9974369e 1import { Worker as ClusterWorker } from 'node:cluster'
ded253e2
JB
2import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
3
a074ffee 4import { expect } from 'expect'
ded253e2 5
f12182ad 6import { CircularBuffer } from '../../lib/circular-buffer.cjs'
ded253e2 7import { WorkerTypes } from '../../lib/index.cjs'
bcac1803 8import { MeasurementHistorySize } from '../../lib/pools/worker.cjs'
ded253e2 9import { WorkerNode } from '../../lib/pools/worker-node.cjs'
c6dd1aeb 10import { PriorityQueue } from '../../lib/queues/priority-queue.cjs'
d35e5717 11import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
26fb3c18
JB
12
13describe('Worker node test suite', () => {
c3719753
JB
// Thread-based worker node fixture, built once and shared by the tests of this suite.
const threadWorkerNode = new WorkerNode(
  WorkerTypes.thread,
  './tests/worker-files/thread/testWorker.mjs',
  { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6, tasksQueuePriority: true }
)
// Cluster-based worker node fixture, configured with the same tasks queue options
// as the thread-based one.
const clusterWorkerNode = new WorkerNode(
  WorkerTypes.cluster,
  './tests/worker-files/cluster/testWorker.cjs',
  { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6, tasksQueuePriority: true }
)
26fb3c18
JB
32
it('Worker node instantiation', () => {
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Constructor arguments for a thread worker node built with the given options.
  const threadNodeArgs = opts => [WorkerTypes.thread, threadWorkerFile, opts]

  // Invalid constructor arguments paired with the error they must raise.
  // toThrow() given an error object only compares messages, so the error class
  // is not enforced by the assertion; it is kept consistent across the cases:
  // TypeError for a missing or wrongly typed value, RangeError for an integer
  // outside the accepted range.
  // NOTE(review): the two tasksQueuePriority cases previously expected RangeError;
  // TypeError matches the sibling checks - confirm against the WorkerNode argument checks.
  const invalidConstructions = [
    {
      args: [],
      error: new TypeError(
        'Cannot construct a worker node without a worker type'
      ),
    },
    {
      args: ['invalidWorkerType', threadWorkerFile],
      error: new TypeError(
        "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
      ),
    },
    {
      args: [WorkerTypes.thread, threadWorkerFile],
      error: new TypeError(
        'Cannot construct a worker node without worker node options'
      ),
    },
    {
      args: threadNodeArgs(''),
      error: new TypeError(
        'Cannot construct a worker node with invalid worker node options: must be a plain object'
      ),
    },
    {
      args: threadNodeArgs({}),
      error: new TypeError(
        'Cannot construct a worker node without a tasks queue back pressure size option'
      ),
    },
    {
      args: threadNodeArgs({
        tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize',
      }),
      error: new TypeError(
        'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
      ),
    },
    {
      args: threadNodeArgs({ tasksQueueBackPressureSize: 0.2 }),
      error: new TypeError(
        'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
      ),
    },
    {
      args: threadNodeArgs({ tasksQueueBackPressureSize: 0 }),
      error: new RangeError(
        'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
      ),
    },
    {
      args: threadNodeArgs({ tasksQueueBackPressureSize: -1 }),
      error: new RangeError(
        'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
      ),
    },
    {
      args: threadNodeArgs({ tasksQueueBackPressureSize: 12 }),
      error: new TypeError(
        'Cannot construct a worker node without a tasks queue bucket size option'
      ),
    },
    {
      args: threadNodeArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 'invalidTasksQueueBucketSize',
      }),
      error: new TypeError(
        'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
      ),
    },
    {
      args: threadNodeArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 0.2,
      }),
      error: new TypeError(
        'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
      ),
    },
    {
      args: threadNodeArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 0,
      }),
      error: new RangeError(
        'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
      ),
    },
    {
      args: threadNodeArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: -1,
      }),
      error: new RangeError(
        'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
      ),
    },
    {
      args: threadNodeArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 6,
      }),
      error: new TypeError(
        'Cannot construct a worker node without a tasks queue priority option'
      ),
    },
    {
      args: threadNodeArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 6,
        tasksQueuePriority: 'invalidTasksQueuePriority',
      }),
      error: new TypeError(
        'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
      ),
    },
  ]
  for (const { args, error } of invalidConstructions) {
    expect(() => new WorkerNode(...args)).toThrow(error)
  }

  // Usage statistics expected on a worker node that has not run any task yet.
  const expectedInitialUsage = {
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0,
    },
    runTime: { history: expect.any(CircularBuffer) },
    waitTime: { history: expect.any(CircularBuffer) },
    elu: {
      idle: { history: expect.any(CircularBuffer) },
      active: { history: expect.any(CircularBuffer) },
    },
  }

  // Checks the initial state of a worker node built with the suite's options
  // (back pressure size 12, bucket size 6, priority enabled).
  const expectPristineWorkerNode = (
    workerNode,
    { workerClass, id, type, hasMessageChannel }
  ) => {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.worker).toBeInstanceOf(workerClass)
    expect(workerNode.info).toStrictEqual({
      id,
      type,
      dynamic: false,
      ready: false,
      stealing: false,
      backPressure: false,
    })
    expect(workerNode.usage).toStrictEqual(expectedInitialUsage)
    const { runTime, waitTime, elu } = workerNode.usage
    for (const { history } of [runTime, waitTime, elu.idle, elu.active]) {
      expect(history.items.length).toBe(MeasurementHistorySize)
    }
    if (hasMessageChannel) {
      expect(workerNode.messageChannel).toBeInstanceOf(MessageChannel)
    } else {
      expect(workerNode.messageChannel).toBeUndefined()
    }
    expect(workerNode.tasksQueueBackPressureSize).toBe(12)
    expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
    expect(workerNode.tasksQueue.size).toBe(0)
    expect(workerNode.tasksQueue.bucketSize).toBe(6)
    expect(workerNode.tasksQueue.enablePriority).toBe(true)
    expect(workerNode.tasksQueueSize()).toBe(workerNode.tasksQueue.size)
    expect(workerNode.setBackPressureFlag).toBe(false)
    expect(workerNode.taskFunctionsUsage).toBeInstanceOf(Map)
  }

  expectPristineWorkerNode(threadWorkerNode, {
    workerClass: ThreadWorker,
    id: threadWorkerNode.worker.threadId,
    type: WorkerTypes.thread,
    hasMessageChannel: true,
  })
  expectPristineWorkerNode(clusterWorkerNode, {
    workerClass: ClusterWorker,
    id: clusterWorkerNode.worker.id,
    type: WorkerTypes.cluster,
    hasMessageChannel: false,
  })
})
345
it('Worker node getTaskFunctionWorkerUsage()', () => {
  const lookUpInvalidTaskFunction = () =>
    threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')

  // No task function properties have been set on the worker node info yet.
  expect(lookUpInvalidTaskFunction).toThrow(
    new TypeError(
      "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
    )
  )

  // A list with only two entries is rejected as too short.
  threadWorkerNode.info.taskFunctionsProperties = [
    { name: DEFAULT_TASK_NAME },
    { name: 'fn1' },
  ]
  expect(lookUpInvalidTaskFunction).toThrow(
    new TypeError(
      "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
    )
  )

  threadWorkerNode.info.taskFunctionsProperties = [
    { name: DEFAULT_TASK_NAME },
    { name: 'fn1' },
    { name: 'fn2' },
  ]
  // Usage statistics expected for a task function that has not been run yet.
  const expectedInitialUsage = {
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0,
    },
    runTime: { history: expect.any(CircularBuffer) },
    waitTime: { history: expect.any(CircularBuffer) },
    elu: {
      idle: { history: expect.any(CircularBuffer) },
      active: { history: expect.any(CircularBuffer) },
    },
  }
  for (const taskFunctionName of [DEFAULT_TASK_NAME, 'fn1', 'fn2']) {
    expect(
      threadWorkerNode.getTaskFunctionWorkerUsage(taskFunctionName)
    ).toStrictEqual(expectedInitialUsage)
  }
  // Three names were queried but only two usage entries are expected;
  // presumably the default task name resolves to an already tracked
  // task function - confirm against the WorkerNode implementation.
  expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
})
adee6053
JB
446
it('Worker node deleteTaskFunctionWorkerUsage()', () => {
  // Re-read the map on every call so each assertion observes the current state.
  const trackedUsageCount = () => threadWorkerNode.taskFunctionsUsage.size

  // Starts from the state set up by the getTaskFunctionWorkerUsage() test:
  // three task function properties and two tracked usage entries.
  const expectedProperties = [
    { name: DEFAULT_TASK_NAME },
    { name: 'fn1' },
    { name: 'fn2' },
  ]
  expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual(
    expectedProperties
  )
  expect(trackedUsageCount()).toBe(2)

  // Deleting an unknown task function reports false and leaves the map untouched.
  const unknownDeleted =
    threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
  expect(unknownDeleted).toBe(false)
  expect(trackedUsageCount()).toBe(2)

  // Deleting a tracked task function reports true and removes its entry.
  const knownDeleted = threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')
  expect(knownDeleted).toBe(true)
  expect(trackedUsageCount()).toBe(1)
})
26fb3c18 461})