777fff9f62574be885052bcce2e3d7b2f6215837
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { expect } from 'expect'
2 // eslint-disable-next-line n/no-unsupported-features/node-builtins
3 import { createHook, executionAsyncId } from 'node:async_hooks'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import { readFileSync } from 'node:fs'
6 import { dirname, join } from 'node:path'
7 import { fileURLToPath } from 'node:url'
8
9 import { CircularBuffer } from '../../lib/circular-buffer.cjs'
10 import {
11 DynamicClusterPool,
12 DynamicThreadPool,
13 FixedClusterPool,
14 FixedThreadPool,
15 PoolEvents,
16 PoolTypes,
17 WorkerChoiceStrategies,
18 WorkerTypes,
19 } from '../../lib/index.cjs'
20 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
21 import { PriorityQueue } from '../../lib/queues/priority-queue.cjs'
22 import { defaultBucketSize } from '../../lib/queues/queue-types.cjs'
23 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
24 import { waitPoolEvents } from '../test-utils.cjs'
25
26 describe('Abstract pool test suite', () => {
27 const version = JSON.parse(
28 readFileSync(
29 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
30 'utf8'
31 )
32 ).version
33 const numberOfWorkers = 2
34 class StubPoolWithIsMain extends FixedThreadPool {
35 isMain () {
36 return false
37 }
38 }
39
40 it('Verify that pool can be created and destroyed', async () => {
41 const pool = new FixedThreadPool(
42 numberOfWorkers,
43 './tests/worker-files/thread/testWorker.mjs'
44 )
45 expect(pool).toBeInstanceOf(FixedThreadPool)
46 await pool.destroy()
47 })
48
49 it('Verify that pool cannot be created from a non main thread/process', () => {
50 expect(
51 () =>
52 new StubPoolWithIsMain(
53 numberOfWorkers,
54 './tests/worker-files/thread/testWorker.mjs',
55 {
56 errorHandler: e => console.error(e),
57 }
58 )
59 ).toThrow(
60 new Error(
61 'Cannot start a pool from a worker with the same type as the pool'
62 )
63 )
64 })
65
66 it('Verify that pool statuses properties are set', async () => {
67 const pool = new FixedThreadPool(
68 numberOfWorkers,
69 './tests/worker-files/thread/testWorker.mjs'
70 )
71 expect(pool.started).toBe(true)
72 expect(pool.starting).toBe(false)
73 expect(pool.destroying).toBe(false)
74 await pool.destroy()
75 expect(pool.started).toBe(false)
76 expect(pool.starting).toBe(false)
77 expect(pool.destroying).toBe(false)
78 })
79
80 it('Verify that filePath is checked', () => {
81 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
82 new TypeError('The worker file path must be specified')
83 )
84 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
85 new TypeError('The worker file path must be a string')
86 )
87 expect(
88 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
89 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
90 })
91
92 it('Verify that numberOfWorkers is checked', () => {
93 expect(
94 () =>
95 new FixedThreadPool(
96 undefined,
97 './tests/worker-files/thread/testWorker.mjs'
98 )
99 ).toThrow(
100 new Error(
101 'Cannot instantiate a pool without specifying the number of workers'
102 )
103 )
104 })
105
106 it('Verify that a negative number of workers is checked', () => {
107 expect(
108 () =>
109 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
110 ).toThrow(
111 new RangeError(
112 'Cannot instantiate a pool with a negative number of workers'
113 )
114 )
115 })
116
117 it('Verify that a non integer number of workers is checked', () => {
118 expect(
119 () =>
120 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
121 ).toThrow(
122 new TypeError(
123 'Cannot instantiate a pool with a non safe integer number of workers'
124 )
125 )
126 })
127
128 it('Verify that pool arguments number and pool type are checked', () => {
129 expect(
130 () =>
131 new FixedThreadPool(
132 numberOfWorkers,
133 './tests/worker-files/thread/testWorker.mjs',
134 undefined,
135 numberOfWorkers * 2
136 )
137 ).toThrow(
138 new Error(
139 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
140 )
141 )
142 })
143
144 it('Verify that dynamic pool sizing is checked', () => {
145 expect(
146 () =>
147 new DynamicClusterPool(
148 1,
149 undefined,
150 './tests/worker-files/cluster/testWorker.cjs'
151 )
152 ).toThrow(
153 new TypeError(
154 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
155 )
156 )
157 expect(
158 () =>
159 new DynamicThreadPool(
160 0.5,
161 1,
162 './tests/worker-files/thread/testWorker.mjs'
163 )
164 ).toThrow(
165 new TypeError(
166 'Cannot instantiate a pool with a non safe integer number of workers'
167 )
168 )
169 expect(
170 () =>
171 new DynamicClusterPool(
172 0,
173 0.5,
174 './tests/worker-files/cluster/testWorker.cjs'
175 )
176 ).toThrow(
177 new TypeError(
178 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
179 )
180 )
181 expect(
182 () =>
183 new DynamicThreadPool(
184 2,
185 1,
186 './tests/worker-files/thread/testWorker.mjs'
187 )
188 ).toThrow(
189 new RangeError(
190 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
191 )
192 )
193 expect(
194 () =>
195 new DynamicThreadPool(
196 0,
197 0,
198 './tests/worker-files/thread/testWorker.mjs'
199 )
200 ).toThrow(
201 new RangeError(
202 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
203 )
204 )
205 expect(
206 () =>
207 new DynamicClusterPool(
208 1,
209 1,
210 './tests/worker-files/cluster/testWorker.cjs'
211 )
212 ).toThrow(
213 new RangeError(
214 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
215 )
216 )
217 })
218
219 it('Verify that pool options are checked', async () => {
220 let pool = new FixedThreadPool(
221 numberOfWorkers,
222 './tests/worker-files/thread/testWorker.mjs'
223 )
224 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
225 expect(pool.emitter.eventNames()).toStrictEqual([])
226 expect(pool.opts).toStrictEqual({
227 enableEvents: true,
228 enableTasksQueue: false,
229 restartWorkerOnError: true,
230 startWorkers: true,
231 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
232 })
233 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
234 .workerChoiceStrategies) {
235 expect(workerChoiceStrategy.opts).toStrictEqual({
236 elu: { median: false },
237 runTime: { median: false },
238 waitTime: { median: false },
239 weights: expect.objectContaining({
240 0: expect.any(Number),
241 [pool.info.maxSize - 1]: expect.any(Number),
242 }),
243 })
244 }
245 await pool.destroy()
246 const testHandler = () => console.info('test handler executed')
247 pool = new FixedThreadPool(
248 numberOfWorkers,
249 './tests/worker-files/thread/testWorker.mjs',
250 {
251 enableEvents: false,
252 enableTasksQueue: true,
253 errorHandler: testHandler,
254 exitHandler: testHandler,
255 messageHandler: testHandler,
256 onlineHandler: testHandler,
257 restartWorkerOnError: false,
258 tasksQueueOptions: { concurrency: 2 },
259 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
260 workerChoiceStrategyOptions: {
261 runTime: { median: true },
262 weights: { 0: 300, 1: 200 },
263 },
264 }
265 )
266 expect(pool.emitter).toBeUndefined()
267 expect(pool.opts).toStrictEqual({
268 enableEvents: false,
269 enableTasksQueue: true,
270 errorHandler: testHandler,
271 exitHandler: testHandler,
272 messageHandler: testHandler,
273 onlineHandler: testHandler,
274 restartWorkerOnError: false,
275 startWorkers: true,
276 tasksQueueOptions: {
277 concurrency: 2,
278 size: Math.pow(numberOfWorkers, 2),
279 tasksFinishedTimeout: 2000,
280 tasksStealingOnBackPressure: true,
281 tasksStealingRatio: 0.6,
282 taskStealing: true,
283 },
284 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
285 workerChoiceStrategyOptions: {
286 runTime: { median: true },
287 weights: { 0: 300, 1: 200 },
288 },
289 })
290 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
291 .workerChoiceStrategies) {
292 expect(workerChoiceStrategy.opts).toStrictEqual({
293 elu: { median: false },
294 runTime: { median: true },
295 waitTime: { median: false },
296 weights: { 0: 300, 1: 200 },
297 })
298 }
299 await pool.destroy()
300 })
301
302 it('Verify that pool options are validated', () => {
303 expect(
304 () =>
305 new FixedThreadPool(
306 numberOfWorkers,
307 './tests/worker-files/thread/testWorker.mjs',
308 {
309 workerChoiceStrategy: 'invalidStrategy',
310 }
311 )
312 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
313 expect(
314 () =>
315 new FixedThreadPool(
316 numberOfWorkers,
317 './tests/worker-files/thread/testWorker.mjs',
318 {
319 workerChoiceStrategyOptions: { weights: {} },
320 }
321 )
322 ).toThrow(
323 new Error(
324 'Invalid worker choice strategy options: must have a weight for each worker node'
325 )
326 )
327 expect(
328 () =>
329 new FixedThreadPool(
330 numberOfWorkers,
331 './tests/worker-files/thread/testWorker.mjs',
332 {
333 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' },
334 }
335 )
336 ).toThrow(
337 new Error(
338 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
339 )
340 )
341 expect(
342 () =>
343 new FixedThreadPool(
344 numberOfWorkers,
345 './tests/worker-files/thread/testWorker.mjs',
346 {
347 enableTasksQueue: true,
348 tasksQueueOptions: 'invalidTasksQueueOptions',
349 }
350 )
351 ).toThrow(
352 new TypeError('Invalid tasks queue options: must be a plain object')
353 )
354 expect(
355 () =>
356 new FixedThreadPool(
357 numberOfWorkers,
358 './tests/worker-files/thread/testWorker.mjs',
359 {
360 enableTasksQueue: true,
361 tasksQueueOptions: { concurrency: 0 },
362 }
363 )
364 ).toThrow(
365 new RangeError(
366 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
367 )
368 )
369 expect(
370 () =>
371 new FixedThreadPool(
372 numberOfWorkers,
373 './tests/worker-files/thread/testWorker.mjs',
374 {
375 enableTasksQueue: true,
376 tasksQueueOptions: { concurrency: -1 },
377 }
378 )
379 ).toThrow(
380 new RangeError(
381 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
382 )
383 )
384 expect(
385 () =>
386 new FixedThreadPool(
387 numberOfWorkers,
388 './tests/worker-files/thread/testWorker.mjs',
389 {
390 enableTasksQueue: true,
391 tasksQueueOptions: { concurrency: 0.2 },
392 }
393 )
394 ).toThrow(
395 new TypeError('Invalid worker node tasks concurrency: must be an integer')
396 )
397 expect(
398 () =>
399 new FixedThreadPool(
400 numberOfWorkers,
401 './tests/worker-files/thread/testWorker.mjs',
402 {
403 enableTasksQueue: true,
404 tasksQueueOptions: { size: 0 },
405 }
406 )
407 ).toThrow(
408 new RangeError(
409 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
410 )
411 )
412 expect(
413 () =>
414 new FixedThreadPool(
415 numberOfWorkers,
416 './tests/worker-files/thread/testWorker.mjs',
417 {
418 enableTasksQueue: true,
419 tasksQueueOptions: { size: -1 },
420 }
421 )
422 ).toThrow(
423 new RangeError(
424 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
425 )
426 )
427 expect(
428 () =>
429 new FixedThreadPool(
430 numberOfWorkers,
431 './tests/worker-files/thread/testWorker.mjs',
432 {
433 enableTasksQueue: true,
434 tasksQueueOptions: { size: 0.2 },
435 }
436 )
437 ).toThrow(
438 new TypeError('Invalid worker node tasks queue size: must be an integer')
439 )
440 expect(
441 () =>
442 new FixedThreadPool(
443 numberOfWorkers,
444 './tests/worker-files/thread/testWorker.mjs',
445 {
446 enableTasksQueue: true,
447 tasksQueueOptions: { tasksStealingRatio: '' },
448 }
449 )
450 ).toThrow(
451 new TypeError(
452 'Invalid worker node tasks stealing ratio: must be a number'
453 )
454 )
455 expect(
456 () =>
457 new FixedThreadPool(
458 numberOfWorkers,
459 './tests/worker-files/thread/testWorker.mjs',
460 {
461 enableTasksQueue: true,
462 tasksQueueOptions: { tasksStealingRatio: 1.1 },
463 }
464 )
465 ).toThrow(
466 new RangeError(
467 'Invalid worker node tasks stealing ratio: must be between 0 and 1'
468 )
469 )
470 })
471
472 it('Verify that pool worker choice strategy options can be set', async () => {
473 const pool = new FixedThreadPool(
474 numberOfWorkers,
475 './tests/worker-files/thread/testWorker.mjs',
476 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
477 )
478 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
479 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
480 .workerChoiceStrategies) {
481 expect(workerChoiceStrategy.opts).toStrictEqual({
482 elu: { median: false },
483 runTime: { median: false },
484 waitTime: { median: false },
485 weights: expect.objectContaining({
486 0: expect.any(Number),
487 [pool.info.maxSize - 1]: expect.any(Number),
488 }),
489 })
490 }
491 expect(
492 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
493 ).toStrictEqual({
494 elu: {
495 aggregate: true,
496 average: true,
497 median: false,
498 },
499 runTime: {
500 aggregate: true,
501 average: true,
502 median: false,
503 },
504 waitTime: {
505 aggregate: true,
506 average: true,
507 median: false,
508 },
509 })
510 pool.setWorkerChoiceStrategyOptions({
511 elu: { median: true },
512 runTime: { median: true },
513 })
514 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
515 elu: { median: true },
516 runTime: { median: true },
517 })
518 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
519 .workerChoiceStrategies) {
520 expect(workerChoiceStrategy.opts).toStrictEqual({
521 elu: { median: true },
522 runTime: { median: true },
523 waitTime: { median: false },
524 weights: expect.objectContaining({
525 0: expect.any(Number),
526 [pool.info.maxSize - 1]: expect.any(Number),
527 }),
528 })
529 }
530 expect(
531 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
532 ).toStrictEqual({
533 elu: {
534 aggregate: true,
535 average: false,
536 median: true,
537 },
538 runTime: {
539 aggregate: true,
540 average: false,
541 median: true,
542 },
543 waitTime: {
544 aggregate: true,
545 average: true,
546 median: false,
547 },
548 })
549 pool.setWorkerChoiceStrategyOptions({
550 elu: { median: false },
551 runTime: { median: false },
552 })
553 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
554 elu: { median: false },
555 runTime: { median: false },
556 })
557 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
558 .workerChoiceStrategies) {
559 expect(workerChoiceStrategy.opts).toStrictEqual({
560 elu: { median: false },
561 runTime: { median: false },
562 waitTime: { median: false },
563 weights: expect.objectContaining({
564 0: expect.any(Number),
565 [pool.info.maxSize - 1]: expect.any(Number),
566 }),
567 })
568 }
569 expect(
570 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
571 ).toStrictEqual({
572 elu: {
573 aggregate: true,
574 average: true,
575 median: false,
576 },
577 runTime: {
578 aggregate: true,
579 average: true,
580 median: false,
581 },
582 waitTime: {
583 aggregate: true,
584 average: true,
585 median: false,
586 },
587 })
588 expect(() =>
589 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
590 ).toThrow(
591 new TypeError(
592 'Invalid worker choice strategy options: must be a plain object'
593 )
594 )
595 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
596 new Error(
597 'Invalid worker choice strategy options: must have a weight for each worker node'
598 )
599 )
600 expect(() =>
601 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
602 ).toThrow(
603 new Error(
604 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
605 )
606 )
607 await pool.destroy()
608 })
609
610 it('Verify that pool tasks queue can be enabled/disabled', async () => {
611 const pool = new FixedThreadPool(
612 numberOfWorkers,
613 './tests/worker-files/thread/testWorker.mjs'
614 )
615 expect(pool.opts.enableTasksQueue).toBe(false)
616 expect(pool.opts.tasksQueueOptions).toBeUndefined()
617 pool.enableTasksQueue(true)
618 expect(pool.opts.enableTasksQueue).toBe(true)
619 expect(pool.opts.tasksQueueOptions).toStrictEqual({
620 concurrency: 1,
621 size: Math.pow(numberOfWorkers, 2),
622 tasksFinishedTimeout: 2000,
623 tasksStealingOnBackPressure: true,
624 tasksStealingRatio: 0.6,
625 taskStealing: true,
626 })
627 pool.enableTasksQueue(true, { concurrency: 2 })
628 expect(pool.opts.enableTasksQueue).toBe(true)
629 expect(pool.opts.tasksQueueOptions).toStrictEqual({
630 concurrency: 2,
631 size: Math.pow(numberOfWorkers, 2),
632 tasksFinishedTimeout: 2000,
633 tasksStealingOnBackPressure: true,
634 tasksStealingRatio: 0.6,
635 taskStealing: true,
636 })
637 pool.enableTasksQueue(false)
638 expect(pool.opts.enableTasksQueue).toBe(false)
639 expect(pool.opts.tasksQueueOptions).toBeUndefined()
640 await pool.destroy()
641 })
642
643 it('Verify that pool tasks queue options can be set', async () => {
644 const pool = new FixedThreadPool(
645 numberOfWorkers,
646 './tests/worker-files/thread/testWorker.mjs',
647 { enableTasksQueue: true }
648 )
649 expect(pool.opts.tasksQueueOptions).toStrictEqual({
650 concurrency: 1,
651 size: Math.pow(numberOfWorkers, 2),
652 tasksFinishedTimeout: 2000,
653 tasksStealingOnBackPressure: true,
654 tasksStealingRatio: 0.6,
655 taskStealing: true,
656 })
657 for (const workerNode of pool.workerNodes) {
658 expect(workerNode.tasksQueueBackPressureSize).toBe(
659 pool.opts.tasksQueueOptions.size
660 )
661 }
662 pool.setTasksQueueOptions({
663 concurrency: 2,
664 size: 2,
665 tasksFinishedTimeout: 3000,
666 tasksStealingOnBackPressure: false,
667 tasksStealingRatio: 0.5,
668 taskStealing: false,
669 })
670 expect(pool.opts.tasksQueueOptions).toStrictEqual({
671 concurrency: 2,
672 size: 2,
673 tasksFinishedTimeout: 3000,
674 tasksStealingOnBackPressure: false,
675 tasksStealingRatio: 0.5,
676 taskStealing: false,
677 })
678 for (const workerNode of pool.workerNodes) {
679 expect(workerNode.tasksQueueBackPressureSize).toBe(
680 pool.opts.tasksQueueOptions.size
681 )
682 }
683 pool.setTasksQueueOptions({
684 concurrency: 1,
685 tasksStealingOnBackPressure: true,
686 taskStealing: true,
687 })
688 expect(pool.opts.tasksQueueOptions).toStrictEqual({
689 concurrency: 1,
690 size: 2,
691 tasksFinishedTimeout: 3000,
692 tasksStealingOnBackPressure: true,
693 tasksStealingRatio: 0.5,
694 taskStealing: true,
695 })
696 for (const workerNode of pool.workerNodes) {
697 expect(workerNode.tasksQueueBackPressureSize).toBe(
698 pool.opts.tasksQueueOptions.size
699 )
700 }
701 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
702 new TypeError('Invalid tasks queue options: must be a plain object')
703 )
704 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
705 new RangeError(
706 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
707 )
708 )
709 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
710 new RangeError(
711 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
712 )
713 )
714 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
715 new TypeError('Invalid worker node tasks concurrency: must be an integer')
716 )
717 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
718 new RangeError(
719 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
720 )
721 )
722 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
723 new RangeError(
724 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
725 )
726 )
727 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
728 new TypeError('Invalid worker node tasks queue size: must be an integer')
729 )
730 expect(() => pool.setTasksQueueOptions({ tasksStealingRatio: '' })).toThrow(
731 new TypeError(
732 'Invalid worker node tasks stealing ratio: must be a number'
733 )
734 )
735 expect(() =>
736 pool.setTasksQueueOptions({ tasksStealingRatio: 1.1 })
737 ).toThrow(
738 new RangeError(
739 'Invalid worker node tasks stealing ratio: must be between 0 and 1'
740 )
741 )
742 await pool.destroy()
743 })
744
745 it('Verify that pool info is set', async () => {
746 let pool = new FixedThreadPool(
747 numberOfWorkers,
748 './tests/worker-files/thread/testWorker.mjs'
749 )
750 expect(pool.info).toStrictEqual({
751 busyWorkerNodes: 0,
752 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
753 executedTasks: 0,
754 executingTasks: 0,
755 failedTasks: 0,
756 idleWorkerNodes: numberOfWorkers,
757 maxSize: numberOfWorkers,
758 minSize: numberOfWorkers,
759 ready: true,
760 started: true,
761 strategyRetries: 0,
762 type: PoolTypes.fixed,
763 version,
764 worker: WorkerTypes.thread,
765 workerNodes: numberOfWorkers,
766 })
767 await pool.destroy()
768 pool = new DynamicClusterPool(
769 Math.floor(numberOfWorkers / 2),
770 numberOfWorkers,
771 './tests/worker-files/cluster/testWorker.cjs'
772 )
773 expect(pool.info).toStrictEqual({
774 busyWorkerNodes: 0,
775 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
776 dynamicWorkerNodes: 0,
777 executedTasks: 0,
778 executingTasks: 0,
779 failedTasks: 0,
780 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
781 maxSize: numberOfWorkers,
782 minSize: Math.floor(numberOfWorkers / 2),
783 ready: true,
784 started: true,
785 strategyRetries: 0,
786 type: PoolTypes.dynamic,
787 version,
788 worker: WorkerTypes.cluster,
789 workerNodes: Math.floor(numberOfWorkers / 2),
790 })
791 await pool.destroy()
792 })
793
794 it('Verify that pool worker tasks usage are initialized', async () => {
795 const pool = new FixedClusterPool(
796 numberOfWorkers,
797 './tests/worker-files/cluster/testWorker.cjs'
798 )
799 for (const workerNode of pool.workerNodes) {
800 expect(workerNode).toBeInstanceOf(WorkerNode)
801 expect(workerNode.usage).toStrictEqual({
802 elu: {
803 active: {
804 history: expect.any(CircularBuffer),
805 },
806 idle: {
807 history: expect.any(CircularBuffer),
808 },
809 },
810 runTime: {
811 history: expect.any(CircularBuffer),
812 },
813 tasks: {
814 executed: 0,
815 executing: 0,
816 failed: 0,
817 maxQueued: 0,
818 queued: 0,
819 sequentiallyStolen: 0,
820 stolen: 0,
821 },
822 waitTime: {
823 history: expect.any(CircularBuffer),
824 },
825 })
826 }
827 await pool.destroy()
828 })
829
830 it('Verify that pool worker tasks queue are initialized', async () => {
831 let pool = new FixedClusterPool(
832 numberOfWorkers,
833 './tests/worker-files/cluster/testWorker.cjs'
834 )
835 for (const workerNode of pool.workerNodes) {
836 expect(workerNode).toBeInstanceOf(WorkerNode)
837 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
838 expect(workerNode.tasksQueue.size).toBe(0)
839 expect(workerNode.tasksQueue.maxSize).toBe(0)
840 expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
841 expect(workerNode.tasksQueue.enablePriority).toBe(false)
842 }
843 await pool.destroy()
844 pool = new DynamicThreadPool(
845 Math.floor(numberOfWorkers / 2),
846 numberOfWorkers,
847 './tests/worker-files/thread/testWorker.mjs'
848 )
849 for (const workerNode of pool.workerNodes) {
850 expect(workerNode).toBeInstanceOf(WorkerNode)
851 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
852 expect(workerNode.tasksQueue.size).toBe(0)
853 expect(workerNode.tasksQueue.maxSize).toBe(0)
854 expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
855 expect(workerNode.tasksQueue.enablePriority).toBe(false)
856 }
857 await pool.destroy()
858 })
859
860 it('Verify that pool worker info are initialized', async () => {
861 let pool = new FixedClusterPool(
862 numberOfWorkers,
863 './tests/worker-files/cluster/testWorker.cjs'
864 )
865 for (const workerNode of pool.workerNodes) {
866 expect(workerNode).toBeInstanceOf(WorkerNode)
867 expect(workerNode.info).toStrictEqual({
868 backPressure: false,
869 backPressureStealing: false,
870 continuousStealing: false,
871 dynamic: false,
872 id: expect.any(Number),
873 ready: true,
874 stealing: false,
875 stolen: false,
876 type: WorkerTypes.cluster,
877 })
878 }
879 await pool.destroy()
880 pool = new DynamicThreadPool(
881 Math.floor(numberOfWorkers / 2),
882 numberOfWorkers,
883 './tests/worker-files/thread/testWorker.mjs'
884 )
885 for (const workerNode of pool.workerNodes) {
886 expect(workerNode).toBeInstanceOf(WorkerNode)
887 expect(workerNode.info).toStrictEqual({
888 backPressure: false,
889 backPressureStealing: false,
890 continuousStealing: false,
891 dynamic: false,
892 id: expect.any(Number),
893 ready: true,
894 stealing: false,
895 stolen: false,
896 type: WorkerTypes.thread,
897 })
898 }
899 await pool.destroy()
900 })
901
902 it('Verify that pool statuses are checked at start or destroy', async () => {
903 const pool = new FixedThreadPool(
904 numberOfWorkers,
905 './tests/worker-files/thread/testWorker.mjs'
906 )
907 expect(pool.info.started).toBe(true)
908 expect(pool.info.ready).toBe(true)
909 expect(() => pool.start()).toThrow(
910 new Error('Cannot start an already started pool')
911 )
912 await pool.destroy()
913 expect(pool.info.started).toBe(false)
914 expect(pool.info.ready).toBe(false)
915 await expect(pool.destroy()).rejects.toThrow(
916 new Error('Cannot destroy an already destroyed pool')
917 )
918 })
919
920 it('Verify that pool can be started after initialization', async () => {
921 const pool = new FixedClusterPool(
922 numberOfWorkers,
923 './tests/worker-files/cluster/testWorker.cjs',
924 {
925 startWorkers: false,
926 }
927 )
928 expect(pool.info.started).toBe(false)
929 expect(pool.info.ready).toBe(false)
930 expect(pool.workerNodes).toStrictEqual([])
931 expect(pool.readyEventEmitted).toBe(false)
932 expect(pool.busyEventEmitted).toBe(false)
933 expect(pool.backPressureEventEmitted).toBe(false)
934 pool.start()
935 expect(pool.info.started).toBe(true)
936 expect(pool.info.ready).toBe(true)
937 await waitPoolEvents(pool, PoolEvents.ready, 1)
938 expect(pool.readyEventEmitted).toBe(true)
939 expect(pool.busyEventEmitted).toBe(false)
940 expect(pool.backPressureEventEmitted).toBe(false)
941 expect(pool.workerNodes.length).toBe(numberOfWorkers)
942 for (const workerNode of pool.workerNodes) {
943 expect(workerNode).toBeInstanceOf(WorkerNode)
944 }
945 await pool.destroy()
946 })
947
948 it('Verify that pool execute() arguments are checked', async () => {
949 const pool = new FixedClusterPool(
950 numberOfWorkers,
951 './tests/worker-files/cluster/testWorker.cjs'
952 )
953 await expect(pool.execute(undefined, 0)).rejects.toThrow(
954 new TypeError('name argument must be a string')
955 )
956 await expect(pool.execute(undefined, '')).rejects.toThrow(
957 new TypeError('name argument must not be an empty string')
958 )
959 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
960 new TypeError('transferList argument must be an array')
961 )
962 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
963 "Task function 'unknown' not found"
964 )
965 await pool.destroy()
966 await expect(pool.execute()).rejects.toThrow(
967 new Error('Cannot execute a task on not started pool')
968 )
969 })
970
971 it('Verify that pool worker tasks usage are computed', async () => {
972 const pool = new FixedClusterPool(
973 numberOfWorkers,
974 './tests/worker-files/cluster/testWorker.cjs'
975 )
976 const promises = new Set()
977 const maxMultiplier = 2
978 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
979 promises.add(pool.execute())
980 }
981 for (const workerNode of pool.workerNodes) {
982 expect(workerNode.usage).toStrictEqual({
983 elu: {
984 active: {
985 history: expect.any(CircularBuffer),
986 },
987 idle: {
988 history: expect.any(CircularBuffer),
989 },
990 },
991 runTime: {
992 history: expect.any(CircularBuffer),
993 },
994 tasks: {
995 executed: 0,
996 executing: maxMultiplier,
997 failed: 0,
998 maxQueued: 0,
999 queued: 0,
1000 sequentiallyStolen: 0,
1001 stolen: 0,
1002 },
1003 waitTime: {
1004 history: expect.any(CircularBuffer),
1005 },
1006 })
1007 }
1008 await Promise.all(promises)
1009 for (const workerNode of pool.workerNodes) {
1010 expect(workerNode.usage).toStrictEqual({
1011 elu: {
1012 active: {
1013 history: expect.any(CircularBuffer),
1014 },
1015 idle: {
1016 history: expect.any(CircularBuffer),
1017 },
1018 },
1019 runTime: {
1020 history: expect.any(CircularBuffer),
1021 },
1022 tasks: {
1023 executed: maxMultiplier,
1024 executing: 0,
1025 failed: 0,
1026 maxQueued: 0,
1027 queued: 0,
1028 sequentiallyStolen: 0,
1029 stolen: 0,
1030 },
1031 waitTime: {
1032 history: expect.any(CircularBuffer),
1033 },
1034 })
1035 }
1036 await pool.destroy()
1037 })
1038
1039 it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
1040 const pool = new DynamicThreadPool(
1041 Math.floor(numberOfWorkers / 2),
1042 numberOfWorkers,
1043 './tests/worker-files/thread/testWorker.mjs'
1044 )
1045 const promises = new Set()
1046 const maxMultiplier = 2
1047 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1048 promises.add(pool.execute())
1049 }
1050 await Promise.all(promises)
1051 for (const workerNode of pool.workerNodes) {
1052 expect(workerNode.usage).toStrictEqual({
1053 elu: {
1054 active: {
1055 history: expect.any(CircularBuffer),
1056 },
1057 idle: {
1058 history: expect.any(CircularBuffer),
1059 },
1060 },
1061 runTime: {
1062 history: expect.any(CircularBuffer),
1063 },
1064 tasks: {
1065 executed: expect.any(Number),
1066 executing: 0,
1067 failed: 0,
1068 maxQueued: 0,
1069 queued: 0,
1070 sequentiallyStolen: 0,
1071 stolen: 0,
1072 },
1073 waitTime: {
1074 history: expect.any(CircularBuffer),
1075 },
1076 })
1077 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1078 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1079 numberOfWorkers * maxMultiplier
1080 )
1081 }
1082 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1083 for (const workerNode of pool.workerNodes) {
1084 expect(workerNode.usage).toStrictEqual({
1085 elu: {
1086 active: {
1087 history: expect.any(CircularBuffer),
1088 },
1089 idle: {
1090 history: expect.any(CircularBuffer),
1091 },
1092 },
1093 runTime: {
1094 history: expect.any(CircularBuffer),
1095 },
1096 tasks: {
1097 executed: expect.any(Number),
1098 executing: 0,
1099 failed: 0,
1100 maxQueued: 0,
1101 queued: 0,
1102 sequentiallyStolen: 0,
1103 stolen: 0,
1104 },
1105 waitTime: {
1106 history: expect.any(CircularBuffer),
1107 },
1108 })
1109 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1110 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1111 numberOfWorkers * maxMultiplier
1112 )
1113 }
1114 await pool.destroy()
1115 })
1116
1117 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1118 const pool = new DynamicClusterPool(
1119 Math.floor(numberOfWorkers / 2),
1120 numberOfWorkers,
1121 './tests/worker-files/cluster/testWorker.cjs'
1122 )
1123 expect(pool.emitter.eventNames()).toStrictEqual([])
1124 let poolInfo
1125 let poolReady = 0
1126 pool.emitter.on(PoolEvents.ready, info => {
1127 ++poolReady
1128 poolInfo = info
1129 })
1130 await waitPoolEvents(pool, PoolEvents.ready, 1)
1131 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1132 expect(poolReady).toBe(1)
1133 expect(poolInfo).toStrictEqual({
1134 busyWorkerNodes: 0,
1135 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1136 dynamicWorkerNodes: 0,
1137 executedTasks: 0,
1138 executingTasks: 0,
1139 failedTasks: 0,
1140 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
1141 maxSize: numberOfWorkers,
1142 minSize: Math.floor(numberOfWorkers / 2),
1143 ready: true,
1144 started: true,
1145 strategyRetries: expect.any(Number),
1146 type: PoolTypes.dynamic,
1147 version,
1148 worker: WorkerTypes.cluster,
1149 workerNodes: Math.floor(numberOfWorkers / 2),
1150 })
1151 await pool.destroy()
1152 })
1153
1154 it("Verify that pool event emitter 'busy' and 'busyEnd' events can register a callback", async () => {
1155 const pool = new FixedThreadPool(
1156 numberOfWorkers,
1157 './tests/worker-files/thread/testWorker.mjs'
1158 )
1159 expect(pool.emitter.eventNames()).toStrictEqual([])
1160 const promises = new Set()
1161 let poolBusy = 0
1162 let poolBusyInfo
1163 pool.emitter.on(PoolEvents.busy, info => {
1164 ++poolBusy
1165 poolBusyInfo = info
1166 })
1167 let poolBusyEnd = 0
1168 let poolBusyEndInfo
1169 pool.emitter.on(PoolEvents.busyEnd, info => {
1170 ++poolBusyEnd
1171 poolBusyEndInfo = info
1172 })
1173 expect(pool.emitter.eventNames()).toStrictEqual([
1174 PoolEvents.busy,
1175 PoolEvents.busyEnd,
1176 ])
1177 for (let i = 0; i < numberOfWorkers * 2; i++) {
1178 promises.add(pool.execute())
1179 }
1180 await Promise.all(promises)
1181 expect(poolBusy).toBe(1)
1182 expect(poolBusyInfo).toStrictEqual({
1183 busyWorkerNodes: numberOfWorkers,
1184 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1185 executedTasks: expect.any(Number),
1186 executingTasks: expect.any(Number),
1187 failedTasks: expect.any(Number),
1188 idleWorkerNodes: 0,
1189 maxSize: numberOfWorkers,
1190 minSize: numberOfWorkers,
1191 ready: true,
1192 started: true,
1193 strategyRetries: expect.any(Number),
1194 type: PoolTypes.fixed,
1195 version,
1196 worker: WorkerTypes.thread,
1197 workerNodes: numberOfWorkers,
1198 })
1199 expect(poolBusyEnd).toBe(1)
1200 expect(poolBusyEndInfo).toStrictEqual({
1201 busyWorkerNodes: expect.any(Number),
1202 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1203 executedTasks: expect.any(Number),
1204 executingTasks: expect.any(Number),
1205 failedTasks: expect.any(Number),
1206 idleWorkerNodes: expect.any(Number),
1207 maxSize: numberOfWorkers,
1208 minSize: numberOfWorkers,
1209 ready: true,
1210 started: true,
1211 strategyRetries: expect.any(Number),
1212 type: PoolTypes.fixed,
1213 version,
1214 worker: WorkerTypes.thread,
1215 workerNodes: numberOfWorkers,
1216 })
1217 expect(poolBusyEndInfo.busyWorkerNodes).toBeLessThan(numberOfWorkers)
1218 await pool.destroy()
1219 })
1220
1221 it("Verify that pool event emitter 'full' and 'fullEnd' events can register a callback", async () => {
1222 const pool = new DynamicClusterPool(
1223 Math.floor(numberOfWorkers / 2),
1224 numberOfWorkers,
1225 './tests/worker-files/cluster/testWorker.cjs'
1226 )
1227 expect(pool.emitter.eventNames()).toStrictEqual([])
1228 const promises = new Set()
1229 let poolFull = 0
1230 let poolFullInfo
1231 pool.emitter.on(PoolEvents.full, info => {
1232 ++poolFull
1233 poolFullInfo = info
1234 })
1235 let poolFullEnd = 0
1236 let poolFullEndInfo
1237 pool.emitter.on(PoolEvents.fullEnd, info => {
1238 ++poolFullEnd
1239 poolFullEndInfo = info
1240 })
1241 expect(pool.emitter.eventNames()).toStrictEqual([
1242 PoolEvents.full,
1243 PoolEvents.fullEnd,
1244 ])
1245 for (let i = 0; i < numberOfWorkers * 2; i++) {
1246 promises.add(pool.execute())
1247 }
1248 await Promise.all(promises)
1249 expect(poolFull).toBe(1)
1250 expect(poolFullInfo).toStrictEqual({
1251 busyWorkerNodes: expect.any(Number),
1252 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1253 dynamicWorkerNodes: Math.floor(numberOfWorkers / 2),
1254 executedTasks: expect.any(Number),
1255 executingTasks: expect.any(Number),
1256 failedTasks: expect.any(Number),
1257 idleWorkerNodes: expect.any(Number),
1258 maxSize: numberOfWorkers,
1259 minSize: Math.floor(numberOfWorkers / 2),
1260 ready: true,
1261 started: true,
1262 strategyRetries: expect.any(Number),
1263 type: PoolTypes.dynamic,
1264 version,
1265 worker: WorkerTypes.cluster,
1266 workerNodes: numberOfWorkers,
1267 })
1268 await waitPoolEvents(pool, PoolEvents.fullEnd, 1)
1269 expect(poolFullEnd).toBe(1)
1270 expect(poolFullEndInfo).toStrictEqual({
1271 busyWorkerNodes: expect.any(Number),
1272 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1273 dynamicWorkerNodes: 0,
1274 executedTasks: expect.any(Number),
1275 executingTasks: expect.any(Number),
1276 failedTasks: expect.any(Number),
1277 idleWorkerNodes: expect.any(Number),
1278 maxSize: numberOfWorkers,
1279 minSize: Math.floor(numberOfWorkers / 2),
1280 ready: true,
1281 started: true,
1282 strategyRetries: expect.any(Number),
1283 type: PoolTypes.dynamic,
1284 version,
1285 worker: WorkerTypes.cluster,
1286 workerNodes: Math.floor(numberOfWorkers / 2),
1287 })
1288 await pool.destroy()
1289 })
1290
1291 it("Verify that pool event emitter 'backPressure' and 'backPressureEnd' events can register a callback", async () => {
1292 const pool = new FixedThreadPool(
1293 numberOfWorkers,
1294 './tests/worker-files/thread/testWorker.mjs',
1295 {
1296 enableTasksQueue: true,
1297 }
1298 )
1299 expect(pool.emitter.eventNames()).toStrictEqual([])
1300 const promises = new Set()
1301 let poolBackPressure = 0
1302 let poolBackPressureInfo
1303 pool.emitter.on(PoolEvents.backPressure, info => {
1304 ++poolBackPressure
1305 poolBackPressureInfo = info
1306 })
1307 let poolBackPressureEnd = 0
1308 let poolBackPressureEndInfo
1309 pool.emitter.on(PoolEvents.backPressureEnd, info => {
1310 ++poolBackPressureEnd
1311 poolBackPressureEndInfo = info
1312 })
1313 expect(pool.emitter.eventNames()).toStrictEqual([
1314 PoolEvents.backPressure,
1315 PoolEvents.backPressureEnd,
1316 ])
1317 for (let i = 0; i < numberOfWorkers * 10; i++) {
1318 promises.add(pool.execute())
1319 }
1320 await Promise.all(promises)
1321 expect(poolBackPressure).toBe(1)
1322 expect(poolBackPressureInfo).toStrictEqual({
1323 backPressure: true,
1324 backPressureWorkerNodes: numberOfWorkers,
1325 busyWorkerNodes: expect.any(Number),
1326 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1327 executedTasks: expect.any(Number),
1328 executingTasks: expect.any(Number),
1329 failedTasks: expect.any(Number),
1330 idleWorkerNodes: expect.any(Number),
1331 maxQueuedTasks: expect.any(Number),
1332 maxSize: numberOfWorkers,
1333 minSize: numberOfWorkers,
1334 queuedTasks: expect.any(Number),
1335 ready: true,
1336 started: true,
1337 stealingWorkerNodes: expect.any(Number),
1338 stolenTasks: expect.any(Number),
1339 strategyRetries: expect.any(Number),
1340 type: PoolTypes.fixed,
1341 version,
1342 worker: WorkerTypes.thread,
1343 workerNodes: numberOfWorkers,
1344 })
1345 expect(poolBackPressureEnd).toBe(1)
1346 expect(poolBackPressureEndInfo).toStrictEqual({
1347 backPressure: false,
1348 backPressureWorkerNodes: expect.any(Number),
1349 busyWorkerNodes: expect.any(Number),
1350 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1351 executedTasks: expect.any(Number),
1352 executingTasks: expect.any(Number),
1353 failedTasks: expect.any(Number),
1354 idleWorkerNodes: expect.any(Number),
1355 maxQueuedTasks: expect.any(Number),
1356 maxSize: numberOfWorkers,
1357 minSize: numberOfWorkers,
1358 queuedTasks: expect.any(Number),
1359 ready: true,
1360 started: true,
1361 stealingWorkerNodes: expect.any(Number),
1362 stolenTasks: expect.any(Number),
1363 strategyRetries: expect.any(Number),
1364 type: PoolTypes.fixed,
1365 version,
1366 worker: WorkerTypes.thread,
1367 workerNodes: numberOfWorkers,
1368 })
1369 expect(poolBackPressureEndInfo.backPressureWorkerNodes).toBeLessThan(
1370 numberOfWorkers
1371 )
1372 await pool.destroy()
1373 })
1374
1375 it("Verify that pool event emitter 'empty' event can register a callback", async () => {
1376 const pool = new DynamicClusterPool(
1377 0,
1378 numberOfWorkers,
1379 './tests/worker-files/cluster/testWorker.cjs'
1380 )
1381 expect(pool.emitter.eventNames()).toStrictEqual([])
1382 const promises = new Set()
1383 let poolEmpty = 0
1384 let poolInfo
1385 pool.emitter.on(PoolEvents.empty, info => {
1386 ++poolEmpty
1387 poolInfo = info
1388 })
1389 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.empty])
1390 for (let i = 0; i < numberOfWorkers; i++) {
1391 promises.add(pool.execute())
1392 }
1393 await Promise.all(promises)
1394 await waitPoolEvents(pool, PoolEvents.empty, 1)
1395 expect(poolEmpty).toBe(1)
1396 expect(poolInfo).toStrictEqual({
1397 busyWorkerNodes: 0,
1398 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1399 dynamicWorkerNodes: 0,
1400 executedTasks: expect.any(Number),
1401 executingTasks: expect.any(Number),
1402 failedTasks: expect.any(Number),
1403 idleWorkerNodes: 0,
1404 maxSize: numberOfWorkers,
1405 minSize: 0,
1406 ready: true,
1407 started: true,
1408 strategyRetries: expect.any(Number),
1409 type: PoolTypes.dynamic,
1410 version,
1411 worker: WorkerTypes.cluster,
1412 workerNodes: 0,
1413 })
1414 await pool.destroy()
1415 })
1416
1417 it('Verify that destroy() waits for queued tasks to finish', async () => {
1418 const tasksFinishedTimeout = 2500
1419 const pool = new FixedThreadPool(
1420 numberOfWorkers,
1421 './tests/worker-files/thread/asyncWorker.mjs',
1422 {
1423 enableTasksQueue: true,
1424 tasksQueueOptions: { tasksFinishedTimeout },
1425 }
1426 )
1427 const maxMultiplier = 4
1428 let tasksFinished = 0
1429 for (const workerNode of pool.workerNodes) {
1430 workerNode.on('taskFinished', () => {
1431 ++tasksFinished
1432 })
1433 }
1434 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1435 pool.execute()
1436 }
1437 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1438 const startTime = performance.now()
1439 await pool.destroy()
1440 const elapsedTime = performance.now() - startTime
1441 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1442 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1443 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1444 })
1445
1446 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1447 const tasksFinishedTimeout = 1000
1448 const pool = new FixedThreadPool(
1449 numberOfWorkers,
1450 './tests/worker-files/thread/asyncWorker.mjs',
1451 {
1452 enableTasksQueue: true,
1453 tasksQueueOptions: { tasksFinishedTimeout },
1454 }
1455 )
1456 const maxMultiplier = 4
1457 let tasksFinished = 0
1458 for (const workerNode of pool.workerNodes) {
1459 workerNode.on('taskFinished', () => {
1460 ++tasksFinished
1461 })
1462 }
1463 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1464 pool.execute()
1465 }
1466 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1467 const startTime = performance.now()
1468 await pool.destroy()
1469 const elapsedTime = performance.now() - startTime
1470 expect(tasksFinished).toBe(0)
1471 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1472 })
1473
1474 it('Verify that pool asynchronous resource track tasks execution', async () => {
1475 let taskAsyncId
1476 let initCalls = 0
1477 let beforeCalls = 0
1478 let afterCalls = 0
1479 let resolveCalls = 0
1480 const hook = createHook({
1481 after (asyncId) {
1482 if (asyncId === taskAsyncId) afterCalls++
1483 },
1484 before (asyncId) {
1485 if (asyncId === taskAsyncId) beforeCalls++
1486 },
1487 init (asyncId, type) {
1488 if (type === 'poolifier:task') {
1489 initCalls++
1490 taskAsyncId = asyncId
1491 }
1492 },
1493 promiseResolve () {
1494 if (executionAsyncId() === taskAsyncId) resolveCalls++
1495 },
1496 })
1497 const pool = new FixedThreadPool(
1498 numberOfWorkers,
1499 './tests/worker-files/thread/testWorker.mjs'
1500 )
1501 hook.enable()
1502 await pool.execute()
1503 hook.disable()
1504 expect(initCalls).toBe(1)
1505 expect(beforeCalls).toBe(1)
1506 expect(afterCalls).toBe(1)
1507 expect(resolveCalls).toBe(1)
1508 await pool.destroy()
1509 })
1510
1511 it('Verify that hasTaskFunction() is working', async () => {
1512 const dynamicThreadPool = new DynamicThreadPool(
1513 Math.floor(numberOfWorkers / 2),
1514 numberOfWorkers,
1515 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1516 )
1517 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1518 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1519 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1520 true
1521 )
1522 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1523 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1524 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1525 await dynamicThreadPool.destroy()
1526 const fixedClusterPool = new FixedClusterPool(
1527 numberOfWorkers,
1528 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1529 )
1530 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1531 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1532 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1533 true
1534 )
1535 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1536 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1537 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1538 await fixedClusterPool.destroy()
1539 })
1540
1541 it('Verify that addTaskFunction() is working', async () => {
1542 const dynamicThreadPool = new DynamicThreadPool(
1543 Math.floor(numberOfWorkers / 2),
1544 numberOfWorkers,
1545 './tests/worker-files/thread/testWorker.mjs'
1546 )
1547 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1548 await expect(
1549 dynamicThreadPool.addTaskFunction(0, () => {})
1550 ).rejects.toThrow(new TypeError('name argument must be a string'))
1551 await expect(
1552 dynamicThreadPool.addTaskFunction('', () => {})
1553 ).rejects.toThrow(
1554 new TypeError('name argument must not be an empty string')
1555 )
1556 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1557 new TypeError('taskFunction property must be a function')
1558 )
1559 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1560 new TypeError('taskFunction property must be a function')
1561 )
1562 await expect(
1563 dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 })
1564 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1565 await expect(
1566 dynamicThreadPool.addTaskFunction('test', { taskFunction: '' })
1567 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1568 await expect(
1569 dynamicThreadPool.addTaskFunction('test', {
1570 priority: -21,
1571 taskFunction: () => {},
1572 })
1573 ).rejects.toThrow(
1574 new RangeError("Property 'priority' must be between -20 and 19")
1575 )
1576 await expect(
1577 dynamicThreadPool.addTaskFunction('test', {
1578 priority: 20,
1579 taskFunction: () => {},
1580 })
1581 ).rejects.toThrow(
1582 new RangeError("Property 'priority' must be between -20 and 19")
1583 )
1584 await expect(
1585 dynamicThreadPool.addTaskFunction('test', {
1586 strategy: 'invalidStrategy',
1587 taskFunction: () => {},
1588 })
1589 ).rejects.toThrow(
1590 new Error("Invalid worker choice strategy 'invalidStrategy'")
1591 )
1592 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1593 { name: DEFAULT_TASK_NAME },
1594 { name: 'test' },
1595 ])
1596 expect([
1597 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
1598 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
1599 const echoTaskFunction = data => {
1600 return data
1601 }
1602 await expect(
1603 dynamicThreadPool.addTaskFunction('echo', {
1604 strategy: WorkerChoiceStrategies.LEAST_ELU,
1605 taskFunction: echoTaskFunction,
1606 })
1607 ).resolves.toBe(true)
1608 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1609 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1610 strategy: WorkerChoiceStrategies.LEAST_ELU,
1611 taskFunction: echoTaskFunction,
1612 })
1613 expect([
1614 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
1615 ]).toStrictEqual([
1616 WorkerChoiceStrategies.ROUND_ROBIN,
1617 WorkerChoiceStrategies.LEAST_ELU,
1618 ])
1619 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1620 { name: DEFAULT_TASK_NAME },
1621 { name: 'test' },
1622 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
1623 ])
1624 const taskFunctionData = { test: 'test' }
1625 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1626 expect(echoResult).toStrictEqual(taskFunctionData)
1627 for (const workerNode of dynamicThreadPool.workerNodes) {
1628 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1629 elu: expect.objectContaining({
1630 active: expect.objectContaining({
1631 history: expect.any(CircularBuffer),
1632 }),
1633 idle: expect.objectContaining({
1634 history: expect.any(CircularBuffer),
1635 }),
1636 }),
1637 runTime: {
1638 history: expect.any(CircularBuffer),
1639 },
1640 tasks: {
1641 executed: expect.any(Number),
1642 executing: 0,
1643 failed: 0,
1644 queued: 0,
1645 sequentiallyStolen: 0,
1646 stolen: 0,
1647 },
1648 waitTime: {
1649 history: expect.any(CircularBuffer),
1650 },
1651 })
1652 expect(
1653 workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed
1654 ).toBeGreaterThan(0)
1655 if (
1656 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate ==
1657 null
1658 ) {
1659 expect(
1660 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
1661 ).toBeUndefined()
1662 } else {
1663 expect(
1664 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
1665 ).toBeGreaterThan(0)
1666 }
1667 if (
1668 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null
1669 ) {
1670 expect(
1671 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
1672 ).toBeUndefined()
1673 } else {
1674 expect(
1675 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
1676 ).toBeGreaterThanOrEqual(0)
1677 }
1678 if (
1679 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null
1680 ) {
1681 expect(
1682 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1683 ).toBeUndefined()
1684 } else {
1685 expect(
1686 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1687 ).toBeGreaterThanOrEqual(0)
1688 expect(
1689 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1690 ).toBeLessThanOrEqual(1)
1691 }
1692 }
1693 await dynamicThreadPool.destroy()
1694 })
1695
1696 it('Verify that removeTaskFunction() is working', async () => {
1697 const dynamicThreadPool = new DynamicThreadPool(
1698 Math.floor(numberOfWorkers / 2),
1699 numberOfWorkers,
1700 './tests/worker-files/thread/testWorker.mjs'
1701 )
1702 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1703 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1704 { name: DEFAULT_TASK_NAME },
1705 { name: 'test' },
1706 ])
1707 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1708 new Error('Cannot remove a task function not handled on the pool side')
1709 )
1710 const echoTaskFunction = data => {
1711 return data
1712 }
1713 await dynamicThreadPool.addTaskFunction('echo', {
1714 strategy: WorkerChoiceStrategies.LEAST_ELU,
1715 taskFunction: echoTaskFunction,
1716 })
1717 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1718 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1719 strategy: WorkerChoiceStrategies.LEAST_ELU,
1720 taskFunction: echoTaskFunction,
1721 })
1722 expect([
1723 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
1724 ]).toStrictEqual([
1725 WorkerChoiceStrategies.ROUND_ROBIN,
1726 WorkerChoiceStrategies.LEAST_ELU,
1727 ])
1728 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1729 { name: DEFAULT_TASK_NAME },
1730 { name: 'test' },
1731 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
1732 ])
1733 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1734 true
1735 )
1736 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1737 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1738 expect([
1739 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
1740 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
1741 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1742 { name: DEFAULT_TASK_NAME },
1743 { name: 'test' },
1744 ])
1745 await dynamicThreadPool.destroy()
1746 })
1747
1748 it('Verify that listTaskFunctionsProperties() is working', async () => {
1749 const dynamicThreadPool = new DynamicThreadPool(
1750 Math.floor(numberOfWorkers / 2),
1751 numberOfWorkers,
1752 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1753 )
1754 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1755 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1756 { name: DEFAULT_TASK_NAME },
1757 { name: 'factorial' },
1758 { name: 'fibonacci' },
1759 { name: 'jsonIntegerSerialization' },
1760 ])
1761 await dynamicThreadPool.destroy()
1762 const fixedClusterPool = new FixedClusterPool(
1763 numberOfWorkers,
1764 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1765 )
1766 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1767 expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
1768 { name: DEFAULT_TASK_NAME },
1769 { name: 'factorial' },
1770 { name: 'fibonacci' },
1771 { name: 'jsonIntegerSerialization' },
1772 ])
1773 await fixedClusterPool.destroy()
1774 })
1775
1776 it('Verify that setDefaultTaskFunction() is working', async () => {
1777 const dynamicThreadPool = new DynamicThreadPool(
1778 Math.floor(numberOfWorkers / 2),
1779 numberOfWorkers,
1780 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1781 )
1782 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1783 const workerId = dynamicThreadPool.workerNodes[0].info.id
1784 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1785 new Error(
1786 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1787 )
1788 )
1789 await expect(
1790 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1791 ).rejects.toThrow(
1792 new Error(
1793 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1794 )
1795 )
1796 await expect(
1797 dynamicThreadPool.setDefaultTaskFunction('unknown')
1798 ).rejects.toThrow(
1799 new Error(
1800 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1801 )
1802 )
1803 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1804 { name: DEFAULT_TASK_NAME },
1805 { name: 'factorial' },
1806 { name: 'fibonacci' },
1807 { name: 'jsonIntegerSerialization' },
1808 ])
1809 await expect(
1810 dynamicThreadPool.setDefaultTaskFunction('factorial')
1811 ).resolves.toBe(true)
1812 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1813 { name: DEFAULT_TASK_NAME },
1814 { name: 'factorial' },
1815 { name: 'fibonacci' },
1816 { name: 'jsonIntegerSerialization' },
1817 ])
1818 await expect(
1819 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1820 ).resolves.toBe(true)
1821 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1822 { name: DEFAULT_TASK_NAME },
1823 { name: 'fibonacci' },
1824 { name: 'factorial' },
1825 { name: 'jsonIntegerSerialization' },
1826 ])
1827 await dynamicThreadPool.destroy()
1828 })
1829
1830 it('Verify that multiple task functions worker is working', async () => {
1831 const pool = new DynamicClusterPool(
1832 Math.floor(numberOfWorkers / 2),
1833 numberOfWorkers,
1834 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1835 )
1836 const data = { n: 10 }
1837 const result0 = await pool.execute(data)
1838 expect(result0).toStrictEqual(3628800)
1839 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1840 expect(result1).toStrictEqual({ ok: 1 })
1841 const result2 = await pool.execute(data, 'factorial')
1842 expect(result2).toBe(3628800)
1843 const result3 = await pool.execute(data, 'fibonacci')
1844 expect(result3).toBe(55)
1845 expect(pool.info.executingTasks).toBe(0)
1846 expect(pool.info.executedTasks).toBe(4)
1847 for (const workerNode of pool.workerNodes) {
1848 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1849 { name: DEFAULT_TASK_NAME },
1850 { name: 'factorial' },
1851 { name: 'fibonacci' },
1852 { name: 'jsonIntegerSerialization' },
1853 ])
1854 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1855 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1856 expect(workerNode.tasksQueue.enablePriority).toBe(false)
1857 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1858 expect(
1859 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1860 ).toStrictEqual({
1861 elu: {
1862 active: {
1863 history: expect.any(CircularBuffer),
1864 },
1865 idle: {
1866 history: expect.any(CircularBuffer),
1867 },
1868 },
1869 runTime: {
1870 history: expect.any(CircularBuffer),
1871 },
1872 tasks: {
1873 executed: expect.any(Number),
1874 executing: 0,
1875 failed: 0,
1876 queued: 0,
1877 sequentiallyStolen: 0,
1878 stolen: 0,
1879 },
1880 waitTime: {
1881 history: expect.any(CircularBuffer),
1882 },
1883 })
1884 expect(
1885 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1886 .tasks.executed
1887 ).toBeGreaterThan(0)
1888 }
1889 expect(
1890 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1891 ).toStrictEqual(
1892 workerNode.getTaskFunctionWorkerUsage(
1893 workerNode.info.taskFunctionsProperties[1].name
1894 )
1895 )
1896 }
1897 await pool.destroy()
1898 })
1899
1900 it('Verify that mapExecute() is working', async () => {
1901 const pool = new DynamicThreadPool(
1902 Math.floor(numberOfWorkers / 2),
1903 numberOfWorkers,
1904 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1905 )
1906 await expect(pool.mapExecute()).rejects.toThrow(
1907 new TypeError('data argument must be a defined iterable')
1908 )
1909 await expect(pool.mapExecute(0)).rejects.toThrow(
1910 new TypeError('data argument must be an iterable')
1911 )
1912 await expect(pool.mapExecute([undefined], 0)).rejects.toThrow(
1913 new TypeError('name argument must be a string')
1914 )
1915 await expect(pool.mapExecute([undefined], '')).rejects.toThrow(
1916 new TypeError('name argument must not be an empty string')
1917 )
1918 await expect(pool.mapExecute([undefined], undefined, {})).rejects.toThrow(
1919 new TypeError('transferList argument must be an array')
1920 )
1921 await expect(pool.mapExecute([undefined], 'unknown')).rejects.toBe(
1922 "Task function 'unknown' not found"
1923 )
1924 let results = await pool.mapExecute(
1925 [{}, {}, {}, {}],
1926 'jsonIntegerSerialization'
1927 )
1928 expect(results).toStrictEqual([{ ok: 1 }, { ok: 1 }, { ok: 1 }, { ok: 1 }])
1929 expect(pool.info.executingTasks).toBe(0)
1930 expect(pool.info.executedTasks).toBe(4)
1931 results = await pool.mapExecute(
1932 [{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }],
1933 'factorial'
1934 )
1935 expect(results).toStrictEqual([
1936 3628800, 2432902008176640000, 2.6525285981219103e32, 8.159152832478977e47,
1937 ])
1938 expect(pool.info.executingTasks).toBe(0)
1939 expect(pool.info.executedTasks).toBe(8)
1940 results = await pool.mapExecute(
1941 new Set([{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }]),
1942 'factorial'
1943 )
1944 expect(results).toStrictEqual([
1945 3628800, 2432902008176640000, 2.6525285981219103e32, 8.159152832478977e47,
1946 ])
1947 expect(pool.info.executingTasks).toBe(0)
1948 expect(pool.info.executedTasks).toBe(12)
1949 await pool.destroy()
1950 await expect(pool.mapExecute()).rejects.toThrow(
1951 new Error('Cannot execute task(s) on not started pool')
1952 )
1953 })
1954
1955 it('Verify that task function objects worker is working', async () => {
1956 const pool = new DynamicThreadPool(
1957 Math.floor(numberOfWorkers / 2),
1958 numberOfWorkers,
1959 './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs'
1960 )
1961 const data = { n: 10 }
1962 const result0 = await pool.execute(data)
1963 expect(result0).toStrictEqual(3628800)
1964 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1965 expect(result1).toStrictEqual({ ok: 1 })
1966 const result2 = await pool.execute(data, 'factorial')
1967 expect(result2).toBe(3628800)
1968 const result3 = await pool.execute(data, 'fibonacci')
1969 expect(result3).toBe(55)
1970 expect(pool.info.executingTasks).toBe(0)
1971 expect(pool.info.executedTasks).toBe(4)
1972 for (const workerNode of pool.workerNodes) {
1973 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1974 { name: DEFAULT_TASK_NAME },
1975 { name: 'factorial' },
1976 { name: 'fibonacci', priority: -5 },
1977 { name: 'jsonIntegerSerialization' },
1978 ])
1979 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1980 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1981 expect(workerNode.tasksQueue.enablePriority).toBe(true)
1982 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1983 expect(
1984 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1985 ).toStrictEqual({
1986 elu: {
1987 active: {
1988 history: expect.any(CircularBuffer),
1989 },
1990 idle: {
1991 history: expect.any(CircularBuffer),
1992 },
1993 },
1994 runTime: {
1995 history: expect.any(CircularBuffer),
1996 },
1997 tasks: {
1998 executed: expect.any(Number),
1999 executing: 0,
2000 failed: 0,
2001 queued: 0,
2002 sequentiallyStolen: 0,
2003 stolen: 0,
2004 },
2005 waitTime: {
2006 history: expect.any(CircularBuffer),
2007 },
2008 })
2009 expect(
2010 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
2011 .tasks.executed
2012 ).toBeGreaterThan(0)
2013 }
2014 expect(
2015 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
2016 ).toStrictEqual(
2017 workerNode.getTaskFunctionWorkerUsage(
2018 workerNode.info.taskFunctionsProperties[1].name
2019 )
2020 )
2021 }
2022 await pool.destroy()
2023 })
2024
2025 it('Verify sendKillMessageToWorker()', async () => {
2026 const pool = new DynamicClusterPool(
2027 Math.floor(numberOfWorkers / 2),
2028 numberOfWorkers,
2029 './tests/worker-files/cluster/testWorker.cjs'
2030 )
2031 const workerNodeKey = 0
2032 await expect(
2033 pool.sendKillMessageToWorker(workerNodeKey)
2034 ).resolves.toBeUndefined()
2035 await pool.destroy()
2036 })
2037
2038 it('Verify sendTaskFunctionOperationToWorker()', async () => {
2039 const pool = new DynamicClusterPool(
2040 Math.floor(numberOfWorkers / 2),
2041 numberOfWorkers,
2042 './tests/worker-files/cluster/testWorker.cjs'
2043 )
2044 const workerNodeKey = 0
2045 await expect(
2046 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
2047 taskFunction: (() => {}).toString(),
2048 taskFunctionOperation: 'add',
2049 taskFunctionProperties: { name: 'empty' },
2050 })
2051 ).resolves.toBe(true)
2052 expect(
2053 pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
2054 ).toStrictEqual([
2055 { name: DEFAULT_TASK_NAME },
2056 { name: 'test' },
2057 { name: 'empty' },
2058 ])
2059 await pool.destroy()
2060 })
2061
2062 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
2063 const pool = new DynamicClusterPool(
2064 Math.floor(numberOfWorkers / 2),
2065 numberOfWorkers,
2066 './tests/worker-files/cluster/testWorker.cjs'
2067 )
2068 await expect(
2069 pool.sendTaskFunctionOperationToWorkers({
2070 taskFunction: (() => {}).toString(),
2071 taskFunctionOperation: 'add',
2072 taskFunctionProperties: { name: 'empty' },
2073 })
2074 ).resolves.toBe(true)
2075 for (const workerNode of pool.workerNodes) {
2076 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
2077 { name: DEFAULT_TASK_NAME },
2078 { name: 'test' },
2079 { name: 'empty' },
2080 ])
2081 }
2082 await pool.destroy()
2083 })
2084 })