Piment Noir Git Repositories - poolifier.git/blame - tests/pools/worker-node.test.mjs
Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / tests / pools / worker-node.test.mjs
CommitLineData
a274b5a5 1import { expect } from '@std/expect'
9974369e 2import { Worker as ClusterWorker } from 'node:cluster'
ded253e2
JB
3import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
4
f12182ad 5import { CircularBuffer } from '../../lib/circular-buffer.cjs'
ded253e2
JB
6import { WorkerTypes } from '../../lib/index.cjs'
7import { WorkerNode } from '../../lib/pools/worker-node.cjs'
5de88a2a 8import { MeasurementHistorySize } from '../../lib/pools/worker.cjs'
c6dd1aeb 9import { PriorityQueue } from '../../lib/queues/priority-queue.cjs'
d35e5717 10import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
26fb3c18
JB
11
12describe('Worker node test suite', () => {
97231086 13 let clusterWorkerNode, threadWorkerNode
e1e0cb25
JB
14
before('Create worker nodes', () => {
  // Builds a brand-new options object on every call so the thread and the
  // cluster worker nodes are each constructed with their own instance.
  const buildWorkerNodeOptions = () => ({
    tasksQueueBackPressureSize: 12,
    tasksQueueBucketSize: 6,
    tasksQueuePriority: true,
  })

  // The thread node is created first, then the cluster node; both are stored
  // in the suite-level variables read by the tests below.
  threadWorkerNode = new WorkerNode(
    WorkerTypes.thread,
    './tests/worker-files/thread/testWorker.mjs',
    buildWorkerNodeOptions()
  )
  clusterWorkerNode = new WorkerNode(
    WorkerTypes.cluster,
    './tests/worker-files/cluster/testWorker.cjs',
    buildWorkerNodeOptions()
  )
})
35
after('Terminate worker nodes', async () => {
  // Terminate one node at a time, thread node first: each terminate() is
  // awaited before the next one is started.
  for (const workerNode of [threadWorkerNode, clusterWorkerNode]) {
    await workerNode.terminate()
  }
})
26fb3c18
JB
40
it('Worker node instantiation', () => {
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Constructor arguments for a thread worker node, optionally followed by
  // the worker node options under test.
  const threadArgs = (...rest) => [WorkerTypes.thread, threadWorkerFile, ...rest]

  // Part 1: constructor argument validation.
  // Each entry lists the constructor arguments, the expected error class and
  // the expected error message. Entries are checked in declaration order.
  const invalidConstructions = [
    {
      args: [],
      ErrorType: TypeError,
      message: 'Cannot construct a worker node without a worker type',
    },
    {
      args: ['invalidWorkerType', threadWorkerFile],
      ErrorType: TypeError,
      message:
        "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'",
    },
    {
      args: threadArgs(),
      ErrorType: TypeError,
      message: 'Cannot construct a worker node without worker node options',
    },
    {
      args: threadArgs(''),
      ErrorType: TypeError,
      message:
        'Cannot construct a worker node with invalid worker node options: must be a plain object',
    },
    {
      args: threadArgs({}),
      ErrorType: TypeError,
      message:
        'Cannot construct a worker node without a tasks queue back pressure size option',
    },
    {
      args: threadArgs({
        tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize',
      }),
      ErrorType: TypeError,
      message:
        'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer',
    },
    {
      args: threadArgs({ tasksQueueBackPressureSize: 0.2 }),
      ErrorType: TypeError,
      message:
        'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer',
    },
    {
      args: threadArgs({ tasksQueueBackPressureSize: 0 }),
      ErrorType: RangeError,
      message:
        'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer',
    },
    {
      args: threadArgs({ tasksQueueBackPressureSize: -1 }),
      ErrorType: RangeError,
      message:
        'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer',
    },
    {
      args: threadArgs({ tasksQueueBackPressureSize: 12 }),
      ErrorType: TypeError,
      message:
        'Cannot construct a worker node without a tasks queue bucket size option',
    },
    {
      args: threadArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 'invalidTasksQueueBucketSize',
      }),
      ErrorType: TypeError,
      message:
        'Cannot construct a worker node with a tasks queue bucket size option that is not an integer',
    },
    {
      args: threadArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 0.2,
      }),
      ErrorType: TypeError,
      message:
        'Cannot construct a worker node with a tasks queue bucket size option that is not an integer',
    },
    {
      args: threadArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 0,
      }),
      ErrorType: RangeError,
      message:
        'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer',
    },
    {
      args: threadArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: -1,
      }),
      ErrorType: RangeError,
      message:
        'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer',
    },
    {
      args: threadArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 6,
      }),
      ErrorType: TypeError,
      message:
        'Cannot construct a worker node without a tasks queue priority option',
    },
    {
      args: threadArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 6,
        tasksQueuePriority: 'invalidTasksQueuePriority',
      }),
      ErrorType: TypeError,
      message:
        'Cannot construct a worker node with a tasks queue priority option that is not a boolean',
    },
  ]
  for (const { args, ErrorType, message } of invalidConstructions) {
    expect(() => new WorkerNode(...args)).toThrow(new ErrorType(message))
  }

  // Part 2: initial state of the nodes built in the before() hook.
  // Expected shape of a measurement statistics entry: only the history
  // buffer is expected to be present.
  const emptyMeasurement = () => ({ history: expect.any(CircularBuffer) })
  // Expected usage of a worker node that has not handled any task yet.
  const pristineUsage = () => ({
    elu: {
      active: emptyMeasurement(),
      idle: emptyMeasurement(),
    },
    runTime: emptyMeasurement(),
    tasks: {
      executed: 0,
      executing: 0,
      failed: 0,
      maxQueued: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
    },
    waitTime: emptyMeasurement(),
  })
  // Accessors for the four measurement entries whose history size is checked,
  // in the order they are asserted.
  const measurementAccessors = [
    usage => usage.runTime,
    usage => usage.waitTime,
    usage => usage.elu.idle,
    usage => usage.elu.active,
  ]
  // What differs between the thread node and the cluster node: the worker
  // class, where the worker id is read from, the reported type and whether a
  // message channel exists.
  const nodeSpecs = [
    {
      node: threadWorkerNode,
      WorkerClass: ThreadWorker,
      readWorkerId: worker => worker.threadId,
      type: WorkerTypes.thread,
      checkMessageChannel: channel => {
        expect(channel).toBeInstanceOf(MessageChannel)
      },
    },
    {
      node: clusterWorkerNode,
      WorkerClass: ClusterWorker,
      readWorkerId: worker => worker.id,
      type: WorkerTypes.cluster,
      checkMessageChannel: channel => {
        expect(channel).toBeUndefined()
      },
    },
  ]
  for (const spec of nodeSpecs) {
    const { node } = spec
    expect(node).toBeInstanceOf(WorkerNode)
    expect(node.worker).toBeInstanceOf(spec.WorkerClass)
    expect(node.info).toStrictEqual({
      backPressure: false,
      backPressureStealing: false,
      continuousStealing: false,
      dynamic: false,
      id: spec.readWorkerId(node.worker),
      queuedTaskAbortion: false,
      ready: false,
      stealing: false,
      stolen: false,
      type: spec.type,
    })
    expect(node.usage).toStrictEqual(pristineUsage())
    for (const readMeasurement of measurementAccessors) {
      expect(readMeasurement(node.usage).history.items.length).toBe(
        MeasurementHistorySize
      )
    }
    spec.checkMessageChannel(node.messageChannel)
    // The queue settings below are the ones passed in the before() hook.
    expect(node.tasksQueueBackPressureSize).toBe(12)
    expect(node.tasksQueue).toBeInstanceOf(PriorityQueue)
    expect(node.tasksQueue.size).toBe(0)
    expect(node.tasksQueue.bucketSize).toBe(6)
    expect(node.tasksQueue.enablePriority).toBe(true)
    expect(node.tasksQueueSize()).toBe(node.tasksQueue.size)
    expect(node.setBackPressureFlag).toBe(false)
    expect(node.taskFunctionsUsage).toBeInstanceOf(Map)
  }
})
361
it('Worker node getTaskFunctionWorkerUsage()', () => {
  // Builds a task function properties list with fresh `{ name }` objects.
  const toProperties = (...names) => names.map(name => ({ name }))
  // Looks up the usage of a task function name that is never registered.
  const requestUnknownUsage = () =>
    threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
  // Expected shape of a measurement statistics entry.
  const emptyMeasurement = () => ({ history: expect.any(CircularBuffer) })
  // Expected usage of a task function that has not run yet.
  const pristineTaskFunctionUsage = () => ({
    elu: {
      active: emptyMeasurement(),
      idle: emptyMeasurement(),
    },
    runTime: emptyMeasurement(),
    tasks: {
      executed: 0,
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
    },
    waitTime: emptyMeasurement(),
  })

  // No task function properties have been set on the node at this point.
  expect(requestUnknownUsage).toThrow(
    new Error(
      "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
    )
  )

  // A list with only two entries is rejected as well.
  threadWorkerNode.info.taskFunctionsProperties = toProperties(
    DEFAULT_TASK_NAME,
    'fn1'
  )
  expect(requestUnknownUsage).toThrow(
    new Error(
      "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
    )
  )

  // With three entries, every listed name yields a pristine usage object.
  threadWorkerNode.info.taskFunctionsProperties = toProperties(
    DEFAULT_TASK_NAME,
    'fn1',
    'fn2'
  )
  for (const taskFunctionName of [DEFAULT_TASK_NAME, 'fn1', 'fn2']) {
    expect(
      threadWorkerNode.getTaskFunctionWorkerUsage(taskFunctionName)
    ).toStrictEqual(pristineTaskFunctionUsage())
  }

  // Three lookups are expected to leave only two entries in the usage map.
  // NOTE(review): presumably the default task name resolves to one of the
  // named task functions — confirm against WorkerNode.
  expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
})
adee6053
JB
462
it('Worker node deleteTaskFunctionWorkerUsage()', () => {
  // Number of entries currently held in the thread node's usage map.
  const trackedUsageCount = () => threadWorkerNode.taskFunctionsUsage.size
  // Deletes the usage entry of the given task function name on the thread node.
  const removeUsage = name =>
    threadWorkerNode.deleteTaskFunctionWorkerUsage(name)

  // Starting state: this is the state left on the shared thread node by the
  // getTaskFunctionWorkerUsage() test, so this test relies on running after it.
  expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'fn1' },
    { name: 'fn2' },
  ])
  expect(trackedUsageCount()).toBe(2)

  // Deleting an unknown name reports false and leaves the map untouched.
  expect(removeUsage('invalidTaskFunction')).toBe(false)
  expect(trackedUsageCount()).toBe(2)

  // Deleting a tracked name reports true and removes exactly one entry.
  expect(removeUsage('fn1')).toBe(true)
  expect(trackedUsageCount()).toBe(1)
})
26fb3c18 477})