Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { expect } from 'expect'
6 import { restore, stub } from 'sinon'
7 import {
8 DynamicClusterPool,
9 DynamicThreadPool,
10 FixedClusterPool,
11 FixedThreadPool,
12 PoolEvents,
13 PoolTypes,
14 WorkerChoiceStrategies,
15 WorkerTypes
16 } from '../../lib/index.js'
17 import { CircularArray } from '../../lib/circular-array.js'
18 import { Deque } from '../../lib/deque.js'
19 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20 import { waitPoolEvents } from '../test-utils.js'
21 import { WorkerNode } from '../../lib/pools/worker-node.js'
22
23 describe('Abstract pool test suite', () => {
// Version string read from the package.json located two directories above this
// test file; the pool info assertions below compare against it.
24 const version = JSON.parse(
25 readFileSync(
26 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
27 'utf8'
28 )
29 ).version
// Number of workers shared by the tests of this suite.
30 const numberOfWorkers = 2
// FixedThreadPool subclass whose isMain() always returns false. It is used by
// the first test to simulate a pool created from a non main thread/process.
31 class StubPoolWithIsMain extends FixedThreadPool {
32 isMain () {
33 return false
34 }
35 }
36
// Call sinon's restore() after every test so that fakes installed by one test
// do not leak into the next one.
37 afterEach(() => {
38 restore()
39 })
40
// Constructing a pool whose isMain() returns false must throw.
41 it('Simulate pool creation from a non main thread/process', () => {
42 expect(
43 () =>
44 new StubPoolWithIsMain(
45 numberOfWorkers,
46 './tests/worker-files/thread/testWorker.mjs',
47 {
48 errorHandler: e => console.error(e)
49 }
50 )
51 ).toThrowError(
52 new Error(
53 'Cannot start a pool from a worker with the same type as the pool'
54 )
55 )
56 })
57
// Right after construction with default options the pool reports
// starting === false and started === true.
58 it('Verify that pool statuses properties are set', async () => {
59 const pool = new FixedThreadPool(
60 numberOfWorkers,
61 './tests/worker-files/thread/testWorker.mjs'
62 )
63 expect(pool.starting).toBe(false)
64 expect(pool.started).toBe(true)
65 await pool.destroy()
66 })
67
// The constructor must throw when the worker file path is missing or does not
// point to an existing file.
68 it('Verify that filePath is checked', () => {
// No file path given at all.
69 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
70 new Error("Cannot find the worker file 'undefined'")
71 )
// File path given, but the constructor is expected not to find it.
72 expect(
73 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
74 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
75 })
76
// The constructor must throw when the number of workers is undefined.
77 it('Verify that numberOfWorkers is checked', () => {
78 expect(
79 () =>
80 new FixedThreadPool(
81 undefined,
82 './tests/worker-files/thread/testWorker.mjs'
83 )
84 ).toThrowError(
85 new Error(
86 'Cannot instantiate a pool without specifying the number of workers'
87 )
88 )
89 })
90
// A negative number of workers must be rejected with a RangeError.
91 it('Verify that a negative number of workers is checked', () => {
92 expect(
93 () =>
94 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
95 ).toThrowError(
96 new RangeError(
97 'Cannot instantiate a pool with a negative number of workers'
98 )
99 )
100 })
101
// A fractional number of workers must be rejected with a TypeError.
102 it('Verify that a non integer number of workers is checked', () => {
103 expect(
104 () =>
105 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
106 ).toThrowError(
107 new TypeError(
108 'Cannot instantiate a pool with a non safe integer number of workers'
109 )
110 )
111 })
112
// Dynamic pools take (min, max, filePath); each invalid min/max combination
// below must make the constructor throw the matching error.
113 it('Verify that dynamic pool sizing is checked', () => {
// Maximum pool size is undefined.
114 expect(
115 () =>
116 new DynamicClusterPool(
117 1,
118 undefined,
119 './tests/worker-files/cluster/testWorker.js'
120 )
121 ).toThrowError(
122 new TypeError(
123 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
124 )
125 )
// Minimum pool size is not an integer.
126 expect(
127 () =>
128 new DynamicThreadPool(
129 0.5,
130 1,
131 './tests/worker-files/thread/testWorker.mjs'
132 )
133 ).toThrowError(
134 new TypeError(
135 'Cannot instantiate a pool with a non safe integer number of workers'
136 )
137 )
// Maximum pool size is not an integer.
138 expect(
139 () =>
140 new DynamicClusterPool(
141 0,
142 0.5,
143 './tests/worker-files/cluster/testWorker.js'
144 )
145 ).toThrowError(
146 new TypeError(
147 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
148 )
149 )
// Maximum pool size is lower than the minimum pool size.
150 expect(
151 () =>
152 new DynamicThreadPool(
153 2,
154 1,
155 './tests/worker-files/thread/testWorker.mjs'
156 )
157 ).toThrowError(
158 new RangeError(
159 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
160 )
161 )
// Minimum and maximum pool sizes are both zero.
162 expect(
163 () =>
164 new DynamicThreadPool(
165 0,
166 0,
167 './tests/worker-files/thread/testWorker.mjs'
168 )
169 ).toThrowError(
170 new RangeError(
171 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
172 )
173 )
// Minimum and maximum pool sizes are equal and non zero.
174 expect(
175 () =>
176 new DynamicClusterPool(
177 1,
178 1,
179 './tests/worker-files/cluster/testWorker.js'
180 )
181 ).toThrowError(
182 new RangeError(
183 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
184 )
185 )
186 })
187
// Checks the options exposed by the pool, first when none are given (defaults)
// and then when custom options are passed to the constructor. The same worker
// choice strategy options must be visible on the pool, on the worker choice
// strategy context and on each worker choice strategy.
188 it('Verify that pool options are checked', async () => {
// Pool built without options: default values are expected.
189 let pool = new FixedThreadPool(
190 numberOfWorkers,
191 './tests/worker-files/thread/testWorker.mjs'
192 )
193 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
194 expect(pool.opts).toStrictEqual({
195 startWorkers: true,
196 enableEvents: true,
197 restartWorkerOnError: true,
198 enableTasksQueue: false,
199 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
200 workerChoiceStrategyOptions: {
201 retries: 6,
202 runTime: { median: false },
203 waitTime: { median: false },
204 elu: { median: false }
205 }
206 })
207 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
208 retries: 6,
209 runTime: { median: false },
210 waitTime: { median: false },
211 elu: { median: false }
212 })
213 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
214 .workerChoiceStrategies) {
215 expect(workerChoiceStrategy.opts).toStrictEqual({
216 retries: 6,
217 runTime: { median: false },
218 waitTime: { median: false },
219 elu: { median: false }
220 })
221 }
222 await pool.destroy()
// Pool built with custom options: they must be merged with the defaults.
223 const testHandler = () => console.info('test handler executed')
224 pool = new FixedThreadPool(
225 numberOfWorkers,
226 './tests/worker-files/thread/testWorker.mjs',
227 {
228 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
229 workerChoiceStrategyOptions: {
230 runTime: { median: true },
231 weights: { 0: 300, 1: 200 }
232 },
233 enableEvents: false,
234 restartWorkerOnError: false,
235 enableTasksQueue: true,
236 tasksQueueOptions: { concurrency: 2 },
237 messageHandler: testHandler,
238 errorHandler: testHandler,
239 onlineHandler: testHandler,
240 exitHandler: testHandler
241 }
242 )
// With enableEvents: false no emitter is expected.
243 expect(pool.emitter).toBeUndefined()
244 expect(pool.opts).toStrictEqual({
245 startWorkers: true,
246 enableEvents: false,
247 restartWorkerOnError: false,
248 enableTasksQueue: true,
// Only concurrency was given; the other tasks queue options are expected to
// be filled in, with size equal to the square of the number of workers.
249 tasksQueueOptions: {
250 concurrency: 2,
251 size: Math.pow(numberOfWorkers, 2),
252 taskStealing: true,
253 tasksStealingOnBackPressure: true
254 },
255 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
256 workerChoiceStrategyOptions: {
257 retries: 6,
258 runTime: { median: true },
259 waitTime: { median: false },
260 elu: { median: false },
261 weights: { 0: 300, 1: 200 }
262 },
263 onlineHandler: testHandler,
264 messageHandler: testHandler,
265 errorHandler: testHandler,
266 exitHandler: testHandler
267 })
268 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
269 retries: 6,
270 runTime: { median: true },
271 waitTime: { median: false },
272 elu: { median: false },
273 weights: { 0: 300, 1: 200 }
274 })
275 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
276 .workerChoiceStrategies) {
277 expect(workerChoiceStrategy.opts).toStrictEqual({
278 retries: 6,
279 runTime: { median: true },
280 waitTime: { median: false },
281 elu: { median: false },
282 weights: { 0: 300, 1: 200 }
283 })
284 }
285 await pool.destroy()
286 })
287
// Each invalid option passed to the constructor must make it throw the
// matching error type and message.
288 it('Verify that pool options are validated', () => {
// Unknown worker choice strategy.
289 expect(
290 () =>
291 new FixedThreadPool(
292 numberOfWorkers,
293 './tests/worker-files/thread/testWorker.mjs',
294 {
295 workerChoiceStrategy: 'invalidStrategy'
296 }
297 )
298 ).toThrowError(
299 new Error("Invalid worker choice strategy 'invalidStrategy'")
300 )
// Worker choice strategy options: retries is not an integer.
301 expect(
302 () =>
303 new FixedThreadPool(
304 numberOfWorkers,
305 './tests/worker-files/thread/testWorker.mjs',
306 {
307 workerChoiceStrategyOptions: {
308 retries: 'invalidChoiceRetries'
309 }
310 }
311 )
312 ).toThrowError(
313 new TypeError(
314 'Invalid worker choice strategy options: retries must be an integer'
315 )
316 )
// Worker choice strategy options: retries is negative.
317 expect(
318 () =>
319 new FixedThreadPool(
320 numberOfWorkers,
321 './tests/worker-files/thread/testWorker.mjs',
322 {
323 workerChoiceStrategyOptions: {
324 retries: -1
325 }
326 }
327 )
328 ).toThrowError(
329 new RangeError(
330 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
331 )
332 )
// Worker choice strategy options: weights has no entry for the worker nodes.
333 expect(
334 () =>
335 new FixedThreadPool(
336 numberOfWorkers,
337 './tests/worker-files/thread/testWorker.mjs',
338 {
339 workerChoiceStrategyOptions: { weights: {} }
340 }
341 )
342 ).toThrowError(
343 new Error(
344 'Invalid worker choice strategy options: must have a weight for each worker node'
345 )
346 )
// Worker choice strategy options: unknown measurement.
347 expect(
348 () =>
349 new FixedThreadPool(
350 numberOfWorkers,
351 './tests/worker-files/thread/testWorker.mjs',
352 {
353 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
354 }
355 )
356 ).toThrowError(
357 new Error(
358 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
359 )
360 )
// Tasks queue options is not a plain object.
361 expect(
362 () =>
363 new FixedThreadPool(
364 numberOfWorkers,
365 './tests/worker-files/thread/testWorker.mjs',
366 {
367 enableTasksQueue: true,
368 tasksQueueOptions: 'invalidTasksQueueOptions'
369 }
370 )
371 ).toThrowError(
372 new TypeError('Invalid tasks queue options: must be a plain object')
373 )
// Tasks queue concurrency: zero, negative, then non integer.
374 expect(
375 () =>
376 new FixedThreadPool(
377 numberOfWorkers,
378 './tests/worker-files/thread/testWorker.mjs',
379 {
380 enableTasksQueue: true,
381 tasksQueueOptions: { concurrency: 0 }
382 }
383 )
384 ).toThrowError(
385 new RangeError(
386 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
387 )
388 )
389 expect(
390 () =>
391 new FixedThreadPool(
392 numberOfWorkers,
393 './tests/worker-files/thread/testWorker.mjs',
394 {
395 enableTasksQueue: true,
396 tasksQueueOptions: { concurrency: -1 }
397 }
398 )
399 ).toThrowError(
400 new RangeError(
401 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
402 )
403 )
404 expect(
405 () =>
406 new FixedThreadPool(
407 numberOfWorkers,
408 './tests/worker-files/thread/testWorker.mjs',
409 {
410 enableTasksQueue: true,
411 tasksQueueOptions: { concurrency: 0.2 }
412 }
413 )
414 ).toThrowError(
415 new TypeError('Invalid worker node tasks concurrency: must be an integer')
416 )
// Tasks queue size: zero, negative, then non integer.
417 expect(
418 () =>
419 new FixedThreadPool(
420 numberOfWorkers,
421 './tests/worker-files/thread/testWorker.mjs',
422 {
423 enableTasksQueue: true,
424 tasksQueueOptions: { size: 0 }
425 }
426 )
427 ).toThrowError(
428 new RangeError(
429 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
430 )
431 )
432 expect(
433 () =>
434 new FixedThreadPool(
435 numberOfWorkers,
436 './tests/worker-files/thread/testWorker.mjs',
437 {
438 enableTasksQueue: true,
439 tasksQueueOptions: { size: -1 }
440 }
441 )
442 ).toThrowError(
443 new RangeError(
444 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
445 )
446 )
447 expect(
448 () =>
449 new FixedThreadPool(
450 numberOfWorkers,
451 './tests/worker-files/thread/testWorker.mjs',
452 {
453 enableTasksQueue: true,
454 tasksQueueOptions: { size: 0.2 }
455 }
456 )
457 ).toThrowError(
458 new TypeError('Invalid worker node tasks queue size: must be an integer')
459 )
460 })
461
// Exercises pool.setWorkerChoiceStrategyOptions() on a FAIR_SHARE pool: after
// each call the options must be visible on the pool, on the worker choice
// strategy context and on every strategy, and the task statistics
// requirements must follow the median flags. Invalid options must throw.
462 it('Verify that pool worker choice strategy options can be set', async () => {
463 const pool = new FixedThreadPool(
464 numberOfWorkers,
465 './tests/worker-files/thread/testWorker.mjs',
466 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
467 )
// Initial state: default worker choice strategy options.
468 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
469 retries: 6,
470 runTime: { median: false },
471 waitTime: { median: false },
472 elu: { median: false }
473 })
474 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
475 retries: 6,
476 runTime: { median: false },
477 waitTime: { median: false },
478 elu: { median: false }
479 })
480 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
481 .workerChoiceStrategies) {
482 expect(workerChoiceStrategy.opts).toStrictEqual({
483 retries: 6,
484 runTime: { median: false },
485 waitTime: { median: false },
486 elu: { median: false }
487 })
488 }
// With median disabled, runTime and elu are expected to require the average.
489 expect(
490 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
491 ).toStrictEqual({
492 runTime: {
493 aggregate: true,
494 average: true,
495 median: false
496 },
497 waitTime: {
498 aggregate: false,
499 average: false,
500 median: false
501 },
502 elu: {
503 aggregate: true,
504 average: true,
505 median: false
506 }
507 })
// Switch runTime and elu to median.
508 pool.setWorkerChoiceStrategyOptions({
509 runTime: { median: true },
510 elu: { median: true }
511 })
512 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
513 retries: 6,
514 runTime: { median: true },
515 waitTime: { median: false },
516 elu: { median: true }
517 })
518 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
519 retries: 6,
520 runTime: { median: true },
521 waitTime: { median: false },
522 elu: { median: true }
523 })
524 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
525 .workerChoiceStrategies) {
526 expect(workerChoiceStrategy.opts).toStrictEqual({
527 retries: 6,
528 runTime: { median: true },
529 waitTime: { median: false },
530 elu: { median: true }
531 })
532 }
// With median enabled, the median is expected to be required instead of the average.
533 expect(
534 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
535 ).toStrictEqual({
536 runTime: {
537 aggregate: true,
538 average: false,
539 median: true
540 },
541 waitTime: {
542 aggregate: false,
543 average: false,
544 median: false
545 },
546 elu: {
547 aggregate: true,
548 average: false,
549 median: true
550 }
551 })
// Switch runTime and elu back to average; the initial state must be restored.
552 pool.setWorkerChoiceStrategyOptions({
553 runTime: { median: false },
554 elu: { median: false }
555 })
556 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
557 retries: 6,
558 runTime: { median: false },
559 waitTime: { median: false },
560 elu: { median: false }
561 })
562 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
563 retries: 6,
564 runTime: { median: false },
565 waitTime: { median: false },
566 elu: { median: false }
567 })
568 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
569 .workerChoiceStrategies) {
570 expect(workerChoiceStrategy.opts).toStrictEqual({
571 retries: 6,
572 runTime: { median: false },
573 waitTime: { median: false },
574 elu: { median: false }
575 })
576 }
577 expect(
578 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
579 ).toStrictEqual({
580 runTime: {
581 aggregate: true,
582 average: true,
583 median: false
584 },
585 waitTime: {
586 aggregate: false,
587 average: false,
588 median: false
589 },
590 elu: {
591 aggregate: true,
592 average: true,
593 median: false
594 }
595 })
// Invalid options passed to the setter must throw, as in the constructor.
596 expect(() =>
597 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
598 ).toThrowError(
599 new TypeError(
600 'Invalid worker choice strategy options: must be a plain object'
601 )
602 )
603 expect(() =>
604 pool.setWorkerChoiceStrategyOptions({
605 retries: 'invalidChoiceRetries'
606 })
607 ).toThrowError(
608 new TypeError(
609 'Invalid worker choice strategy options: retries must be an integer'
610 )
611 )
612 expect(() =>
613 pool.setWorkerChoiceStrategyOptions({ retries: -1 })
614 ).toThrowError(
615 new RangeError(
616 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
617 )
618 )
619 expect(() =>
620 pool.setWorkerChoiceStrategyOptions({ weights: {} })
621 ).toThrowError(
622 new Error(
623 'Invalid worker choice strategy options: must have a weight for each worker node'
624 )
625 )
626 expect(() =>
627 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
628 ).toThrowError(
629 new Error(
630 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
631 )
632 )
633 await pool.destroy()
634 })
635
// Toggles the tasks queue with pool.enableTasksQueue() and checks the pool
// options and the worker nodes' onEmptyQueue/onBackPressure callbacks after
// each call.
636 it('Verify that pool tasks queue can be enabled/disabled', async () => {
637 const pool = new FixedThreadPool(
638 numberOfWorkers,
639 './tests/worker-files/thread/testWorker.mjs'
640 )
// Tasks queue disabled by default: no options and no callbacks.
641 expect(pool.opts.enableTasksQueue).toBe(false)
642 expect(pool.opts.tasksQueueOptions).toBeUndefined()
643 for (const workerNode of pool.workerNodes) {
644 expect(workerNode.onEmptyQueue).toBeUndefined()
645 expect(workerNode.onBackPressure).toBeUndefined()
646 }
// Enable without options: default tasks queue options are expected.
647 pool.enableTasksQueue(true)
648 expect(pool.opts.enableTasksQueue).toBe(true)
649 expect(pool.opts.tasksQueueOptions).toStrictEqual({
650 concurrency: 1,
651 size: Math.pow(numberOfWorkers, 2),
652 taskStealing: true,
653 tasksStealingOnBackPressure: true
654 })
655 for (const workerNode of pool.workerNodes) {
656 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
657 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
658 }
// Enable again with a custom concurrency.
659 pool.enableTasksQueue(true, { concurrency: 2 })
660 expect(pool.opts.enableTasksQueue).toBe(true)
661 expect(pool.opts.tasksQueueOptions).toStrictEqual({
662 concurrency: 2,
663 size: Math.pow(numberOfWorkers, 2),
664 taskStealing: true,
665 tasksStealingOnBackPressure: true
666 })
667 for (const workerNode of pool.workerNodes) {
668 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
669 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
670 }
// Disable: options and callbacks must be removed.
671 pool.enableTasksQueue(false)
672 expect(pool.opts.enableTasksQueue).toBe(false)
673 expect(pool.opts.tasksQueueOptions).toBeUndefined()
674 for (const workerNode of pool.workerNodes) {
675 expect(workerNode.onEmptyQueue).toBeUndefined()
676 expect(workerNode.onBackPressure).toBeUndefined()
677 }
678 await pool.destroy()
679 })
680
// Exercises pool.setTasksQueueOptions() on a pool with the tasks queue enabled:
// checks the resulting pool options, the worker nodes' back pressure size and
// callbacks, and the errors thrown for invalid options.
681 it('Verify that pool tasks queue options can be set', async () => {
682 const pool = new FixedThreadPool(
683 numberOfWorkers,
684 './tests/worker-files/thread/testWorker.mjs',
685 { enableTasksQueue: true }
686 )
// Initial state: default tasks queue options.
687 expect(pool.opts.tasksQueueOptions).toStrictEqual({
688 concurrency: 1,
689 size: Math.pow(numberOfWorkers, 2),
690 taskStealing: true,
691 tasksStealingOnBackPressure: true
692 })
693 for (const workerNode of pool.workerNodes) {
694 expect(workerNode.tasksQueueBackPressureSize).toBe(
695 pool.opts.tasksQueueOptions.size
696 )
697 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
698 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
699 }
// Disable both task stealing options: the callbacks are expected to be unset.
700 pool.setTasksQueueOptions({
701 concurrency: 2,
702 size: 2,
703 taskStealing: false,
704 tasksStealingOnBackPressure: false
705 })
706 expect(pool.opts.tasksQueueOptions).toStrictEqual({
707 concurrency: 2,
708 size: 2,
709 taskStealing: false,
710 tasksStealingOnBackPressure: false
711 })
712 for (const workerNode of pool.workerNodes) {
713 expect(workerNode.tasksQueueBackPressureSize).toBe(
714 pool.opts.tasksQueueOptions.size
715 )
716 expect(workerNode.onEmptyQueue).toBeUndefined()
717 expect(workerNode.onBackPressure).toBeUndefined()
718 }
// Re-enable task stealing without giving a size: the size is expected to fall
// back to the square of the number of workers.
719 pool.setTasksQueueOptions({
720 concurrency: 1,
721 taskStealing: true,
722 tasksStealingOnBackPressure: true
723 })
724 expect(pool.opts.tasksQueueOptions).toStrictEqual({
725 concurrency: 1,
726 size: Math.pow(numberOfWorkers, 2),
727 taskStealing: true,
728 tasksStealingOnBackPressure: true
729 })
730 for (const workerNode of pool.workerNodes) {
731 expect(workerNode.tasksQueueBackPressureSize).toBe(
732 pool.opts.tasksQueueOptions.size
733 )
734 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
735 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
736 }
// Invalid options passed to the setter must throw.
737 expect(() =>
738 pool.setTasksQueueOptions('invalidTasksQueueOptions')
739 ).toThrowError(
740 new TypeError('Invalid tasks queue options: must be a plain object')
741 )
742 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
743 new RangeError(
744 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
745 )
746 )
747 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
748 new RangeError(
749 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
750 )
751 )
752 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
753 new TypeError('Invalid worker node tasks concurrency: must be an integer')
754 )
755 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
756 new RangeError(
757 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
758 )
759 )
760 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError(
761 new RangeError(
762 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
763 )
764 )
765 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError(
766 new TypeError('Invalid worker node tasks queue size: must be an integer')
767 )
768 await pool.destroy()
769 })
770
// Checks the content of pool.info for a fixed thread pool and then for a
// dynamic cluster pool, right after construction and before any task is run.
771 it('Verify that pool info is set', async () => {
772 let pool = new FixedThreadPool(
773 numberOfWorkers,
774 './tests/worker-files/thread/testWorker.mjs'
775 )
776 expect(pool.info).toStrictEqual({
777 version,
778 type: PoolTypes.fixed,
779 worker: WorkerTypes.thread,
780 started: true,
781 ready: true,
782 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
783 minSize: numberOfWorkers,
784 maxSize: numberOfWorkers,
785 workerNodes: numberOfWorkers,
786 idleWorkerNodes: numberOfWorkers,
787 busyWorkerNodes: 0,
788 executedTasks: 0,
789 executingTasks: 0,
790 failedTasks: 0
791 })
792 await pool.destroy()
// Dynamic pool: only the minimum number of worker nodes is expected at start.
793 pool = new DynamicClusterPool(
794 Math.floor(numberOfWorkers / 2),
795 numberOfWorkers,
796 './tests/worker-files/cluster/testWorker.js'
797 )
798 expect(pool.info).toStrictEqual({
799 version,
800 type: PoolTypes.dynamic,
801 worker: WorkerTypes.cluster,
802 started: true,
803 ready: true,
804 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
805 minSize: Math.floor(numberOfWorkers / 2),
806 maxSize: numberOfWorkers,
807 workerNodes: Math.floor(numberOfWorkers / 2),
808 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
809 busyWorkerNodes: 0,
810 executedTasks: 0,
811 executingTasks: 0,
812 failedTasks: 0
813 })
814 await pool.destroy()
815 })
816
// Every worker node of a freshly created pool must be a WorkerNode whose usage
// has zeroed task counters and empty CircularArray histories.
817 it('Verify that pool worker tasks usage are initialized', async () => {
818 const pool = new FixedClusterPool(
819 numberOfWorkers,
820 './tests/worker-files/cluster/testWorker.js'
821 )
822 for (const workerNode of pool.workerNodes) {
823 expect(workerNode).toBeInstanceOf(WorkerNode)
824 expect(workerNode.usage).toStrictEqual({
825 tasks: {
826 executed: 0,
827 executing: 0,
828 queued: 0,
829 maxQueued: 0,
830 stolen: 0,
831 failed: 0
832 },
833 runTime: {
834 history: new CircularArray()
835 },
836 waitTime: {
837 history: new CircularArray()
838 },
839 elu: {
840 idle: {
841 history: new CircularArray()
842 },
843 active: {
844 history: new CircularArray()
845 }
846 }
847 })
848 }
849 await pool.destroy()
850 })
851
// Every worker node of a freshly created pool (fixed cluster, then dynamic
// thread) must own an empty Deque as tasks queue.
852 it('Verify that pool worker tasks queue are initialized', async () => {
853 let pool = new FixedClusterPool(
854 numberOfWorkers,
855 './tests/worker-files/cluster/testWorker.js'
856 )
857 for (const workerNode of pool.workerNodes) {
858 expect(workerNode).toBeInstanceOf(WorkerNode)
859 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
860 expect(workerNode.tasksQueue.size).toBe(0)
861 expect(workerNode.tasksQueue.maxSize).toBe(0)
862 }
863 await pool.destroy()
864 pool = new DynamicThreadPool(
865 Math.floor(numberOfWorkers / 2),
866 numberOfWorkers,
867 './tests/worker-files/thread/testWorker.mjs'
868 )
869 for (const workerNode of pool.workerNodes) {
870 expect(workerNode).toBeInstanceOf(WorkerNode)
871 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
872 expect(workerNode.tasksQueue.size).toBe(0)
873 expect(workerNode.tasksQueue.maxSize).toBe(0)
874 }
875 await pool.destroy()
876 })
877
// Every worker node of a freshly created pool must expose an info object with
// a numeric id, the pool's worker type, dynamic: false and ready: true.
878 it('Verify that pool worker info are initialized', async () => {
879 let pool = new FixedClusterPool(
880 numberOfWorkers,
881 './tests/worker-files/cluster/testWorker.js'
882 )
883 for (const workerNode of pool.workerNodes) {
884 expect(workerNode).toBeInstanceOf(WorkerNode)
885 expect(workerNode.info).toStrictEqual({
886 id: expect.any(Number),
887 type: WorkerTypes.cluster,
888 dynamic: false,
889 ready: true
890 })
891 }
892 await pool.destroy()
// Same check on a dynamic thread pool: the worker nodes present right after
// construction are expected to be flagged dynamic: false.
893 pool = new DynamicThreadPool(
894 Math.floor(numberOfWorkers / 2),
895 numberOfWorkers,
896 './tests/worker-files/thread/testWorker.mjs'
897 )
898 for (const workerNode of pool.workerNodes) {
899 expect(workerNode).toBeInstanceOf(WorkerNode)
900 expect(workerNode.info).toStrictEqual({
901 id: expect.any(Number),
902 type: WorkerTypes.thread,
903 dynamic: false,
904 ready: true
905 })
906 }
907 await pool.destroy()
908 })
909
// A pool created with startWorkers: false has no worker node and rejects
// execute() until pool.start() is called.
910 it('Verify that pool can be started after initialization', async () => {
911 const pool = new FixedClusterPool(
912 numberOfWorkers,
913 './tests/worker-files/cluster/testWorker.js',
914 {
915 startWorkers: false
916 }
917 )
// Before start(): not started, not ready, no worker node.
918 expect(pool.info.started).toBe(false)
919 expect(pool.info.ready).toBe(false)
920 expect(pool.workerNodes).toStrictEqual([])
921 await expect(pool.execute()).rejects.toThrowError(
922 new Error('Cannot execute a task on not started pool')
923 )
// After start(): started, ready and populated with numberOfWorkers worker nodes.
924 pool.start()
925 expect(pool.info.started).toBe(true)
926 expect(pool.info.ready).toBe(true)
927 expect(pool.workerNodes.length).toBe(numberOfWorkers)
928 for (const workerNode of pool.workerNodes) {
929 expect(workerNode).toBeInstanceOf(WorkerNode)
930 }
931 await pool.destroy()
932 })
933
// pool.execute(data, name, transferList) must reject on invalid arguments and
// once the pool has been destroyed.
934 it('Verify that pool execute() arguments are checked', async () => {
935 const pool = new FixedClusterPool(
936 numberOfWorkers,
937 './tests/worker-files/cluster/testWorker.js'
938 )
939 await expect(pool.execute(undefined, 0)).rejects.toThrowError(
940 new TypeError('name argument must be a string')
941 )
942 await expect(pool.execute(undefined, '')).rejects.toThrowError(
943 new TypeError('name argument must not be an empty string')
944 )
945 await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
946 new TypeError('transferList argument must be an array')
947 )
// Note: for an unknown task function name the rejection value is asserted to
// be a plain string, not an Error instance.
948 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
949 "Task function 'unknown' not found"
950 )
951 await pool.destroy()
// After destroy() the pool must refuse new tasks.
952 await expect(pool.execute()).rejects.toThrowError(
953 new Error('Cannot execute a task on not started pool')
954 )
955 })
956
// Submits numberOfWorkers * maxMultiplier tasks at once and checks each worker
// node's usage counters before and after the tasks have completed.
957 it('Verify that pool worker tasks usage are computed', async () => {
958 const pool = new FixedClusterPool(
959 numberOfWorkers,
960 './tests/worker-files/cluster/testWorker.js'
961 )
962 const promises = new Set()
963 const maxMultiplier = 2
964 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
965 promises.add(pool.execute())
966 }
// Tasks are still in flight: each worker node is expected to be executing
// maxMultiplier tasks and to have executed none yet.
967 for (const workerNode of pool.workerNodes) {
968 expect(workerNode.usage).toStrictEqual({
969 tasks: {
970 executed: 0,
971 executing: maxMultiplier,
972 queued: 0,
973 maxQueued: 0,
974 stolen: 0,
975 failed: 0
976 },
977 runTime: {
978 history: expect.any(CircularArray)
979 },
980 waitTime: {
981 history: expect.any(CircularArray)
982 },
983 elu: {
984 idle: {
985 history: expect.any(CircularArray)
986 },
987 active: {
988 history: expect.any(CircularArray)
989 }
990 }
991 })
992 }
993 await Promise.all(promises)
// All tasks done: the executing counter moved to the executed counter.
994 for (const workerNode of pool.workerNodes) {
995 expect(workerNode.usage).toStrictEqual({
996 tasks: {
997 executed: maxMultiplier,
998 executing: 0,
999 queued: 0,
1000 maxQueued: 0,
1001 stolen: 0,
1002 failed: 0
1003 },
1004 runTime: {
1005 history: expect.any(CircularArray)
1006 },
1007 waitTime: {
1008 history: expect.any(CircularArray)
1009 },
1010 elu: {
1011 idle: {
1012 history: expect.any(CircularArray)
1013 },
1014 active: {
1015 history: expect.any(CircularArray)
1016 }
1017 }
1018 })
1019 }
1020 await pool.destroy()
1021 })
1022
// Runs tasks on a dynamic thread pool, then changes the worker choice strategy
// and checks that each worker node's task counters are back to zero.
1023 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1024 const pool = new DynamicThreadPool(
1025 Math.floor(numberOfWorkers / 2),
1026 numberOfWorkers,
1027 './tests/worker-files/thread/testWorker.mjs'
1028 )
1029 const promises = new Set()
1030 const maxMultiplier = 2
1031 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1032 promises.add(pool.execute())
1033 }
1034 await Promise.all(promises)
// After the tasks completed: every worker node executed at least one task and
// at most all of them; the histories are expected to be empty.
1035 for (const workerNode of pool.workerNodes) {
1036 expect(workerNode.usage).toStrictEqual({
1037 tasks: {
1038 executed: expect.any(Number),
1039 executing: 0,
1040 queued: 0,
1041 maxQueued: 0,
1042 stolen: 0,
1043 failed: 0
1044 },
1045 runTime: {
1046 history: expect.any(CircularArray)
1047 },
1048 waitTime: {
1049 history: expect.any(CircularArray)
1050 },
1051 elu: {
1052 idle: {
1053 history: expect.any(CircularArray)
1054 },
1055 active: {
1056 history: expect.any(CircularArray)
1057 }
1058 }
1059 })
1060 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1061 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1062 numberOfWorkers * maxMultiplier
1063 )
1064 expect(workerNode.usage.runTime.history.length).toBe(0)
1065 expect(workerNode.usage.waitTime.history.length).toBe(0)
1066 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1067 expect(workerNode.usage.elu.active.history.length).toBe(0)
1068 }
// Changing the strategy must reset the executed counter of every worker node.
1069 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1070 for (const workerNode of pool.workerNodes) {
1071 expect(workerNode.usage).toStrictEqual({
1072 tasks: {
1073 executed: 0,
1074 executing: 0,
1075 queued: 0,
1076 maxQueued: 0,
1077 stolen: 0,
1078 failed: 0
1079 },
1080 runTime: {
1081 history: expect.any(CircularArray)
1082 },
1083 waitTime: {
1084 history: expect.any(CircularArray)
1085 },
1086 elu: {
1087 idle: {
1088 history: expect.any(CircularArray)
1089 },
1090 active: {
1091 history: expect.any(CircularArray)
1092 }
1093 }
1094 })
1095 expect(workerNode.usage.runTime.history.length).toBe(0)
1096 expect(workerNode.usage.waitTime.history.length).toBe(0)
1097 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1098 expect(workerNode.usage.elu.active.history.length).toBe(0)
1099 }
1100 await pool.destroy()
1101 })
1102
// Registers a listener for the pool 'ready' event on a dynamic cluster pool
// and checks it is called once with the pool information.
1103 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1104 const pool = new DynamicClusterPool(
1105 Math.floor(numberOfWorkers / 2),
1106 numberOfWorkers,
1107 './tests/worker-files/cluster/testWorker.js'
1108 )
// No listener is registered on the emitter before the test adds one.
1109 expect(pool.emitter.eventNames()).toStrictEqual([])
1110 let poolInfo
1111 let poolReady = 0
1112 pool.emitter.on(PoolEvents.ready, info => {
1113 ++poolReady
1114 poolInfo = info
1115 })
// Wait for one 'ready' event using the waitPoolEvents test helper.
1116 await waitPoolEvents(pool, PoolEvents.ready, 1)
1117 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1118 expect(poolReady).toBe(1)
1119 expect(poolInfo).toStrictEqual({
1120 version,
1121 type: PoolTypes.dynamic,
1122 worker: WorkerTypes.cluster,
1123 started: true,
1124 ready: true,
1125 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1126 minSize: expect.any(Number),
1127 maxSize: expect.any(Number),
1128 workerNodes: expect.any(Number),
1129 idleWorkerNodes: expect.any(Number),
1130 busyWorkerNodes: expect.any(Number),
1131 executedTasks: expect.any(Number),
1132 executingTasks: expect.any(Number),
1133 failedTasks: expect.any(Number)
1134 })
1135 await pool.destroy()
1136 })
1137
// Registers a listener for the pool 'busy' event on a fixed thread pool,
// submits numberOfWorkers * 2 tasks and checks how many times the listener was
// called and the pool information it last received.
1138 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1139 const pool = new FixedThreadPool(
1140 numberOfWorkers,
1141 './tests/worker-files/thread/testWorker.mjs'
1142 )
1143 expect(pool.emitter.eventNames()).toStrictEqual([])
1144 const promises = new Set()
1145 let poolBusy = 0
1146 let poolInfo
1147 pool.emitter.on(PoolEvents.busy, info => {
1148 ++poolBusy
1149 poolInfo = info
1150 })
1151 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1152 for (let i = 0; i < numberOfWorkers * 2; i++) {
1153 promises.add(pool.execute())
1154 }
1155 await Promise.all(promises)
1156 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1157 // So it fires numberOfWorkers + 1 times in total for a loop submitting numberOfWorkers * 2 tasks to the fixed pool.
1158 expect(poolBusy).toBe(numberOfWorkers + 1)
1159 expect(poolInfo).toStrictEqual({
1160 version,
1161 type: PoolTypes.fixed,
1162 worker: WorkerTypes.thread,
1163 started: true,
1164 ready: true,
1165 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1166 minSize: expect.any(Number),
1167 maxSize: expect.any(Number),
1168 workerNodes: expect.any(Number),
1169 idleWorkerNodes: expect.any(Number),
1170 busyWorkerNodes: expect.any(Number),
1171 executedTasks: expect.any(Number),
1172 executingTasks: expect.any(Number),
1173 failedTasks: expect.any(Number)
1174 })
1175 await pool.destroy()
1176 })
1177
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let fullCount = 0
  let lastInfo
  const onFull = info => {
    fullCount += 1
    lastInfo = info
  }
  pool.emitter.on(PoolEvents.full, onFull)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  // Submit every task synchronously, then wait for all of them to settle.
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  expect(fullCount).toBe(1)
  // Numeric fields are only type-checked; the remaining fields are pinned exactly.
  const anyNumber = expect.any(Number)
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1216
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Force hasBackPressure() to answer true; the stub is restored by the suite's afterEach hook.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let backPressureCount = 0
  let lastInfo
  const onBackPressure = info => {
    backPressureCount += 1
    lastInfo = info
  }
  pool.emitter.on(PoolEvents.backPressure, onBackPressure)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  // Submit one more task than there are workers, all synchronously.
  const submittedTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  expect(backPressureCount).toBe(1)
  // Numeric fields are only type-checked; the remaining fields are pinned exactly.
  const anyNumber = expect.any(Number)
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  expect(pool.hasBackPressure.called).toBe(true)
  await pool.destroy()
})
1263
it('Verify that hasTaskFunction() is working', async () => {
  // Expected hasTaskFunction() answer for each task function name, checked in this order.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  // Waits for the pool to be ready, runs every expectation against it, then destroys it.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [taskFunctionName, expected] of expectations) {
      expect(pool.hasTaskFunction(taskFunctionName)).toBe(expected)
    }
    await pool.destroy()
  }
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  // The cluster pool is only created once the thread pool has been destroyed.
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1293
it('Verify that addTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Invalid [name, fn] pairs, each with the TypeError message the rejection must carry.
  const invalidCalls = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidCalls) {
    await expect(pool.addTaskFunction(name, fn)).rejects.toThrowError(
      new TypeError(message)
    )
  }
  // None of the rejected calls may have altered the list of task function names.
  const namesBeforeAdd = [DEFAULT_TASK_NAME, 'test']
  expect(pool.listTaskFunctionNames()).toStrictEqual(namesBeforeAdd)
  const echoTaskFunction = data => {
    return data
  }
  await expect(
    pool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    ...namesBeforeAdd,
    'echo'
  ])
  const payload = { test: 'test' }
  const echoed = await pool.execute(payload, 'echo')
  expect(echoed).toStrictEqual(payload)
  // Builds a fresh measurement statistics entry holding an empty history.
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const workerNode of pool.workerNodes) {
    const echoUsage = workerNode.getTaskFunctionWorkerUsage('echo')
    expect(echoUsage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
  }
  await pool.destroy()
})
1364
it('Verify that removeTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Names listed by the pool before any task function is added from the pool side.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  // 'test' is listed but was not added through the pool, so removing it must be rejected.
  await expect(pool.removeTaskFunction('test')).rejects.toThrowError(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => {
    return data
  }
  await pool.addTaskFunction('echo', echoTaskFunction)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    ...initialNames,
    'echo'
  ])
  await expect(pool.removeTaskFunction('echo')).resolves.toBe(true)
  // After removal the pool must be back to its initial state.
  expect(pool.taskFunctions.size).toBe(0)
  expect(pool.taskFunctions.get('echo')).toBeUndefined()
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await pool.destroy()
})
1405
it('Verify that listTaskFunctionNames() is working', async () => {
  // Names both pools are expected to list, in this exact order.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Waits for the pool to be ready, checks the listed names, then destroys the pool.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual(expectedNames)
    await pool.destroy()
  }
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  // The cluster pool is only created once the thread pool has been destroyed.
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1433
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // The rejection messages embed the id of the worker that reported the failure.
  // Read it from the pool instead of hard-coding it: the id depends on how many
  // workers were created earlier in the same test run.
  // NOTE(review): assumes the first worker node is the one reporting the failure,
  // which should hold while no task has been submitted to the dynamic pool - confirm.
  const workerId = dynamicThreadPool.workerNodes[0].info.id
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(0)
  ).rejects.toThrowError(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrowError(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrowError(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
    )
  )
  // The failed operations must leave the task function names order untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  // The new default task function is listed right after the default task name.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  // Destroy the pool like every sibling test does, so its workers do not outlive the test.
  await dynamicThreadPool.destroy()
})
1487
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const input = { n: 10 }
  // Run the default task function first, then each named task function, one after the other.
  const defaultResult = await pool.execute(input)
  expect(defaultResult).toStrictEqual({ ok: 1 })
  const serializationResult = await pool.execute(
    input,
    'jsonIntegerSerialization'
  )
  expect(serializationResult).toStrictEqual({ ok: 1 })
  const factorialResult = await pool.execute(input, 'factorial')
  expect(factorialResult).toBe(3628800)
  const fibonacciResult = await pool.execute(input, 'fibonacci')
  expect(fibonacciResult).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Builds a matcher accepting any CircularArray as a measurement history.
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual(expectedNames)
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      const usage = workerNode.getTaskFunctionWorkerUsage(taskFunctionName)
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          stolen: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
      expect(
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name must report the same usage as the second listed task function.
    const defaultUsage = workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    const secondListedUsage = workerNode.getTaskFunctionWorkerUsage(
      workerNode.info.taskFunctionNames[1]
    )
    expect(defaultUsage).toStrictEqual(secondListedUsage)
  }
  await pool.destroy()
})
1551
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Send the kill message to the first worker node; the promise must resolve with no value.
  const killRequest = pool.sendKillMessageToWorker(0)
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
1564
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const targetWorkerNodeKey = 0
  // 'add' operation registering a no-op task function named 'empty', passed as source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(targetWorkerNodeKey, addOperation)
  ).resolves.toBe(true)
  // The targeted worker node must now list the added task function last.
  const { taskFunctionNames } = pool.workerNodes[targetWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1584
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // 'add' operation registering a no-op task function named 'empty', passed as source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addOperation)
  ).resolves.toBe(true)
  // Every worker node must now list the added task function last.
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const { info } of pool.workerNodes) {
    expect(info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
1607 })