Piment Noir Git Repositories - poolifier.git/blame_incremental - tests/pools/worker-node.test.mjs
build(deps): bump the regular group across 11 directories with 1 update (#2911)
[poolifier.git] / tests / pools / worker-node.test.mjs
... / ...
CommitLineData
1import { expect } from '@std/expect'
2import { Worker as ClusterWorker } from 'node:cluster'
3import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
4
5import { CircularBuffer } from '../../lib/circular-buffer.cjs'
6import { WorkerTypes } from '../../lib/index.cjs'
7import { WorkerNode } from '../../lib/pools/worker-node.cjs'
8import { MeasurementHistorySize } from '../../lib/pools/worker.cjs'
9import { PriorityQueue } from '../../lib/queues/priority-queue.cjs'
10import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
11
/**
 * Test suite for WorkerNode: constructor argument validation, the initial
 * state of a thread and a cluster worker node, and the task function usage
 * accessors getTaskFunctionWorkerUsage() / deleteTaskFunctionWorkerUsage().
 *
 * NOTE: every test shares the two worker nodes created in the `before` hook.
 * The deleteTaskFunctionWorkerUsage() test starts by asserting the task
 * function properties that the getTaskFunctionWorkerUsage() test assigned, so
 * the two tests are not independent of each other.
 */
describe('Worker node test suite', () => {
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.cjs'

  let threadWorkerNode
  let clusterWorkerNode

  /**
   * Returns a fresh, complete worker node options object so that the two
   * worker nodes never share the same options instance.
   * @returns {object} Valid worker node options.
   */
  const buildWorkerNodeOptions = () => ({
    tasksQueueBackPressureSize: 12,
    tasksQueueBucketSize: 6,
    tasksQueuePriority: true,
  })

  /**
   * Matcher for one measurement entry: only the type of its history is pinned.
   * @returns {object} Expected measurement entry.
   */
  const historyMatcher = () => ({ history: expect.any(CircularBuffer) })

  /**
   * Builds the expected shape of an untouched usage object.
   * @param {object} [extraTasksCounters] Additional zeroed `tasks` counters
   *   (the worker node level usage carries `maxQueued`, the per task function
   *   usage does not).
   * @returns {object} Expected usage object.
   */
  const buildUsageMatcher = (extraTasksCounters = {}) => ({
    elu: {
      active: historyMatcher(),
      idle: historyMatcher(),
    },
    runTime: historyMatcher(),
    tasks: {
      executed: 0,
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      ...extraTasksCounters,
    },
    waitTime: historyMatcher(),
  })

  // Measurement histories whose backing storage length is checked, in order.
  const measurementSelectors = [
    usage => usage.runTime,
    usage => usage.waitTime,
    usage => usage.elu.idle,
    usage => usage.elu.active,
  ]

  /**
   * Asserts the state of a freshly constructed worker node.
   * @param {WorkerNode} workerNode Worker node under test.
   * @param {object} expected Worker type specific expectations.
   * @param {Function} expected.WorkerClass Expected class of `workerNode.worker`.
   * @param {Function} expected.idOf Reads the expected `info.id` from the worker.
   * @param {string} expected.type Expected `info.type`.
   * @param {Function} expected.expectMessageChannel Asserts on `messageChannel`.
   */
  const expectPristineWorkerNode = (
    workerNode,
    { expectMessageChannel, idOf, type, WorkerClass }
  ) => {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.worker).toBeInstanceOf(WorkerClass)
    expect(workerNode.info).toStrictEqual({
      backPressure: false,
      backPressureStealing: false,
      continuousStealing: false,
      dynamic: false,
      id: idOf(workerNode.worker),
      queuedTaskAbortion: false,
      ready: false,
      stealing: false,
      stolen: false,
      type,
    })
    expect(workerNode.usage).toStrictEqual(buildUsageMatcher({ maxQueued: 0 }))
    for (const selectMeasurement of measurementSelectors) {
      expect(selectMeasurement(workerNode.usage).history.items.length).toBe(
        MeasurementHistorySize
      )
    }
    expectMessageChannel(workerNode.messageChannel)
    expect(workerNode.tasksQueueBackPressureSize).toBe(12)
    const { tasksQueue } = workerNode
    expect(tasksQueue).toBeInstanceOf(PriorityQueue)
    expect(tasksQueue.size).toBe(0)
    expect(tasksQueue.bucketSize).toBe(6)
    expect(tasksQueue.enablePriority).toBe(true)
    expect(workerNode.tasksQueueSize()).toBe(workerNode.tasksQueue.size)
    expect(workerNode.setBackPressureFlag).toBe(false)
    expect(workerNode.taskFunctionsUsage).toBeInstanceOf(Map)
  }

  before('Create worker nodes', () => {
    threadWorkerNode = new WorkerNode(
      WorkerTypes.thread,
      threadWorkerFile,
      buildWorkerNodeOptions()
    )
    clusterWorkerNode = new WorkerNode(
      WorkerTypes.cluster,
      clusterWorkerFile,
      buildWorkerNodeOptions()
    )
  })

  after('Terminate worker nodes', async () => {
    // Terminate one after the other, thread worker node first.
    for (const workerNode of [threadWorkerNode, clusterWorkerNode]) {
      await workerNode.terminate()
    }
  })

  it('Worker node instantiation', () => {
    // Constructor arguments for a thread worker node with optional options.
    const threadArgs = (...options) => [
      WorkerTypes.thread,
      threadWorkerFile,
      ...options,
    ]
    const backPressureNotInteger =
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    const backPressureNotPositive =
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    const bucketSizeNotInteger =
      'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
    const bucketSizeNotPositive =
      'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'

    // [expected error class, expected message, constructor arguments]
    const invalidConstructions = [
      [
        TypeError,
        'Cannot construct a worker node without a worker type',
        [],
      ],
      [
        TypeError,
        "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'",
        ['invalidWorkerType', threadWorkerFile],
      ],
      [
        TypeError,
        'Cannot construct a worker node without worker node options',
        threadArgs(),
      ],
      [
        TypeError,
        'Cannot construct a worker node with invalid worker node options: must be a plain object',
        threadArgs(''),
      ],
      [
        TypeError,
        'Cannot construct a worker node without a tasks queue back pressure size option',
        threadArgs({}),
      ],
      [
        TypeError,
        backPressureNotInteger,
        threadArgs({
          tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize',
        }),
      ],
      [
        TypeError,
        backPressureNotInteger,
        threadArgs({ tasksQueueBackPressureSize: 0.2 }),
      ],
      [
        RangeError,
        backPressureNotPositive,
        threadArgs({ tasksQueueBackPressureSize: 0 }),
      ],
      [
        RangeError,
        backPressureNotPositive,
        threadArgs({ tasksQueueBackPressureSize: -1 }),
      ],
      [
        TypeError,
        'Cannot construct a worker node without a tasks queue bucket size option',
        threadArgs({ tasksQueueBackPressureSize: 12 }),
      ],
      [
        TypeError,
        bucketSizeNotInteger,
        threadArgs({
          tasksQueueBackPressureSize: 12,
          tasksQueueBucketSize: 'invalidTasksQueueBucketSize',
        }),
      ],
      [
        TypeError,
        bucketSizeNotInteger,
        threadArgs({ tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0.2 }),
      ],
      [
        RangeError,
        bucketSizeNotPositive,
        threadArgs({ tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0 }),
      ],
      [
        RangeError,
        bucketSizeNotPositive,
        threadArgs({ tasksQueueBackPressureSize: 12, tasksQueueBucketSize: -1 }),
      ],
      [
        TypeError,
        'Cannot construct a worker node without a tasks queue priority option',
        threadArgs({ tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 }),
      ],
      [
        TypeError,
        'Cannot construct a worker node with a tasks queue priority option that is not a boolean',
        threadArgs({
          tasksQueueBackPressureSize: 12,
          tasksQueueBucketSize: 6,
          tasksQueuePriority: 'invalidTasksQueuePriority',
        }),
      ],
    ]
    for (const [ErrorClass, message, args] of invalidConstructions) {
      expect(() => new WorkerNode(...args)).toThrow(new ErrorClass(message))
    }

    expectPristineWorkerNode(threadWorkerNode, {
      expectMessageChannel: messageChannel => {
        expect(messageChannel).toBeInstanceOf(MessageChannel)
      },
      idOf: worker => worker.threadId,
      type: WorkerTypes.thread,
      WorkerClass: ThreadWorker,
    })

    expectPristineWorkerNode(clusterWorkerNode, {
      expectMessageChannel: messageChannel => {
        expect(messageChannel).toBeUndefined()
      },
      idOf: worker => worker.id,
      type: WorkerTypes.cluster,
      WorkerClass: ClusterWorker,
    })
  })

  it('Worker node getTaskFunctionWorkerUsage()', () => {
    const getInvalidTaskFunctionUsage = () =>
      threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')

    // No task function properties have been assigned to the worker node yet.
    expect(getInvalidTaskFunctionUsage).toThrow(
      new Error(
        "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
      )
    )

    // A two elements list is rejected as too short.
    threadWorkerNode.info.taskFunctionsProperties = [
      { name: DEFAULT_TASK_NAME },
      { name: 'fn1' },
    ]
    expect(getInvalidTaskFunctionUsage).toThrow(
      new Error(
        "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
      )
    )

    threadWorkerNode.info.taskFunctionsProperties = [
      { name: DEFAULT_TASK_NAME },
      { name: 'fn1' },
      { name: 'fn2' },
    ]
    for (const taskFunctionName of [DEFAULT_TASK_NAME, 'fn1', 'fn2']) {
      expect(
        threadWorkerNode.getTaskFunctionWorkerUsage(taskFunctionName)
      ).toStrictEqual(buildUsageMatcher())
    }
    // Three names were looked up but only two usage entries are expected;
    // presumably DEFAULT_TASK_NAME resolves to an already tracked task
    // function - TODO confirm against the WorkerNode implementation.
    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
  })

  it('Worker node deleteTaskFunctionWorkerUsage()', () => {
    const usageEntriesCount = () => threadWorkerNode.taskFunctionsUsage.size

    // State left behind by the getTaskFunctionWorkerUsage() test.
    expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([
      { name: DEFAULT_TASK_NAME },
      { name: 'fn1' },
      { name: 'fn2' },
    ])
    expect(usageEntriesCount()).toBe(2)

    // Deleting an unknown task function reports false and removes nothing.
    expect(
      threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
    ).toBe(false)
    expect(usageEntriesCount()).toBe(2)

    // Deleting a tracked task function reports true and removes one entry.
    expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
    expect(usageEntriesCount()).toBe(1)
  })
})