Merge dependabot/npm_and_yarn/examples/typescript/http-client-pool/types/node-20...
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { expect } from 'expect'
6 import { restore, stub } from 'sinon'
7 import {
8 DynamicClusterPool,
9 DynamicThreadPool,
10 FixedClusterPool,
11 FixedThreadPool,
12 PoolEvents,
13 PoolTypes,
14 WorkerChoiceStrategies,
15 WorkerTypes
16 } from '../../lib/index.js'
17 import { CircularArray } from '../../lib/circular-array.js'
18 import { Deque } from '../../lib/deque.js'
19 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20 import { waitPoolEvents } from '../test-utils.js'
21 import { WorkerNode } from '../../lib/pools/worker-node.js'
22
23 describe('Abstract pool test suite', () => {
// Version declared in the package.json located two directories above this test file
const { version } = JSON.parse(
  readFileSync(
    join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
    'utf8'
  )
)
// Number of workers used to size the pools in the tests below
const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`.
 * Used below to check that a pool refuses to be created when it does not run
 * from the main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
36
// Undo every sinon stub once each test has run
afterEach(() => {
  restore()
})
40
it('Verify that pool can be created and destroyed', async () => {
  // Build a fixed thread pool, check its type, then tear it down
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const threadPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(threadPool).toBeInstanceOf(FixedThreadPool)
  await threadPool.destroy()
})
49
it('Verify that pool cannot be created from a non main thread/process', () => {
  // StubPoolWithIsMain reports isMain() === false: its constructor is expected to throw
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPoolFromWorker).toThrow(expectedError)
})
66
it('Verify that pool statuses properties are set', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const threadPool = new FixedThreadPool(numberOfWorkers, workerFile)
  // Right after construction the pool must be started and no longer starting
  expect(threadPool.starting).toBe(false)
  expect(threadPool.started).toBe(true)
  await threadPool.destroy()
})
76
it('Verify that filePath is checked', () => {
  // Both a missing and an unresolvable worker file must make the constructor throw
  const createWithoutFile = () => new FixedThreadPool(numberOfWorkers)
  const createWithUnknownFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createWithoutFile).toThrow(
    new Error("Cannot find the worker file 'undefined'")
  )
  expect(createWithUnknownFile).toThrow(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
85
it('Verify that numberOfWorkers is checked', () => {
  // An undefined number of workers must make the constructor throw
  const createWithoutSize = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createWithoutSize).toThrow(expectedError)
})
99
it('Verify that a negative number of workers is checked', () => {
  // A negative pool size must make the constructor throw
  const createWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createWithNegativeSize).toThrow(expectedError)
})
110
it('Verify that a non integer number of workers is checked', () => {
  // A fractional pool size must make the constructor throw
  const createWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createWithFractionalSize).toThrow(expectedError)
})
121
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each case: pool class, minimum size, maximum size, worker file, expected constructor error
  const invalidSizings = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [Pool, min, max, workerFile, expectedError] of invalidSizings) {
    expect(() => new Pool(min, max, workerFile)).toThrow(expectedError)
  }
})
196
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'

  // First pass: no options given, default values are expected everywhere
  const defaultStrategyOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    workerChoiceStrategyOptions: defaultStrategyOptions
  })
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
    defaultStrategyOptions
  )
  for (const [, strategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual(defaultStrategyOptions)
  }
  await pool.destroy()

  // Second pass: explicit options, expected to be kept and completed with defaults
  const testHandler = () => console.info('test handler executed')
  const customStrategyOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  pool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: customStrategyOptions,
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
    customStrategyOptions
  )
  for (const [, strategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual(customStrategyOptions)
  }
  await pool.destroy()
})
296
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each case pairs invalid pool options with the error the constructor is expected to throw
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, invalidOptions)
    ).toThrow(expectedError)
  }
})
468
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Options expected when runTime and elu share the given median flag
  const strategyOptionsWith = median => ({
    retries: 6,
    runTime: { median },
    waitTime: { median: false },
    elu: { median }
  })
  // Task statistics requirements expected for that same median flag
  const requirementsWith = median => ({
    runTime: { aggregate: true, average: !median, median },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: true, average: !median, median }
  })
  // Checks pool options, strategy context options, each strategy options, then the requirements
  const expectPoolState = median => {
    const expectedOptions = strategyOptionsWith(median)
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(expectedOptions)
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(requirementsWith(median))
  }

  expectPoolState(false)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectPoolState(true)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectPoolState(false)

  // Invalid options paired with the error the setter is expected to throw
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { retries: 'invalidChoiceRetries' },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { retries: -1 },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setWorkerChoiceStrategyOptions(invalidOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
638
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Tasks queue options expected once the queue is enabled with the given concurrency
  const enabledQueueOptions = concurrency => ({
    concurrency,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  // Disabled state: flag off and no tasks queue options
  const expectQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }
  // Enabled state: flag on and fully populated tasks queue options
  const expectQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual(
      enabledQueueOptions(concurrency)
    )
  }

  expectQueueDisabled()
  pool.enableTasksQueue(true)
  expectQueueEnabled(1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectQueueEnabled(2)
  pool.enableTasksQueue(false)
  expectQueueDisabled()
  await pool.destroy()
})
667
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Checks the pool tasks queue options and that each worker node back pressure size follows the queue size
  const expectQueueOptions = expected => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expected)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }
  const defaultQueueOptions = {
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  const customQueueOptions = {
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  }

  expectQueueOptions(defaultQueueOptions)
  // A copy is handed to the pool so that the expected object stays a distinct reference
  pool.setTasksQueueOptions({ ...customQueueOptions })
  expectQueueOptions(customQueueOptions)
  // No size given: the expected size is back to the default one
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectQueueOptions(defaultQueueOptions)

  // Invalid options paired with the error the setter is expected to throw
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(invalidOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
749
it('Verify that pool info is set', async () => {
  // Fields expected to be the same for both freshly created pools
  const commonPoolInfo = {
    version,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    maxSize: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  // Fixed thread pool: every size equals numberOfWorkers
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.info).toStrictEqual({
    ...commonPoolInfo,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers
  })
  await pool.destroy()

  // Dynamic cluster pool: only the minimum number of worker nodes is expected at creation
  const minimumSize = Math.floor(numberOfWorkers / 2)
  pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    ...commonPoolInfo,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: minimumSize,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize
  })
  await pool.destroy()
})
795
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Measurement statistics expected on a fresh worker node: a new CircularArray as history
  const emptyStatistics = () => ({ history: new CircularArray() })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.usage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyStatistics(),
      waitTime: emptyStatistics(),
      elu: {
        idle: emptyStatistics(),
        active: emptyStatistics()
      }
    })
  }
  await pool.destroy()
})
830
it('Verify that pool worker tasks queue are initialized', async () => {
  // Every worker node must own an empty Deque that never held any task
  const expectEmptyTasksQueues = workerNodes => {
    for (const workerNode of workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
  }
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(pool.workerNodes)
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(pool.workerNodes)
  await pool.destroy()
})
856
it('Verify that pool worker info are initialized', async () => {
  // Every worker node must expose a numeric id, the given worker type, and be non dynamic and ready
  const expectWorkerNodesInfo = (workerNodes, workerType) => {
    for (const workerNode of workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerNodesInfo(pool.workerNodes, WorkerTypes.cluster)
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkerNodesInfo(pool.workerNodes, WorkerTypes.thread)
  await pool.destroy()
})
888
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )
  // Checks the started and ready flags reported by the pool info
  const expectStartedAndReady = flag => {
    expect(pool.info.started).toBe(flag)
    expect(pool.info.ready).toBe(flag)
  }

  // Created with startWorkers: false: no worker node yet and task execution is rejected
  expectStartedAndReady(false)
  expect(pool.workerNodes).toStrictEqual([])
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )

  // After an explicit start, the worker nodes must be there
  pool.start()
  expectStartedAndReady(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
912
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Invalid execute() argument lists paired with the error the returned promise must reject with
  const invalidArgumentsCases = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  // Cases are awaited one after the other, each call being issued once the previous one settled
  for (const [invalidArguments, expectedError] of invalidArgumentsCases) {
    await expect(pool.execute(...invalidArguments)).rejects.toThrow(
      expectedError
    )
  }
  // An unknown task function name rejects with a plain string
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  // Once destroyed, the pool must refuse to execute tasks
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
935
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Worker node usage expected for the given executed and executing tasks counters
  const usageWith = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  const maxMultiplier = 2
  const pendingTasks = new Set()
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    pendingTasks.add(pool.execute())
  }
  // Before the tasks settle: maxMultiplier tasks are expected to be executing on each worker node
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(usageWith(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  // Once the tasks settled: they are all expected to be accounted as executed
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(usageWith(maxMultiplier, 0))
  }
  await pool.destroy()
})
1001
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Worker node usage expected for the given executed tasks counter (a value or a matcher)
  const usageWithExecuted = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Every measurement history of the given usage must be empty
  const expectEmptyHistories = usage => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }
  const maxMultiplier = 2
  const pendingTasks = new Set()
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    pendingTasks.add(pool.execute())
  }
  await Promise.all(pendingTasks)
  // Before the strategy change: each worker node executed at least one of the submitted tasks
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(
      usageWithExecuted(expect.any(Number))
    )
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
    expectEmptyHistories(workerNode.usage)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // After the strategy change: the executed tasks counter must be back to zero
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(usageWithExecuted(0))
    expectEmptyHistories(workerNode.usage)
  }
  await pool.destroy()
})
1081
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // No listener is expected on the emitter before the test registers one
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let readyEventCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.ready, info => {
    ++readyEventCount
    lastPoolInfo = info
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyEventCount).toBe(1)
  // Sizes and counters carried by the event payload are only checked for their type
  const anyNumber = expect.any(Number)
  expect(lastPoolInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1116
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // No listener is expected on the emitter before the test registers one
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyEventCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.busy, info => {
    ++busyEventCount
    lastPoolInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  const pendingTasks = new Set()
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    pendingTasks.add(pool.execute())
  }
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // Hence numberOfWorkers + 1 emissions for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
  // Sizes and counters carried by the event payload are only checked for their type
  const anyNumber = expect.any(Number)
  expect(lastPoolInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1156
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // No listener is expected on the emitter before the test registers one
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let fullEventCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.full, info => {
    ++fullEventCount
    lastPoolInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  const pendingTasks = new Set()
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    pendingTasks.add(pool.execute())
  }
  await Promise.all(pendingTasks)
  // A single `full` event is expected for the whole batch of submitted tasks
  expect(fullEventCount).toBe(1)
  // Sizes and counters carried by the event payload are only checked for their type
  const anyNumber = expect.any(Number)
  expect(lastPoolInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1195
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Force the pool to report back pressure regardless of its real queue state.
  stub(pool, 'hasBackPressure').returns(true)
  // Shape the `backPressure` event payload is expected to have.
  const expectedPoolInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Count the emissions and keep the payload of the most recent one.
  let backPressureEventCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureEventCount += 1
    lastPoolInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  // Submit one more task than there are workers, all at once.
  const pendingTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(backPressureEventCount).toBe(1)
  expect(lastPoolInfo).toStrictEqual(expectedPoolInfo)
  // The stubbed predicate must have been consulted by the pool.
  expect(pool.hasBackPressure.called).toBe(true)
  await pool.destroy()
})
1242
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names the test expects each pool to report as present.
  const knownTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Waits for the pool to be ready, runs the shared assertions, then destroys it.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const taskFunctionName of knownTaskFunctionNames) {
      expect(pool.hasTaskFunction(taskFunctionName)).toBe(true)
    }
    expect(pool.hasTaskFunction('unknown')).toBe(false)
    await pool.destroy()
  }
  // The pools are created and checked one after the other, never concurrently.
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1272
it('Verify that addTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Invalid (name, fn) argument pairs and the TypeError message each must produce.
  const invalidCalls = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidCalls) {
    await expect(pool.addTaskFunction(name, fn)).rejects.toThrow(
      new TypeError(message)
    )
  }
  // The rejected calls must leave the list of task function names unchanged.
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echo = data => data
  await expect(pool.addTaskFunction('echo', echo)).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echo)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  // The newly added task function must be executable and return its input.
  const payload = { test: 'test' }
  const echoed = await pool.execute(payload, 'echo')
  expect(echoed).toStrictEqual(payload)
  // Builds a usage measurement whose history is expected to be empty.
  const emptyMeasurement = () => ({ history: new CircularArray() })
  for (const workerNode of pool.workerNodes) {
    const echoUsage = workerNode.getTaskFunctionWorkerUsage('echo')
    expect(echoUsage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyMeasurement(),
      waitTime: emptyMeasurement(),
      elu: {
        idle: emptyMeasurement(),
        active: emptyMeasurement()
      }
    })
  }
  await pool.destroy()
})
1343
it('Verify that removeTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Names expected to be listed before adding and after removing 'echo'.
  const initialTaskFunctionNames = [DEFAULT_TASK_NAME, 'test']
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialTaskFunctionNames)
  // 'test' is listed but was not added through the pool, so removal must be rejected.
  const removeUnmanaged = pool.removeTaskFunction('test')
  await expect(removeUnmanaged).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echo = data => data
  await pool.addTaskFunction('echo', echo)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echo)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    ...initialTaskFunctionNames,
    'echo'
  ])
  // Removing the function added above must succeed and undo the registration.
  const removeEcho = pool.removeTaskFunction('echo')
  await expect(removeEcho).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(0)
  expect(pool.taskFunctions.get('echo')).toBeUndefined()
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialTaskFunctionNames)
  await pool.destroy()
})
1382
it('Verify that listTaskFunctionNames() is working', async () => {
  // Names, in order, that each pool is expected to list.
  const expectedTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Waits for the pool to be ready, checks the listed names, then destroys it.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual(
      expectedTaskFunctionNames
    )
    await pool.destroy()
  }
  // The pools are created and checked one after the other, never concurrently.
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1410
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Read the worker id from the pool instead of hard-coding it: the id that
  // ends up in the error messages depends on how many workers were spawned
  // earlier in the same process, so a literal value makes the test
  // order-dependent.
  // NOTE(review): assumes the failing operation is reported by the first
  // worker node of the pool — confirm against the pool implementation.
  const workerId = dynamicThreadPool.workerNodes[0].info.id
  await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrow(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrow(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
    )
  )
  // The failed attempts must leave the order of the task function names untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // A successful call moves the chosen name right after the default task name.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  await dynamicThreadPool.destroy()
})
1463
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const input = { n: 10 }
  // Run the default task function, then each named one, sequentially.
  const defaultResult = await pool.execute(input)
  expect(defaultResult).toStrictEqual({ ok: 1 })
  const serializationResult = await pool.execute(
    input,
    'jsonIntegerSerialization'
  )
  expect(serializationResult).toStrictEqual({ ok: 1 })
  const factorialResult = await pool.execute(input, 'factorial')
  expect(factorialResult).toBe(3628800)
  const fibonacciResult = await pool.execute(input, 'fibonacci')
  expect(fibonacciResult).toBe(55)
  const { executingTasks, executedTasks } = pool.info
  expect(executingTasks).toBe(0)
  expect(executedTasks).toBe(4)
  // Matcher for a usage measurement: only the type of the history is checked.
  const anyMeasurement = { history: expect.any(CircularArray) }
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      stolen: 0
    },
    runTime: anyMeasurement,
    waitTime: anyMeasurement,
    elu: {
      idle: anyMeasurement,
      active: anyMeasurement
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      expect(
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName)
      ).toStrictEqual(expectedUsage)
      expect(
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // Usage under the default task name must equal the usage of the name
    // listed right after it.
    const defaultUsage = workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    const firstNamedUsage = workerNode.getTaskFunctionWorkerUsage(
      workerNode.info.taskFunctionNames[1]
    )
    expect(defaultUsage).toStrictEqual(firstNamedUsage)
  }
  await pool.destroy()
})
1527
it('Verify sendKillMessageToWorker()', async () => {
  const minWorkers = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minWorkers,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node; the returned promise must resolve with no value.
  const firstWorkerNodeKey = 0
  const killResult = pool.sendKillMessageToWorker(firstWorkerNodeKey)
  await expect(killResult).resolves.toBeUndefined()
  await pool.destroy()
})
1540
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const minWorkers = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minWorkers,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const firstWorkerNodeKey = 0
  // Ask the first worker node to add a no-op task function, sent as source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  const operationResult = pool.sendTaskFunctionOperationToWorker(
    firstWorkerNodeKey,
    addOperation
  )
  await expect(operationResult).resolves.toBe(true)
  // The new name must be appended to the names recorded on that worker node.
  const { taskFunctionNames } = pool.workerNodes[firstWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1560
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const minWorkers = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minWorkers,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Ask every worker node to add a no-op task function, sent as source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  const operationResult = pool.sendTaskFunctionOperationToWorkers(addOperation)
  await expect(operationResult).resolves.toBe(true)
  // The new name must be appended to the names recorded on each worker node.
  const expectedTaskFunctionNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const { info } of pool.workerNodes) {
    expect(info.taskFunctionNames).toStrictEqual(expectedTaskFunctionNames)
  }
  await pool.destroy()
})
1583 })