8698680f370dcddd803b9c157c7cae5f9a02f041
[poolifier.git] / tests / pools / worker-node.test.mjs
1 import { expect } from 'expect'
2 import { Worker as ClusterWorker } from 'node:cluster'
3 import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
4
5 import { CircularBuffer } from '../../lib/circular-buffer.cjs'
6 import { WorkerTypes } from '../../lib/index.cjs'
7 import { MeasurementHistorySize } from '../../lib/pools/worker.cjs'
8 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
9 import { PriorityQueue } from '../../lib/queues/priority-queue.cjs'
10 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
11
12 describe('Worker node test suite', () => {
13 let clusterWorkerNode, threadWorkerNode
14
15 before('Create worker nodes', () => {
16 threadWorkerNode = new WorkerNode(
17 WorkerTypes.thread,
18 './tests/worker-files/thread/testWorker.mjs',
19 {
20 tasksQueueBackPressureSize: 12,
21 tasksQueueBucketSize: 6,
22 tasksQueuePriority: true,
23 }
24 )
25 clusterWorkerNode = new WorkerNode(
26 WorkerTypes.cluster,
27 './tests/worker-files/cluster/testWorker.cjs',
28 {
29 tasksQueueBackPressureSize: 12,
30 tasksQueueBucketSize: 6,
31 tasksQueuePriority: true,
32 }
33 )
34 })
35
36 after('Terminate worker nodes', async () => {
37 await threadWorkerNode.terminate()
38 await clusterWorkerNode.terminate()
39 })
40
41 it('Worker node instantiation', () => {
42 expect(() => new WorkerNode()).toThrow(
43 new TypeError('Cannot construct a worker node without a worker type')
44 )
45 expect(
46 () =>
47 new WorkerNode(
48 'invalidWorkerType',
49 './tests/worker-files/thread/testWorker.mjs'
50 )
51 ).toThrow(
52 new TypeError(
53 "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
54 )
55 )
56 expect(
57 () =>
58 new WorkerNode(
59 WorkerTypes.thread,
60 './tests/worker-files/thread/testWorker.mjs'
61 )
62 ).toThrow(
63 new TypeError(
64 'Cannot construct a worker node without worker node options'
65 )
66 )
67 expect(
68 () =>
69 new WorkerNode(
70 WorkerTypes.thread,
71 './tests/worker-files/thread/testWorker.mjs',
72 ''
73 )
74 ).toThrow(
75 new TypeError(
76 'Cannot construct a worker node with invalid worker node options: must be a plain object'
77 )
78 )
79 expect(
80 () =>
81 new WorkerNode(
82 WorkerTypes.thread,
83 './tests/worker-files/thread/testWorker.mjs',
84 {}
85 )
86 ).toThrow(
87 new TypeError(
88 'Cannot construct a worker node without a tasks queue back pressure size option'
89 )
90 )
91 expect(
92 () =>
93 new WorkerNode(
94 WorkerTypes.thread,
95 './tests/worker-files/thread/testWorker.mjs',
96 { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
97 )
98 ).toThrow(
99 new TypeError(
100 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
101 )
102 )
103 expect(
104 () =>
105 new WorkerNode(
106 WorkerTypes.thread,
107 './tests/worker-files/thread/testWorker.mjs',
108 { tasksQueueBackPressureSize: 0.2 }
109 )
110 ).toThrow(
111 new TypeError(
112 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
113 )
114 )
115 expect(
116 () =>
117 new WorkerNode(
118 WorkerTypes.thread,
119 './tests/worker-files/thread/testWorker.mjs',
120 { tasksQueueBackPressureSize: 0 }
121 )
122 ).toThrow(
123 new RangeError(
124 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
125 )
126 )
127 expect(
128 () =>
129 new WorkerNode(
130 WorkerTypes.thread,
131 './tests/worker-files/thread/testWorker.mjs',
132 { tasksQueueBackPressureSize: -1 }
133 )
134 ).toThrow(
135 new RangeError(
136 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
137 )
138 )
139 expect(
140 () =>
141 new WorkerNode(
142 WorkerTypes.thread,
143 './tests/worker-files/thread/testWorker.mjs',
144 {
145 tasksQueueBackPressureSize: 12,
146 }
147 )
148 ).toThrow(
149 new TypeError(
150 'Cannot construct a worker node without a tasks queue bucket size option'
151 )
152 )
153 expect(
154 () =>
155 new WorkerNode(
156 WorkerTypes.thread,
157 './tests/worker-files/thread/testWorker.mjs',
158 {
159 tasksQueueBackPressureSize: 12,
160 tasksQueueBucketSize: 'invalidTasksQueueBucketSize',
161 }
162 )
163 ).toThrow(
164 new TypeError(
165 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
166 )
167 )
168 expect(
169 () =>
170 new WorkerNode(
171 WorkerTypes.thread,
172 './tests/worker-files/thread/testWorker.mjs',
173 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0.2 }
174 )
175 ).toThrow(
176 new TypeError(
177 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
178 )
179 )
180 expect(
181 () =>
182 new WorkerNode(
183 WorkerTypes.thread,
184 './tests/worker-files/thread/testWorker.mjs',
185 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0 }
186 )
187 ).toThrow(
188 new RangeError(
189 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
190 )
191 )
192 expect(
193 () =>
194 new WorkerNode(
195 WorkerTypes.thread,
196 './tests/worker-files/thread/testWorker.mjs',
197 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: -1 }
198 )
199 ).toThrow(
200 new RangeError(
201 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
202 )
203 )
204 expect(
205 () =>
206 new WorkerNode(
207 WorkerTypes.thread,
208 './tests/worker-files/thread/testWorker.mjs',
209 {
210 tasksQueueBackPressureSize: 12,
211 tasksQueueBucketSize: 6,
212 }
213 )
214 ).toThrow(
215 new RangeError(
216 'Cannot construct a worker node without a tasks queue priority option'
217 )
218 )
219 expect(
220 () =>
221 new WorkerNode(
222 WorkerTypes.thread,
223 './tests/worker-files/thread/testWorker.mjs',
224 {
225 tasksQueueBackPressureSize: 12,
226 tasksQueueBucketSize: 6,
227 tasksQueuePriority: 'invalidTasksQueuePriority',
228 }
229 )
230 ).toThrow(
231 new RangeError(
232 'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
233 )
234 )
235 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
236 expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
237 expect(threadWorkerNode.info).toStrictEqual({
238 backPressure: false,
239 backPressureStealing: false,
240 continuousStealing: false,
241 dynamic: false,
242 id: threadWorkerNode.worker.threadId,
243 ready: false,
244 stealing: false,
245 stolen: false,
246 type: WorkerTypes.thread,
247 })
248 expect(threadWorkerNode.usage).toStrictEqual({
249 elu: {
250 active: {
251 history: expect.any(CircularBuffer),
252 },
253 idle: {
254 history: expect.any(CircularBuffer),
255 },
256 },
257 runTime: {
258 history: expect.any(CircularBuffer),
259 },
260 tasks: {
261 executed: 0,
262 executing: 0,
263 failed: 0,
264 maxQueued: 0,
265 queued: 0,
266 sequentiallyStolen: 0,
267 stolen: 0,
268 },
269 waitTime: {
270 history: expect.any(CircularBuffer),
271 },
272 })
273 expect(threadWorkerNode.usage.runTime.history.items.length).toBe(
274 MeasurementHistorySize
275 )
276 expect(threadWorkerNode.usage.waitTime.history.items.length).toBe(
277 MeasurementHistorySize
278 )
279 expect(threadWorkerNode.usage.elu.idle.history.items.length).toBe(
280 MeasurementHistorySize
281 )
282 expect(threadWorkerNode.usage.elu.active.history.items.length).toBe(
283 MeasurementHistorySize
284 )
285 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
286 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
287 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
288 expect(threadWorkerNode.tasksQueue.size).toBe(0)
289 expect(threadWorkerNode.tasksQueue.bucketSize).toBe(6)
290 expect(threadWorkerNode.tasksQueue.enablePriority).toBe(true)
291 expect(threadWorkerNode.tasksQueueSize()).toBe(
292 threadWorkerNode.tasksQueue.size
293 )
294 expect(threadWorkerNode.setBackPressureFlag).toBe(false)
295 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
296
297 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
298 expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
299 expect(clusterWorkerNode.info).toStrictEqual({
300 backPressure: false,
301 backPressureStealing: false,
302 continuousStealing: false,
303 dynamic: false,
304 id: clusterWorkerNode.worker.id,
305 ready: false,
306 stealing: false,
307 stolen: false,
308 type: WorkerTypes.cluster,
309 })
310 expect(clusterWorkerNode.usage).toStrictEqual({
311 elu: {
312 active: {
313 history: expect.any(CircularBuffer),
314 },
315 idle: {
316 history: expect.any(CircularBuffer),
317 },
318 },
319 runTime: {
320 history: expect.any(CircularBuffer),
321 },
322 tasks: {
323 executed: 0,
324 executing: 0,
325 failed: 0,
326 maxQueued: 0,
327 queued: 0,
328 sequentiallyStolen: 0,
329 stolen: 0,
330 },
331 waitTime: {
332 history: expect.any(CircularBuffer),
333 },
334 })
335 expect(clusterWorkerNode.usage.runTime.history.items.length).toBe(
336 MeasurementHistorySize
337 )
338 expect(clusterWorkerNode.usage.waitTime.history.items.length).toBe(
339 MeasurementHistorySize
340 )
341 expect(clusterWorkerNode.usage.elu.idle.history.items.length).toBe(
342 MeasurementHistorySize
343 )
344 expect(clusterWorkerNode.usage.elu.active.history.items.length).toBe(
345 MeasurementHistorySize
346 )
347 expect(clusterWorkerNode.messageChannel).toBeUndefined()
348 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
349 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
350 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
351 expect(clusterWorkerNode.tasksQueue.bucketSize).toBe(6)
352 expect(clusterWorkerNode.tasksQueue.enablePriority).toBe(true)
353 expect(clusterWorkerNode.tasksQueueSize()).toBe(
354 clusterWorkerNode.tasksQueue.size
355 )
356 expect(clusterWorkerNode.setBackPressureFlag).toBe(false)
357 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
358 })
359
360 it('Worker node getTaskFunctionWorkerUsage()', () => {
361 expect(() =>
362 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
363 ).toThrow(
364 new TypeError(
365 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
366 )
367 )
368 threadWorkerNode.info.taskFunctionsProperties = [
369 { name: DEFAULT_TASK_NAME },
370 { name: 'fn1' },
371 ]
372 expect(() =>
373 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
374 ).toThrow(
375 new TypeError(
376 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
377 )
378 )
379 threadWorkerNode.info.taskFunctionsProperties = [
380 { name: DEFAULT_TASK_NAME },
381 { name: 'fn1' },
382 { name: 'fn2' },
383 ]
384 expect(
385 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
386 ).toStrictEqual({
387 elu: {
388 active: {
389 history: expect.any(CircularBuffer),
390 },
391 idle: {
392 history: expect.any(CircularBuffer),
393 },
394 },
395 runTime: {
396 history: expect.any(CircularBuffer),
397 },
398 tasks: {
399 executed: 0,
400 executing: 0,
401 failed: 0,
402 queued: 0,
403 sequentiallyStolen: 0,
404 stolen: 0,
405 },
406 waitTime: {
407 history: expect.any(CircularBuffer),
408 },
409 })
410 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
411 elu: {
412 active: {
413 history: expect.any(CircularBuffer),
414 },
415 idle: {
416 history: expect.any(CircularBuffer),
417 },
418 },
419 runTime: {
420 history: expect.any(CircularBuffer),
421 },
422 tasks: {
423 executed: 0,
424 executing: 0,
425 failed: 0,
426 queued: 0,
427 sequentiallyStolen: 0,
428 stolen: 0,
429 },
430 waitTime: {
431 history: expect.any(CircularBuffer),
432 },
433 })
434 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
435 elu: {
436 active: {
437 history: expect.any(CircularBuffer),
438 },
439 idle: {
440 history: expect.any(CircularBuffer),
441 },
442 },
443 runTime: {
444 history: expect.any(CircularBuffer),
445 },
446 tasks: {
447 executed: 0,
448 executing: 0,
449 failed: 0,
450 queued: 0,
451 sequentiallyStolen: 0,
452 stolen: 0,
453 },
454 waitTime: {
455 history: expect.any(CircularBuffer),
456 },
457 })
458 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
459 })
460
461 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
462 expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([
463 { name: DEFAULT_TASK_NAME },
464 { name: 'fn1' },
465 { name: 'fn2' },
466 ])
467 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
468 expect(
469 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
470 ).toBe(false)
471 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
472 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
473 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
474 })
475 })