build(deps): bump poolifier in /examples/typescript/smtp-client-pool
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { expect } from 'expect'
6 import { restore, stub } from 'sinon'
7 import {
8 DynamicClusterPool,
9 DynamicThreadPool,
10 FixedClusterPool,
11 FixedThreadPool,
12 PoolEvents,
13 PoolTypes,
14 WorkerChoiceStrategies,
15 WorkerTypes
16 } from '../../lib/index.js'
17 import { CircularArray } from '../../lib/circular-array.js'
18 import { Deque } from '../../lib/deque.js'
19 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20 import { waitPoolEvents } from '../test-utils.js'
21 import { WorkerNode } from '../../lib/pools/worker-node.js'
22
23 describe('Abstract pool test suite', () => {
24 const version = JSON.parse(
25 readFileSync(
26 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
27 'utf8'
28 )
29 ).version
30 const numberOfWorkers = 2
31 class StubPoolWithIsMain extends FixedThreadPool {
32 isMain () {
33 return false
34 }
35 }
36
37 afterEach(() => {
38 restore()
39 })
40
41 it('Simulate pool creation from a non main thread/process', () => {
42 expect(
43 () =>
44 new StubPoolWithIsMain(
45 numberOfWorkers,
46 './tests/worker-files/thread/testWorker.mjs',
47 {
48 errorHandler: e => console.error(e)
49 }
50 )
51 ).toThrow(
52 new Error(
53 'Cannot start a pool from a worker with the same type as the pool'
54 )
55 )
56 })
57
58 it('Verify that pool statuses properties are set', async () => {
59 const pool = new FixedThreadPool(
60 numberOfWorkers,
61 './tests/worker-files/thread/testWorker.mjs'
62 )
63 expect(pool.starting).toBe(false)
64 expect(pool.started).toBe(true)
65 await pool.destroy()
66 })
67
68 it('Verify that filePath is checked', () => {
69 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
70 new Error("Cannot find the worker file 'undefined'")
71 )
72 expect(
73 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
74 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
75 })
76
77 it('Verify that numberOfWorkers is checked', () => {
78 expect(
79 () =>
80 new FixedThreadPool(
81 undefined,
82 './tests/worker-files/thread/testWorker.mjs'
83 )
84 ).toThrow(
85 new Error(
86 'Cannot instantiate a pool without specifying the number of workers'
87 )
88 )
89 })
90
91 it('Verify that a negative number of workers is checked', () => {
92 expect(
93 () =>
94 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
95 ).toThrow(
96 new RangeError(
97 'Cannot instantiate a pool with a negative number of workers'
98 )
99 )
100 })
101
102 it('Verify that a non integer number of workers is checked', () => {
103 expect(
104 () =>
105 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
106 ).toThrow(
107 new TypeError(
108 'Cannot instantiate a pool with a non safe integer number of workers'
109 )
110 )
111 })
112
113 it('Verify that dynamic pool sizing is checked', () => {
114 expect(
115 () =>
116 new DynamicClusterPool(
117 1,
118 undefined,
119 './tests/worker-files/cluster/testWorker.js'
120 )
121 ).toThrow(
122 new TypeError(
123 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
124 )
125 )
126 expect(
127 () =>
128 new DynamicThreadPool(
129 0.5,
130 1,
131 './tests/worker-files/thread/testWorker.mjs'
132 )
133 ).toThrow(
134 new TypeError(
135 'Cannot instantiate a pool with a non safe integer number of workers'
136 )
137 )
138 expect(
139 () =>
140 new DynamicClusterPool(
141 0,
142 0.5,
143 './tests/worker-files/cluster/testWorker.js'
144 )
145 ).toThrow(
146 new TypeError(
147 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
148 )
149 )
150 expect(
151 () =>
152 new DynamicThreadPool(
153 2,
154 1,
155 './tests/worker-files/thread/testWorker.mjs'
156 )
157 ).toThrow(
158 new RangeError(
159 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
160 )
161 )
162 expect(
163 () =>
164 new DynamicThreadPool(
165 0,
166 0,
167 './tests/worker-files/thread/testWorker.mjs'
168 )
169 ).toThrow(
170 new RangeError(
171 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
172 )
173 )
174 expect(
175 () =>
176 new DynamicClusterPool(
177 1,
178 1,
179 './tests/worker-files/cluster/testWorker.js'
180 )
181 ).toThrow(
182 new RangeError(
183 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
184 )
185 )
186 })
187
188 it('Verify that pool options are checked', async () => {
189 let pool = new FixedThreadPool(
190 numberOfWorkers,
191 './tests/worker-files/thread/testWorker.mjs'
192 )
193 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
194 expect(pool.opts).toStrictEqual({
195 startWorkers: true,
196 enableEvents: true,
197 restartWorkerOnError: true,
198 enableTasksQueue: false,
199 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
200 workerChoiceStrategyOptions: {
201 retries: 6,
202 runTime: { median: false },
203 waitTime: { median: false },
204 elu: { median: false }
205 }
206 })
207 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
208 retries: 6,
209 runTime: { median: false },
210 waitTime: { median: false },
211 elu: { median: false }
212 })
213 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
214 .workerChoiceStrategies) {
215 expect(workerChoiceStrategy.opts).toStrictEqual({
216 retries: 6,
217 runTime: { median: false },
218 waitTime: { median: false },
219 elu: { median: false }
220 })
221 }
222 await pool.destroy()
223 const testHandler = () => console.info('test handler executed')
224 pool = new FixedThreadPool(
225 numberOfWorkers,
226 './tests/worker-files/thread/testWorker.mjs',
227 {
228 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
229 workerChoiceStrategyOptions: {
230 runTime: { median: true },
231 weights: { 0: 300, 1: 200 }
232 },
233 enableEvents: false,
234 restartWorkerOnError: false,
235 enableTasksQueue: true,
236 tasksQueueOptions: { concurrency: 2 },
237 messageHandler: testHandler,
238 errorHandler: testHandler,
239 onlineHandler: testHandler,
240 exitHandler: testHandler
241 }
242 )
243 expect(pool.emitter).toBeUndefined()
244 expect(pool.opts).toStrictEqual({
245 startWorkers: true,
246 enableEvents: false,
247 restartWorkerOnError: false,
248 enableTasksQueue: true,
249 tasksQueueOptions: {
250 concurrency: 2,
251 size: Math.pow(numberOfWorkers, 2),
252 taskStealing: true,
253 tasksStealingOnBackPressure: true
254 },
255 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
256 workerChoiceStrategyOptions: {
257 retries: 6,
258 runTime: { median: true },
259 waitTime: { median: false },
260 elu: { median: false },
261 weights: { 0: 300, 1: 200 }
262 },
263 onlineHandler: testHandler,
264 messageHandler: testHandler,
265 errorHandler: testHandler,
266 exitHandler: testHandler
267 })
268 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
269 retries: 6,
270 runTime: { median: true },
271 waitTime: { median: false },
272 elu: { median: false },
273 weights: { 0: 300, 1: 200 }
274 })
275 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
276 .workerChoiceStrategies) {
277 expect(workerChoiceStrategy.opts).toStrictEqual({
278 retries: 6,
279 runTime: { median: true },
280 waitTime: { median: false },
281 elu: { median: false },
282 weights: { 0: 300, 1: 200 }
283 })
284 }
285 await pool.destroy()
286 })
287
288 it('Verify that pool options are validated', () => {
289 expect(
290 () =>
291 new FixedThreadPool(
292 numberOfWorkers,
293 './tests/worker-files/thread/testWorker.mjs',
294 {
295 workerChoiceStrategy: 'invalidStrategy'
296 }
297 )
298 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
299 expect(
300 () =>
301 new FixedThreadPool(
302 numberOfWorkers,
303 './tests/worker-files/thread/testWorker.mjs',
304 {
305 workerChoiceStrategyOptions: {
306 retries: 'invalidChoiceRetries'
307 }
308 }
309 )
310 ).toThrow(
311 new TypeError(
312 'Invalid worker choice strategy options: retries must be an integer'
313 )
314 )
315 expect(
316 () =>
317 new FixedThreadPool(
318 numberOfWorkers,
319 './tests/worker-files/thread/testWorker.mjs',
320 {
321 workerChoiceStrategyOptions: {
322 retries: -1
323 }
324 }
325 )
326 ).toThrow(
327 new RangeError(
328 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
329 )
330 )
331 expect(
332 () =>
333 new FixedThreadPool(
334 numberOfWorkers,
335 './tests/worker-files/thread/testWorker.mjs',
336 {
337 workerChoiceStrategyOptions: { weights: {} }
338 }
339 )
340 ).toThrow(
341 new Error(
342 'Invalid worker choice strategy options: must have a weight for each worker node'
343 )
344 )
345 expect(
346 () =>
347 new FixedThreadPool(
348 numberOfWorkers,
349 './tests/worker-files/thread/testWorker.mjs',
350 {
351 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
352 }
353 )
354 ).toThrow(
355 new Error(
356 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
357 )
358 )
359 expect(
360 () =>
361 new FixedThreadPool(
362 numberOfWorkers,
363 './tests/worker-files/thread/testWorker.mjs',
364 {
365 enableTasksQueue: true,
366 tasksQueueOptions: 'invalidTasksQueueOptions'
367 }
368 )
369 ).toThrow(
370 new TypeError('Invalid tasks queue options: must be a plain object')
371 )
372 expect(
373 () =>
374 new FixedThreadPool(
375 numberOfWorkers,
376 './tests/worker-files/thread/testWorker.mjs',
377 {
378 enableTasksQueue: true,
379 tasksQueueOptions: { concurrency: 0 }
380 }
381 )
382 ).toThrow(
383 new RangeError(
384 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
385 )
386 )
387 expect(
388 () =>
389 new FixedThreadPool(
390 numberOfWorkers,
391 './tests/worker-files/thread/testWorker.mjs',
392 {
393 enableTasksQueue: true,
394 tasksQueueOptions: { concurrency: -1 }
395 }
396 )
397 ).toThrow(
398 new RangeError(
399 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
400 )
401 )
402 expect(
403 () =>
404 new FixedThreadPool(
405 numberOfWorkers,
406 './tests/worker-files/thread/testWorker.mjs',
407 {
408 enableTasksQueue: true,
409 tasksQueueOptions: { concurrency: 0.2 }
410 }
411 )
412 ).toThrow(
413 new TypeError('Invalid worker node tasks concurrency: must be an integer')
414 )
415 expect(
416 () =>
417 new FixedThreadPool(
418 numberOfWorkers,
419 './tests/worker-files/thread/testWorker.mjs',
420 {
421 enableTasksQueue: true,
422 tasksQueueOptions: { size: 0 }
423 }
424 )
425 ).toThrow(
426 new RangeError(
427 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
428 )
429 )
430 expect(
431 () =>
432 new FixedThreadPool(
433 numberOfWorkers,
434 './tests/worker-files/thread/testWorker.mjs',
435 {
436 enableTasksQueue: true,
437 tasksQueueOptions: { size: -1 }
438 }
439 )
440 ).toThrow(
441 new RangeError(
442 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
443 )
444 )
445 expect(
446 () =>
447 new FixedThreadPool(
448 numberOfWorkers,
449 './tests/worker-files/thread/testWorker.mjs',
450 {
451 enableTasksQueue: true,
452 tasksQueueOptions: { size: 0.2 }
453 }
454 )
455 ).toThrow(
456 new TypeError('Invalid worker node tasks queue size: must be an integer')
457 )
458 })
459
460 it('Verify that pool worker choice strategy options can be set', async () => {
461 const pool = new FixedThreadPool(
462 numberOfWorkers,
463 './tests/worker-files/thread/testWorker.mjs',
464 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
465 )
466 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
467 retries: 6,
468 runTime: { median: false },
469 waitTime: { median: false },
470 elu: { median: false }
471 })
472 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
473 retries: 6,
474 runTime: { median: false },
475 waitTime: { median: false },
476 elu: { median: false }
477 })
478 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
479 .workerChoiceStrategies) {
480 expect(workerChoiceStrategy.opts).toStrictEqual({
481 retries: 6,
482 runTime: { median: false },
483 waitTime: { median: false },
484 elu: { median: false }
485 })
486 }
487 expect(
488 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
489 ).toStrictEqual({
490 runTime: {
491 aggregate: true,
492 average: true,
493 median: false
494 },
495 waitTime: {
496 aggregate: false,
497 average: false,
498 median: false
499 },
500 elu: {
501 aggregate: true,
502 average: true,
503 median: false
504 }
505 })
506 pool.setWorkerChoiceStrategyOptions({
507 runTime: { median: true },
508 elu: { median: true }
509 })
510 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
511 retries: 6,
512 runTime: { median: true },
513 waitTime: { median: false },
514 elu: { median: true }
515 })
516 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
517 retries: 6,
518 runTime: { median: true },
519 waitTime: { median: false },
520 elu: { median: true }
521 })
522 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
523 .workerChoiceStrategies) {
524 expect(workerChoiceStrategy.opts).toStrictEqual({
525 retries: 6,
526 runTime: { median: true },
527 waitTime: { median: false },
528 elu: { median: true }
529 })
530 }
531 expect(
532 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
533 ).toStrictEqual({
534 runTime: {
535 aggregate: true,
536 average: false,
537 median: true
538 },
539 waitTime: {
540 aggregate: false,
541 average: false,
542 median: false
543 },
544 elu: {
545 aggregate: true,
546 average: false,
547 median: true
548 }
549 })
550 pool.setWorkerChoiceStrategyOptions({
551 runTime: { median: false },
552 elu: { median: false }
553 })
554 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
555 retries: 6,
556 runTime: { median: false },
557 waitTime: { median: false },
558 elu: { median: false }
559 })
560 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
561 retries: 6,
562 runTime: { median: false },
563 waitTime: { median: false },
564 elu: { median: false }
565 })
566 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
567 .workerChoiceStrategies) {
568 expect(workerChoiceStrategy.opts).toStrictEqual({
569 retries: 6,
570 runTime: { median: false },
571 waitTime: { median: false },
572 elu: { median: false }
573 })
574 }
575 expect(
576 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
577 ).toStrictEqual({
578 runTime: {
579 aggregate: true,
580 average: true,
581 median: false
582 },
583 waitTime: {
584 aggregate: false,
585 average: false,
586 median: false
587 },
588 elu: {
589 aggregate: true,
590 average: true,
591 median: false
592 }
593 })
594 expect(() =>
595 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
596 ).toThrow(
597 new TypeError(
598 'Invalid worker choice strategy options: must be a plain object'
599 )
600 )
601 expect(() =>
602 pool.setWorkerChoiceStrategyOptions({
603 retries: 'invalidChoiceRetries'
604 })
605 ).toThrow(
606 new TypeError(
607 'Invalid worker choice strategy options: retries must be an integer'
608 )
609 )
610 expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
611 new RangeError(
612 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
613 )
614 )
615 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
616 new Error(
617 'Invalid worker choice strategy options: must have a weight for each worker node'
618 )
619 )
620 expect(() =>
621 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
622 ).toThrow(
623 new Error(
624 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
625 )
626 )
627 await pool.destroy()
628 })
629
630 it('Verify that pool tasks queue can be enabled/disabled', async () => {
631 const pool = new FixedThreadPool(
632 numberOfWorkers,
633 './tests/worker-files/thread/testWorker.mjs'
634 )
635 expect(pool.opts.enableTasksQueue).toBe(false)
636 expect(pool.opts.tasksQueueOptions).toBeUndefined()
637 pool.enableTasksQueue(true)
638 expect(pool.opts.enableTasksQueue).toBe(true)
639 expect(pool.opts.tasksQueueOptions).toStrictEqual({
640 concurrency: 1,
641 size: Math.pow(numberOfWorkers, 2),
642 taskStealing: true,
643 tasksStealingOnBackPressure: true
644 })
645 pool.enableTasksQueue(true, { concurrency: 2 })
646 expect(pool.opts.enableTasksQueue).toBe(true)
647 expect(pool.opts.tasksQueueOptions).toStrictEqual({
648 concurrency: 2,
649 size: Math.pow(numberOfWorkers, 2),
650 taskStealing: true,
651 tasksStealingOnBackPressure: true
652 })
653 pool.enableTasksQueue(false)
654 expect(pool.opts.enableTasksQueue).toBe(false)
655 expect(pool.opts.tasksQueueOptions).toBeUndefined()
656 await pool.destroy()
657 })
658
659 it('Verify that pool tasks queue options can be set', async () => {
660 const pool = new FixedThreadPool(
661 numberOfWorkers,
662 './tests/worker-files/thread/testWorker.mjs',
663 { enableTasksQueue: true }
664 )
665 expect(pool.opts.tasksQueueOptions).toStrictEqual({
666 concurrency: 1,
667 size: Math.pow(numberOfWorkers, 2),
668 taskStealing: true,
669 tasksStealingOnBackPressure: true
670 })
671 for (const workerNode of pool.workerNodes) {
672 expect(workerNode.tasksQueueBackPressureSize).toBe(
673 pool.opts.tasksQueueOptions.size
674 )
675 }
676 pool.setTasksQueueOptions({
677 concurrency: 2,
678 size: 2,
679 taskStealing: false,
680 tasksStealingOnBackPressure: false
681 })
682 expect(pool.opts.tasksQueueOptions).toStrictEqual({
683 concurrency: 2,
684 size: 2,
685 taskStealing: false,
686 tasksStealingOnBackPressure: false
687 })
688 for (const workerNode of pool.workerNodes) {
689 expect(workerNode.tasksQueueBackPressureSize).toBe(
690 pool.opts.tasksQueueOptions.size
691 )
692 }
693 pool.setTasksQueueOptions({
694 concurrency: 1,
695 taskStealing: true,
696 tasksStealingOnBackPressure: true
697 })
698 expect(pool.opts.tasksQueueOptions).toStrictEqual({
699 concurrency: 1,
700 size: Math.pow(numberOfWorkers, 2),
701 taskStealing: true,
702 tasksStealingOnBackPressure: true
703 })
704 for (const workerNode of pool.workerNodes) {
705 expect(workerNode.tasksQueueBackPressureSize).toBe(
706 pool.opts.tasksQueueOptions.size
707 )
708 }
709 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
710 new TypeError('Invalid tasks queue options: must be a plain object')
711 )
712 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
713 new RangeError(
714 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
715 )
716 )
717 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
718 new RangeError(
719 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
720 )
721 )
722 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
723 new TypeError('Invalid worker node tasks concurrency: must be an integer')
724 )
725 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
726 new RangeError(
727 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
728 )
729 )
730 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
731 new RangeError(
732 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
733 )
734 )
735 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
736 new TypeError('Invalid worker node tasks queue size: must be an integer')
737 )
738 await pool.destroy()
739 })
740
741 it('Verify that pool info is set', async () => {
742 let pool = new FixedThreadPool(
743 numberOfWorkers,
744 './tests/worker-files/thread/testWorker.mjs'
745 )
746 expect(pool.info).toStrictEqual({
747 version,
748 type: PoolTypes.fixed,
749 worker: WorkerTypes.thread,
750 started: true,
751 ready: true,
752 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
753 minSize: numberOfWorkers,
754 maxSize: numberOfWorkers,
755 workerNodes: numberOfWorkers,
756 idleWorkerNodes: numberOfWorkers,
757 busyWorkerNodes: 0,
758 executedTasks: 0,
759 executingTasks: 0,
760 failedTasks: 0
761 })
762 await pool.destroy()
763 pool = new DynamicClusterPool(
764 Math.floor(numberOfWorkers / 2),
765 numberOfWorkers,
766 './tests/worker-files/cluster/testWorker.js'
767 )
768 expect(pool.info).toStrictEqual({
769 version,
770 type: PoolTypes.dynamic,
771 worker: WorkerTypes.cluster,
772 started: true,
773 ready: true,
774 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
775 minSize: Math.floor(numberOfWorkers / 2),
776 maxSize: numberOfWorkers,
777 workerNodes: Math.floor(numberOfWorkers / 2),
778 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
779 busyWorkerNodes: 0,
780 executedTasks: 0,
781 executingTasks: 0,
782 failedTasks: 0
783 })
784 await pool.destroy()
785 })
786
787 it('Verify that pool worker tasks usage are initialized', async () => {
788 const pool = new FixedClusterPool(
789 numberOfWorkers,
790 './tests/worker-files/cluster/testWorker.js'
791 )
792 for (const workerNode of pool.workerNodes) {
793 expect(workerNode).toBeInstanceOf(WorkerNode)
794 expect(workerNode.usage).toStrictEqual({
795 tasks: {
796 executed: 0,
797 executing: 0,
798 queued: 0,
799 maxQueued: 0,
800 stolen: 0,
801 failed: 0
802 },
803 runTime: {
804 history: new CircularArray()
805 },
806 waitTime: {
807 history: new CircularArray()
808 },
809 elu: {
810 idle: {
811 history: new CircularArray()
812 },
813 active: {
814 history: new CircularArray()
815 }
816 }
817 })
818 }
819 await pool.destroy()
820 })
821
822 it('Verify that pool worker tasks queue are initialized', async () => {
823 let pool = new FixedClusterPool(
824 numberOfWorkers,
825 './tests/worker-files/cluster/testWorker.js'
826 )
827 for (const workerNode of pool.workerNodes) {
828 expect(workerNode).toBeInstanceOf(WorkerNode)
829 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
830 expect(workerNode.tasksQueue.size).toBe(0)
831 expect(workerNode.tasksQueue.maxSize).toBe(0)
832 }
833 await pool.destroy()
834 pool = new DynamicThreadPool(
835 Math.floor(numberOfWorkers / 2),
836 numberOfWorkers,
837 './tests/worker-files/thread/testWorker.mjs'
838 )
839 for (const workerNode of pool.workerNodes) {
840 expect(workerNode).toBeInstanceOf(WorkerNode)
841 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
842 expect(workerNode.tasksQueue.size).toBe(0)
843 expect(workerNode.tasksQueue.maxSize).toBe(0)
844 }
845 await pool.destroy()
846 })
847
848 it('Verify that pool worker info are initialized', async () => {
849 let pool = new FixedClusterPool(
850 numberOfWorkers,
851 './tests/worker-files/cluster/testWorker.js'
852 )
853 for (const workerNode of pool.workerNodes) {
854 expect(workerNode).toBeInstanceOf(WorkerNode)
855 expect(workerNode.info).toStrictEqual({
856 id: expect.any(Number),
857 type: WorkerTypes.cluster,
858 dynamic: false,
859 ready: true
860 })
861 }
862 await pool.destroy()
863 pool = new DynamicThreadPool(
864 Math.floor(numberOfWorkers / 2),
865 numberOfWorkers,
866 './tests/worker-files/thread/testWorker.mjs'
867 )
868 for (const workerNode of pool.workerNodes) {
869 expect(workerNode).toBeInstanceOf(WorkerNode)
870 expect(workerNode.info).toStrictEqual({
871 id: expect.any(Number),
872 type: WorkerTypes.thread,
873 dynamic: false,
874 ready: true
875 })
876 }
877 await pool.destroy()
878 })
879
880 it('Verify that pool can be started after initialization', async () => {
881 const pool = new FixedClusterPool(
882 numberOfWorkers,
883 './tests/worker-files/cluster/testWorker.js',
884 {
885 startWorkers: false
886 }
887 )
888 expect(pool.info.started).toBe(false)
889 expect(pool.info.ready).toBe(false)
890 expect(pool.workerNodes).toStrictEqual([])
891 await expect(pool.execute()).rejects.toThrow(
892 new Error('Cannot execute a task on not started pool')
893 )
894 pool.start()
895 expect(pool.info.started).toBe(true)
896 expect(pool.info.ready).toBe(true)
897 expect(pool.workerNodes.length).toBe(numberOfWorkers)
898 for (const workerNode of pool.workerNodes) {
899 expect(workerNode).toBeInstanceOf(WorkerNode)
900 }
901 await pool.destroy()
902 })
903
904 it('Verify that pool execute() arguments are checked', async () => {
905 const pool = new FixedClusterPool(
906 numberOfWorkers,
907 './tests/worker-files/cluster/testWorker.js'
908 )
909 await expect(pool.execute(undefined, 0)).rejects.toThrow(
910 new TypeError('name argument must be a string')
911 )
912 await expect(pool.execute(undefined, '')).rejects.toThrow(
913 new TypeError('name argument must not be an empty string')
914 )
915 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
916 new TypeError('transferList argument must be an array')
917 )
918 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
919 "Task function 'unknown' not found"
920 )
921 await pool.destroy()
922 await expect(pool.execute()).rejects.toThrow(
923 new Error('Cannot execute a task on not started pool')
924 )
925 })
926
927 it('Verify that pool worker tasks usage are computed', async () => {
928 const pool = new FixedClusterPool(
929 numberOfWorkers,
930 './tests/worker-files/cluster/testWorker.js'
931 )
932 const promises = new Set()
933 const maxMultiplier = 2
934 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
935 promises.add(pool.execute())
936 }
937 for (const workerNode of pool.workerNodes) {
938 expect(workerNode.usage).toStrictEqual({
939 tasks: {
940 executed: 0,
941 executing: maxMultiplier,
942 queued: 0,
943 maxQueued: 0,
944 stolen: 0,
945 failed: 0
946 },
947 runTime: {
948 history: expect.any(CircularArray)
949 },
950 waitTime: {
951 history: expect.any(CircularArray)
952 },
953 elu: {
954 idle: {
955 history: expect.any(CircularArray)
956 },
957 active: {
958 history: expect.any(CircularArray)
959 }
960 }
961 })
962 }
963 await Promise.all(promises)
964 for (const workerNode of pool.workerNodes) {
965 expect(workerNode.usage).toStrictEqual({
966 tasks: {
967 executed: maxMultiplier,
968 executing: 0,
969 queued: 0,
970 maxQueued: 0,
971 stolen: 0,
972 failed: 0
973 },
974 runTime: {
975 history: expect.any(CircularArray)
976 },
977 waitTime: {
978 history: expect.any(CircularArray)
979 },
980 elu: {
981 idle: {
982 history: expect.any(CircularArray)
983 },
984 active: {
985 history: expect.any(CircularArray)
986 }
987 }
988 })
989 }
990 await pool.destroy()
991 })
992
993 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
994 const pool = new DynamicThreadPool(
995 Math.floor(numberOfWorkers / 2),
996 numberOfWorkers,
997 './tests/worker-files/thread/testWorker.mjs'
998 )
999 const promises = new Set()
1000 const maxMultiplier = 2
1001 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1002 promises.add(pool.execute())
1003 }
1004 await Promise.all(promises)
1005 for (const workerNode of pool.workerNodes) {
1006 expect(workerNode.usage).toStrictEqual({
1007 tasks: {
1008 executed: expect.any(Number),
1009 executing: 0,
1010 queued: 0,
1011 maxQueued: 0,
1012 stolen: 0,
1013 failed: 0
1014 },
1015 runTime: {
1016 history: expect.any(CircularArray)
1017 },
1018 waitTime: {
1019 history: expect.any(CircularArray)
1020 },
1021 elu: {
1022 idle: {
1023 history: expect.any(CircularArray)
1024 },
1025 active: {
1026 history: expect.any(CircularArray)
1027 }
1028 }
1029 })
1030 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1031 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1032 numberOfWorkers * maxMultiplier
1033 )
1034 expect(workerNode.usage.runTime.history.length).toBe(0)
1035 expect(workerNode.usage.waitTime.history.length).toBe(0)
1036 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1037 expect(workerNode.usage.elu.active.history.length).toBe(0)
1038 }
1039 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1040 for (const workerNode of pool.workerNodes) {
1041 expect(workerNode.usage).toStrictEqual({
1042 tasks: {
1043 executed: 0,
1044 executing: 0,
1045 queued: 0,
1046 maxQueued: 0,
1047 stolen: 0,
1048 failed: 0
1049 },
1050 runTime: {
1051 history: expect.any(CircularArray)
1052 },
1053 waitTime: {
1054 history: expect.any(CircularArray)
1055 },
1056 elu: {
1057 idle: {
1058 history: expect.any(CircularArray)
1059 },
1060 active: {
1061 history: expect.any(CircularArray)
1062 }
1063 }
1064 })
1065 expect(workerNode.usage.runTime.history.length).toBe(0)
1066 expect(workerNode.usage.waitTime.history.length).toBe(0)
1067 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1068 expect(workerNode.usage.elu.active.history.length).toBe(0)
1069 }
1070 await pool.destroy()
1071 })
1072
1073 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1074 const pool = new DynamicClusterPool(
1075 Math.floor(numberOfWorkers / 2),
1076 numberOfWorkers,
1077 './tests/worker-files/cluster/testWorker.js'
1078 )
1079 expect(pool.emitter.eventNames()).toStrictEqual([])
1080 let poolInfo
1081 let poolReady = 0
1082 pool.emitter.on(PoolEvents.ready, info => {
1083 ++poolReady
1084 poolInfo = info
1085 })
1086 await waitPoolEvents(pool, PoolEvents.ready, 1)
1087 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1088 expect(poolReady).toBe(1)
1089 expect(poolInfo).toStrictEqual({
1090 version,
1091 type: PoolTypes.dynamic,
1092 worker: WorkerTypes.cluster,
1093 started: true,
1094 ready: true,
1095 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1096 minSize: expect.any(Number),
1097 maxSize: expect.any(Number),
1098 workerNodes: expect.any(Number),
1099 idleWorkerNodes: expect.any(Number),
1100 busyWorkerNodes: expect.any(Number),
1101 executedTasks: expect.any(Number),
1102 executingTasks: expect.any(Number),
1103 failedTasks: expect.any(Number)
1104 })
1105 await pool.destroy()
1106 })
1107
1108 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1109 const pool = new FixedThreadPool(
1110 numberOfWorkers,
1111 './tests/worker-files/thread/testWorker.mjs'
1112 )
1113 expect(pool.emitter.eventNames()).toStrictEqual([])
1114 const promises = new Set()
1115 let poolBusy = 0
1116 let poolInfo
1117 pool.emitter.on(PoolEvents.busy, info => {
1118 ++poolBusy
1119 poolInfo = info
1120 })
1121 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1122 for (let i = 0; i < numberOfWorkers * 2; i++) {
1123 promises.add(pool.execute())
1124 }
1125 await Promise.all(promises)
1126 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1127 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1128 expect(poolBusy).toBe(numberOfWorkers + 1)
1129 expect(poolInfo).toStrictEqual({
1130 version,
1131 type: PoolTypes.fixed,
1132 worker: WorkerTypes.thread,
1133 started: true,
1134 ready: true,
1135 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1136 minSize: expect.any(Number),
1137 maxSize: expect.any(Number),
1138 workerNodes: expect.any(Number),
1139 idleWorkerNodes: expect.any(Number),
1140 busyWorkerNodes: expect.any(Number),
1141 executedTasks: expect.any(Number),
1142 executingTasks: expect.any(Number),
1143 failedTasks: expect.any(Number)
1144 })
1145 await pool.destroy()
1146 })
1147
1148 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1149 const pool = new DynamicThreadPool(
1150 Math.floor(numberOfWorkers / 2),
1151 numberOfWorkers,
1152 './tests/worker-files/thread/testWorker.mjs'
1153 )
1154 expect(pool.emitter.eventNames()).toStrictEqual([])
1155 const promises = new Set()
1156 let poolFull = 0
1157 let poolInfo
1158 pool.emitter.on(PoolEvents.full, info => {
1159 ++poolFull
1160 poolInfo = info
1161 })
1162 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1163 for (let i = 0; i < numberOfWorkers * 2; i++) {
1164 promises.add(pool.execute())
1165 }
1166 await Promise.all(promises)
1167 expect(poolFull).toBe(1)
1168 expect(poolInfo).toStrictEqual({
1169 version,
1170 type: PoolTypes.dynamic,
1171 worker: WorkerTypes.thread,
1172 started: true,
1173 ready: true,
1174 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1175 minSize: expect.any(Number),
1176 maxSize: expect.any(Number),
1177 workerNodes: expect.any(Number),
1178 idleWorkerNodes: expect.any(Number),
1179 busyWorkerNodes: expect.any(Number),
1180 executedTasks: expect.any(Number),
1181 executingTasks: expect.any(Number),
1182 failedTasks: expect.any(Number)
1183 })
1184 await pool.destroy()
1185 })
1186
1187 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1188 const pool = new FixedThreadPool(
1189 numberOfWorkers,
1190 './tests/worker-files/thread/testWorker.mjs',
1191 {
1192 enableTasksQueue: true
1193 }
1194 )
1195 stub(pool, 'hasBackPressure').returns(true)
1196 expect(pool.emitter.eventNames()).toStrictEqual([])
1197 const promises = new Set()
1198 let poolBackPressure = 0
1199 let poolInfo
1200 pool.emitter.on(PoolEvents.backPressure, info => {
1201 ++poolBackPressure
1202 poolInfo = info
1203 })
1204 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1205 for (let i = 0; i < numberOfWorkers + 1; i++) {
1206 promises.add(pool.execute())
1207 }
1208 await Promise.all(promises)
1209 expect(poolBackPressure).toBe(1)
1210 expect(poolInfo).toStrictEqual({
1211 version,
1212 type: PoolTypes.fixed,
1213 worker: WorkerTypes.thread,
1214 started: true,
1215 ready: true,
1216 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1217 minSize: expect.any(Number),
1218 maxSize: expect.any(Number),
1219 workerNodes: expect.any(Number),
1220 idleWorkerNodes: expect.any(Number),
1221 busyWorkerNodes: expect.any(Number),
1222 executedTasks: expect.any(Number),
1223 executingTasks: expect.any(Number),
1224 maxQueuedTasks: expect.any(Number),
1225 queuedTasks: expect.any(Number),
1226 backPressure: true,
1227 stolenTasks: expect.any(Number),
1228 failedTasks: expect.any(Number)
1229 })
1230 expect(pool.hasBackPressure.called).toBe(true)
1231 await pool.destroy()
1232 })
1233
1234 it('Verify that hasTaskFunction() is working', async () => {
1235 const dynamicThreadPool = new DynamicThreadPool(
1236 Math.floor(numberOfWorkers / 2),
1237 numberOfWorkers,
1238 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1239 )
1240 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1241 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1242 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1243 true
1244 )
1245 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1246 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1247 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1248 await dynamicThreadPool.destroy()
1249 const fixedClusterPool = new FixedClusterPool(
1250 numberOfWorkers,
1251 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1252 )
1253 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1254 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1255 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1256 true
1257 )
1258 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1259 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1260 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1261 await fixedClusterPool.destroy()
1262 })
1263
1264 it('Verify that addTaskFunction() is working', async () => {
1265 const dynamicThreadPool = new DynamicThreadPool(
1266 Math.floor(numberOfWorkers / 2),
1267 numberOfWorkers,
1268 './tests/worker-files/thread/testWorker.mjs'
1269 )
1270 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1271 await expect(
1272 dynamicThreadPool.addTaskFunction(0, () => {})
1273 ).rejects.toThrow(new TypeError('name argument must be a string'))
1274 await expect(
1275 dynamicThreadPool.addTaskFunction('', () => {})
1276 ).rejects.toThrow(
1277 new TypeError('name argument must not be an empty string')
1278 )
1279 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1280 new TypeError('fn argument must be a function')
1281 )
1282 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1283 new TypeError('fn argument must be a function')
1284 )
1285 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1286 DEFAULT_TASK_NAME,
1287 'test'
1288 ])
1289 const echoTaskFunction = data => {
1290 return data
1291 }
1292 await expect(
1293 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1294 ).resolves.toBe(true)
1295 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1296 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1297 echoTaskFunction
1298 )
1299 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1300 DEFAULT_TASK_NAME,
1301 'test',
1302 'echo'
1303 ])
1304 const taskFunctionData = { test: 'test' }
1305 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1306 expect(echoResult).toStrictEqual(taskFunctionData)
1307 for (const workerNode of dynamicThreadPool.workerNodes) {
1308 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1309 tasks: {
1310 executed: expect.any(Number),
1311 executing: 0,
1312 queued: 0,
1313 stolen: 0,
1314 failed: 0
1315 },
1316 runTime: {
1317 history: new CircularArray()
1318 },
1319 waitTime: {
1320 history: new CircularArray()
1321 },
1322 elu: {
1323 idle: {
1324 history: new CircularArray()
1325 },
1326 active: {
1327 history: new CircularArray()
1328 }
1329 }
1330 })
1331 }
1332 await dynamicThreadPool.destroy()
1333 })
1334
1335 it('Verify that removeTaskFunction() is working', async () => {
1336 const dynamicThreadPool = new DynamicThreadPool(
1337 Math.floor(numberOfWorkers / 2),
1338 numberOfWorkers,
1339 './tests/worker-files/thread/testWorker.mjs'
1340 )
1341 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1342 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1343 DEFAULT_TASK_NAME,
1344 'test'
1345 ])
1346 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1347 new Error('Cannot remove a task function not handled on the pool side')
1348 )
1349 const echoTaskFunction = data => {
1350 return data
1351 }
1352 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1353 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1354 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1355 echoTaskFunction
1356 )
1357 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1358 DEFAULT_TASK_NAME,
1359 'test',
1360 'echo'
1361 ])
1362 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1363 true
1364 )
1365 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1366 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1367 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1368 DEFAULT_TASK_NAME,
1369 'test'
1370 ])
1371 await dynamicThreadPool.destroy()
1372 })
1373
1374 it('Verify that listTaskFunctionNames() is working', async () => {
1375 const dynamicThreadPool = new DynamicThreadPool(
1376 Math.floor(numberOfWorkers / 2),
1377 numberOfWorkers,
1378 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1379 )
1380 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1381 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1382 DEFAULT_TASK_NAME,
1383 'jsonIntegerSerialization',
1384 'factorial',
1385 'fibonacci'
1386 ])
1387 await dynamicThreadPool.destroy()
1388 const fixedClusterPool = new FixedClusterPool(
1389 numberOfWorkers,
1390 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1391 )
1392 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1393 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1394 DEFAULT_TASK_NAME,
1395 'jsonIntegerSerialization',
1396 'factorial',
1397 'fibonacci'
1398 ])
1399 await fixedClusterPool.destroy()
1400 })
1401
1402 it('Verify that setDefaultTaskFunction() is working', async () => {
1403 const dynamicThreadPool = new DynamicThreadPool(
1404 Math.floor(numberOfWorkers / 2),
1405 numberOfWorkers,
1406 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1407 )
1408 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1409 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1410 new Error(
1411 "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
1412 )
1413 )
1414 await expect(
1415 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1416 ).rejects.toThrow(
1417 new Error(
1418 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
1419 )
1420 )
1421 await expect(
1422 dynamicThreadPool.setDefaultTaskFunction('unknown')
1423 ).rejects.toThrow(
1424 new Error(
1425 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
1426 )
1427 )
1428 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1429 DEFAULT_TASK_NAME,
1430 'jsonIntegerSerialization',
1431 'factorial',
1432 'fibonacci'
1433 ])
1434 await expect(
1435 dynamicThreadPool.setDefaultTaskFunction('factorial')
1436 ).resolves.toBe(true)
1437 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1438 DEFAULT_TASK_NAME,
1439 'factorial',
1440 'jsonIntegerSerialization',
1441 'fibonacci'
1442 ])
1443 await expect(
1444 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1445 ).resolves.toBe(true)
1446 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1447 DEFAULT_TASK_NAME,
1448 'fibonacci',
1449 'jsonIntegerSerialization',
1450 'factorial'
1451 ])
1452 await dynamicThreadPool.destroy()
1453 })
1454
1455 it('Verify that multiple task functions worker is working', async () => {
1456 const pool = new DynamicClusterPool(
1457 Math.floor(numberOfWorkers / 2),
1458 numberOfWorkers,
1459 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1460 )
1461 const data = { n: 10 }
1462 const result0 = await pool.execute(data)
1463 expect(result0).toStrictEqual({ ok: 1 })
1464 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1465 expect(result1).toStrictEqual({ ok: 1 })
1466 const result2 = await pool.execute(data, 'factorial')
1467 expect(result2).toBe(3628800)
1468 const result3 = await pool.execute(data, 'fibonacci')
1469 expect(result3).toBe(55)
1470 expect(pool.info.executingTasks).toBe(0)
1471 expect(pool.info.executedTasks).toBe(4)
1472 for (const workerNode of pool.workerNodes) {
1473 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1474 DEFAULT_TASK_NAME,
1475 'jsonIntegerSerialization',
1476 'factorial',
1477 'fibonacci'
1478 ])
1479 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1480 for (const name of pool.listTaskFunctionNames()) {
1481 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1482 tasks: {
1483 executed: expect.any(Number),
1484 executing: 0,
1485 failed: 0,
1486 queued: 0,
1487 stolen: 0
1488 },
1489 runTime: {
1490 history: expect.any(CircularArray)
1491 },
1492 waitTime: {
1493 history: expect.any(CircularArray)
1494 },
1495 elu: {
1496 idle: {
1497 history: expect.any(CircularArray)
1498 },
1499 active: {
1500 history: expect.any(CircularArray)
1501 }
1502 }
1503 })
1504 expect(
1505 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1506 ).toBeGreaterThan(0)
1507 }
1508 expect(
1509 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1510 ).toStrictEqual(
1511 workerNode.getTaskFunctionWorkerUsage(
1512 workerNode.info.taskFunctionNames[1]
1513 )
1514 )
1515 }
1516 await pool.destroy()
1517 })
1518
1519 it('Verify sendKillMessageToWorker()', async () => {
1520 const pool = new DynamicClusterPool(
1521 Math.floor(numberOfWorkers / 2),
1522 numberOfWorkers,
1523 './tests/worker-files/cluster/testWorker.js'
1524 )
1525 const workerNodeKey = 0
1526 await expect(
1527 pool.sendKillMessageToWorker(workerNodeKey)
1528 ).resolves.toBeUndefined()
1529 await pool.destroy()
1530 })
1531
1532 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1533 const pool = new DynamicClusterPool(
1534 Math.floor(numberOfWorkers / 2),
1535 numberOfWorkers,
1536 './tests/worker-files/cluster/testWorker.js'
1537 )
1538 const workerNodeKey = 0
1539 await expect(
1540 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1541 taskFunctionOperation: 'add',
1542 taskFunctionName: 'empty',
1543 taskFunction: (() => {}).toString()
1544 })
1545 ).resolves.toBe(true)
1546 expect(
1547 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1548 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1549 await pool.destroy()
1550 })
1551
1552 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1553 const pool = new DynamicClusterPool(
1554 Math.floor(numberOfWorkers / 2),
1555 numberOfWorkers,
1556 './tests/worker-files/cluster/testWorker.js'
1557 )
1558 await expect(
1559 pool.sendTaskFunctionOperationToWorkers({
1560 taskFunctionOperation: 'add',
1561 taskFunctionName: 'empty',
1562 taskFunction: (() => {}).toString()
1563 })
1564 ).resolves.toBe(true)
1565 for (const workerNode of pool.workerNodes) {
1566 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1567 DEFAULT_TASK_NAME,
1568 'test',
1569 'empty'
1570 ])
1571 }
1572 await pool.destroy()
1573 })
1574 })