85d48d6a4e4eecaab910c20814a8abc471cc6fe3
[poolifier.git] / tests / pools / abstract-pool.test.js
1 const { EventEmitter } = require('node:events')
2 const { expect } = require('expect')
3 const sinon = require('sinon')
4 const {
5 DynamicClusterPool,
6 DynamicThreadPool,
7 FixedClusterPool,
8 FixedThreadPool,
9 PoolEvents,
10 PoolTypes,
11 WorkerChoiceStrategies,
12 WorkerTypes
13 } = require('../../lib')
14 const { CircularArray } = require('../../lib/circular-array')
15 const { Deque } = require('../../lib/deque')
16 const { DEFAULT_TASK_NAME } = require('../../lib/utils')
17 const { version } = require('../../package.json')
18 const { waitPoolEvents } = require('../test-utils')
19 const { WorkerNode } = require('../../lib/pools/worker-node')
20
21 describe('Abstract pool test suite', () => {
22 const numberOfWorkers = 2
23 class StubPoolWithIsMain extends FixedThreadPool {
24 isMain () {
25 return false
26 }
27 }
28
29 afterEach(() => {
30 sinon.restore()
31 })
32
33 it('Simulate pool creation from a non main thread/process', () => {
34 expect(
35 () =>
36 new StubPoolWithIsMain(
37 numberOfWorkers,
38 './tests/worker-files/thread/testWorker.js',
39 {
40 errorHandler: e => console.error(e)
41 }
42 )
43 ).toThrowError(
44 new Error(
45 'Cannot start a pool from a worker with the same type as the pool'
46 )
47 )
48 })
49
50 it('Verify that pool statuses properties are set', async () => {
51 const pool = new FixedThreadPool(
52 numberOfWorkers,
53 './tests/worker-files/thread/testWorker.js'
54 )
55 expect(pool.starting).toBe(false)
56 expect(pool.started).toBe(true)
57 await pool.destroy()
58 })
59
60 it('Verify that filePath is checked', () => {
61 const expectedError = new Error(
62 'Please specify a file with a worker implementation'
63 )
64 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
65 expectedError
66 )
67 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
68 expectedError
69 )
70 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError(
71 expectedError
72 )
73 expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError(
74 expectedError
75 )
76 expect(
77 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
78 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
79 })
80
81 it('Verify that numberOfWorkers is checked', () => {
82 expect(
83 () =>
84 new FixedThreadPool(
85 undefined,
86 './tests/worker-files/thread/testWorker.js'
87 )
88 ).toThrowError(
89 new Error(
90 'Cannot instantiate a pool without specifying the number of workers'
91 )
92 )
93 })
94
95 it('Verify that a negative number of workers is checked', () => {
96 expect(
97 () =>
98 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
99 ).toThrowError(
100 new RangeError(
101 'Cannot instantiate a pool with a negative number of workers'
102 )
103 )
104 })
105
106 it('Verify that a non integer number of workers is checked', () => {
107 expect(
108 () =>
109 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
110 ).toThrowError(
111 new TypeError(
112 'Cannot instantiate a pool with a non safe integer number of workers'
113 )
114 )
115 })
116
117 it('Verify that dynamic pool sizing is checked', () => {
118 expect(
119 () =>
120 new DynamicClusterPool(
121 1,
122 undefined,
123 './tests/worker-files/cluster/testWorker.js'
124 )
125 ).toThrowError(
126 new TypeError(
127 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
128 )
129 )
130 expect(
131 () =>
132 new DynamicThreadPool(
133 0.5,
134 1,
135 './tests/worker-files/thread/testWorker.js'
136 )
137 ).toThrowError(
138 new TypeError(
139 'Cannot instantiate a pool with a non safe integer number of workers'
140 )
141 )
142 expect(
143 () =>
144 new DynamicClusterPool(
145 0,
146 0.5,
147 './tests/worker-files/cluster/testWorker.js'
148 )
149 ).toThrowError(
150 new TypeError(
151 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
152 )
153 )
154 expect(
155 () =>
156 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
157 ).toThrowError(
158 new RangeError(
159 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
160 )
161 )
162 expect(
163 () =>
164 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
165 ).toThrowError(
166 new RangeError(
167 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
168 )
169 )
170 expect(
171 () =>
172 new DynamicClusterPool(
173 1,
174 1,
175 './tests/worker-files/cluster/testWorker.js'
176 )
177 ).toThrowError(
178 new RangeError(
179 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
180 )
181 )
182 })
183
184 it('Verify that pool options are checked', async () => {
185 let pool = new FixedThreadPool(
186 numberOfWorkers,
187 './tests/worker-files/thread/testWorker.js'
188 )
189 expect(pool.emitter).toBeInstanceOf(EventEmitter)
190 expect(pool.opts).toStrictEqual({
191 startWorkers: true,
192 enableEvents: true,
193 restartWorkerOnError: true,
194 enableTasksQueue: false,
195 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
196 workerChoiceStrategyOptions: {
197 retries: 6,
198 runTime: { median: false },
199 waitTime: { median: false },
200 elu: { median: false }
201 }
202 })
203 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
204 retries: 6,
205 runTime: { median: false },
206 waitTime: { median: false },
207 elu: { median: false }
208 })
209 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
210 .workerChoiceStrategies) {
211 expect(workerChoiceStrategy.opts).toStrictEqual({
212 retries: 6,
213 runTime: { median: false },
214 waitTime: { median: false },
215 elu: { median: false }
216 })
217 }
218 await pool.destroy()
219 const testHandler = () => console.info('test handler executed')
220 pool = new FixedThreadPool(
221 numberOfWorkers,
222 './tests/worker-files/thread/testWorker.js',
223 {
224 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
225 workerChoiceStrategyOptions: {
226 runTime: { median: true },
227 weights: { 0: 300, 1: 200 }
228 },
229 enableEvents: false,
230 restartWorkerOnError: false,
231 enableTasksQueue: true,
232 tasksQueueOptions: { concurrency: 2 },
233 messageHandler: testHandler,
234 errorHandler: testHandler,
235 onlineHandler: testHandler,
236 exitHandler: testHandler
237 }
238 )
239 expect(pool.emitter).toBeUndefined()
240 expect(pool.opts).toStrictEqual({
241 startWorkers: true,
242 enableEvents: false,
243 restartWorkerOnError: false,
244 enableTasksQueue: true,
245 tasksQueueOptions: {
246 concurrency: 2,
247 size: Math.pow(numberOfWorkers, 2),
248 taskStealing: true,
249 tasksStealingOnBackPressure: true
250 },
251 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
252 workerChoiceStrategyOptions: {
253 retries: 6,
254 runTime: { median: true },
255 waitTime: { median: false },
256 elu: { median: false },
257 weights: { 0: 300, 1: 200 }
258 },
259 onlineHandler: testHandler,
260 messageHandler: testHandler,
261 errorHandler: testHandler,
262 exitHandler: testHandler
263 })
264 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
265 retries: 6,
266 runTime: { median: true },
267 waitTime: { median: false },
268 elu: { median: false },
269 weights: { 0: 300, 1: 200 }
270 })
271 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
272 .workerChoiceStrategies) {
273 expect(workerChoiceStrategy.opts).toStrictEqual({
274 retries: 6,
275 runTime: { median: true },
276 waitTime: { median: false },
277 elu: { median: false },
278 weights: { 0: 300, 1: 200 }
279 })
280 }
281 await pool.destroy()
282 })
283
284 it('Verify that pool options are validated', async () => {
285 expect(
286 () =>
287 new FixedThreadPool(
288 numberOfWorkers,
289 './tests/worker-files/thread/testWorker.js',
290 {
291 workerChoiceStrategy: 'invalidStrategy'
292 }
293 )
294 ).toThrowError(
295 new Error("Invalid worker choice strategy 'invalidStrategy'")
296 )
297 expect(
298 () =>
299 new FixedThreadPool(
300 numberOfWorkers,
301 './tests/worker-files/thread/testWorker.js',
302 {
303 workerChoiceStrategyOptions: {
304 retries: 'invalidChoiceRetries'
305 }
306 }
307 )
308 ).toThrowError(
309 new TypeError(
310 'Invalid worker choice strategy options: retries must be an integer'
311 )
312 )
313 expect(
314 () =>
315 new FixedThreadPool(
316 numberOfWorkers,
317 './tests/worker-files/thread/testWorker.js',
318 {
319 workerChoiceStrategyOptions: {
320 retries: -1
321 }
322 }
323 )
324 ).toThrowError(
325 new RangeError(
326 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
327 )
328 )
329 expect(
330 () =>
331 new FixedThreadPool(
332 numberOfWorkers,
333 './tests/worker-files/thread/testWorker.js',
334 {
335 workerChoiceStrategyOptions: { weights: {} }
336 }
337 )
338 ).toThrowError(
339 new Error(
340 'Invalid worker choice strategy options: must have a weight for each worker node'
341 )
342 )
343 expect(
344 () =>
345 new FixedThreadPool(
346 numberOfWorkers,
347 './tests/worker-files/thread/testWorker.js',
348 {
349 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
350 }
351 )
352 ).toThrowError(
353 new Error(
354 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
355 )
356 )
357 expect(
358 () =>
359 new FixedThreadPool(
360 numberOfWorkers,
361 './tests/worker-files/thread/testWorker.js',
362 {
363 enableTasksQueue: true,
364 tasksQueueOptions: 'invalidTasksQueueOptions'
365 }
366 )
367 ).toThrowError(
368 new TypeError('Invalid tasks queue options: must be a plain object')
369 )
370 expect(
371 () =>
372 new FixedThreadPool(
373 numberOfWorkers,
374 './tests/worker-files/thread/testWorker.js',
375 {
376 enableTasksQueue: true,
377 tasksQueueOptions: { concurrency: 0 }
378 }
379 )
380 ).toThrowError(
381 new RangeError(
382 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
383 )
384 )
385 expect(
386 () =>
387 new FixedThreadPool(
388 numberOfWorkers,
389 './tests/worker-files/thread/testWorker.js',
390 {
391 enableTasksQueue: true,
392 tasksQueueOptions: { concurrency: -1 }
393 }
394 )
395 ).toThrowError(
396 new RangeError(
397 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
398 )
399 )
400 expect(
401 () =>
402 new FixedThreadPool(
403 numberOfWorkers,
404 './tests/worker-files/thread/testWorker.js',
405 {
406 enableTasksQueue: true,
407 tasksQueueOptions: { concurrency: 0.2 }
408 }
409 )
410 ).toThrowError(
411 new TypeError('Invalid worker node tasks concurrency: must be an integer')
412 )
413 expect(
414 () =>
415 new FixedThreadPool(
416 numberOfWorkers,
417 './tests/worker-files/thread/testWorker.js',
418 {
419 enableTasksQueue: true,
420 tasksQueueOptions: { size: 0 }
421 }
422 )
423 ).toThrowError(
424 new RangeError(
425 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
426 )
427 )
428 expect(
429 () =>
430 new FixedThreadPool(
431 numberOfWorkers,
432 './tests/worker-files/thread/testWorker.js',
433 {
434 enableTasksQueue: true,
435 tasksQueueOptions: { size: -1 }
436 }
437 )
438 ).toThrowError(
439 new RangeError(
440 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
441 )
442 )
443 expect(
444 () =>
445 new FixedThreadPool(
446 numberOfWorkers,
447 './tests/worker-files/thread/testWorker.js',
448 {
449 enableTasksQueue: true,
450 tasksQueueOptions: { size: 0.2 }
451 }
452 )
453 ).toThrowError(
454 new TypeError('Invalid worker node tasks queue size: must be an integer')
455 )
456 })
457
458 it('Verify that pool worker choice strategy options can be set', async () => {
459 const pool = new FixedThreadPool(
460 numberOfWorkers,
461 './tests/worker-files/thread/testWorker.js',
462 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
463 )
464 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
465 retries: 6,
466 runTime: { median: false },
467 waitTime: { median: false },
468 elu: { median: false }
469 })
470 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
471 retries: 6,
472 runTime: { median: false },
473 waitTime: { median: false },
474 elu: { median: false }
475 })
476 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
477 .workerChoiceStrategies) {
478 expect(workerChoiceStrategy.opts).toStrictEqual({
479 retries: 6,
480 runTime: { median: false },
481 waitTime: { median: false },
482 elu: { median: false }
483 })
484 }
485 expect(
486 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
487 ).toStrictEqual({
488 runTime: {
489 aggregate: true,
490 average: true,
491 median: false
492 },
493 waitTime: {
494 aggregate: false,
495 average: false,
496 median: false
497 },
498 elu: {
499 aggregate: true,
500 average: true,
501 median: false
502 }
503 })
504 pool.setWorkerChoiceStrategyOptions({
505 runTime: { median: true },
506 elu: { median: true }
507 })
508 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
509 retries: 6,
510 runTime: { median: true },
511 waitTime: { median: false },
512 elu: { median: true }
513 })
514 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
515 retries: 6,
516 runTime: { median: true },
517 waitTime: { median: false },
518 elu: { median: true }
519 })
520 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
521 .workerChoiceStrategies) {
522 expect(workerChoiceStrategy.opts).toStrictEqual({
523 retries: 6,
524 runTime: { median: true },
525 waitTime: { median: false },
526 elu: { median: true }
527 })
528 }
529 expect(
530 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
531 ).toStrictEqual({
532 runTime: {
533 aggregate: true,
534 average: false,
535 median: true
536 },
537 waitTime: {
538 aggregate: false,
539 average: false,
540 median: false
541 },
542 elu: {
543 aggregate: true,
544 average: false,
545 median: true
546 }
547 })
548 pool.setWorkerChoiceStrategyOptions({
549 runTime: { median: false },
550 elu: { median: false }
551 })
552 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
553 retries: 6,
554 runTime: { median: false },
555 waitTime: { median: false },
556 elu: { median: false }
557 })
558 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
559 retries: 6,
560 runTime: { median: false },
561 waitTime: { median: false },
562 elu: { median: false }
563 })
564 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
565 .workerChoiceStrategies) {
566 expect(workerChoiceStrategy.opts).toStrictEqual({
567 retries: 6,
568 runTime: { median: false },
569 waitTime: { median: false },
570 elu: { median: false }
571 })
572 }
573 expect(
574 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
575 ).toStrictEqual({
576 runTime: {
577 aggregate: true,
578 average: true,
579 median: false
580 },
581 waitTime: {
582 aggregate: false,
583 average: false,
584 median: false
585 },
586 elu: {
587 aggregate: true,
588 average: true,
589 median: false
590 }
591 })
592 expect(() =>
593 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
594 ).toThrowError(
595 new TypeError(
596 'Invalid worker choice strategy options: must be a plain object'
597 )
598 )
599 expect(() =>
600 pool.setWorkerChoiceStrategyOptions({
601 retries: 'invalidChoiceRetries'
602 })
603 ).toThrowError(
604 new TypeError(
605 'Invalid worker choice strategy options: retries must be an integer'
606 )
607 )
608 expect(() =>
609 pool.setWorkerChoiceStrategyOptions({ retries: -1 })
610 ).toThrowError(
611 new RangeError(
612 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
613 )
614 )
615 expect(() =>
616 pool.setWorkerChoiceStrategyOptions({ weights: {} })
617 ).toThrowError(
618 new Error(
619 'Invalid worker choice strategy options: must have a weight for each worker node'
620 )
621 )
622 expect(() =>
623 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
624 ).toThrowError(
625 new Error(
626 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
627 )
628 )
629 await pool.destroy()
630 })
631
632 it('Verify that pool tasks queue can be enabled/disabled', async () => {
633 const pool = new FixedThreadPool(
634 numberOfWorkers,
635 './tests/worker-files/thread/testWorker.js'
636 )
637 expect(pool.opts.enableTasksQueue).toBe(false)
638 expect(pool.opts.tasksQueueOptions).toBeUndefined()
639 for (const workerNode of pool.workerNodes) {
640 expect(workerNode.onEmptyQueue).toBeUndefined()
641 expect(workerNode.onBackPressure).toBeUndefined()
642 }
643 pool.enableTasksQueue(true)
644 expect(pool.opts.enableTasksQueue).toBe(true)
645 expect(pool.opts.tasksQueueOptions).toStrictEqual({
646 concurrency: 1,
647 size: Math.pow(numberOfWorkers, 2),
648 taskStealing: true,
649 tasksStealingOnBackPressure: true
650 })
651 for (const workerNode of pool.workerNodes) {
652 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
653 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
654 }
655 pool.enableTasksQueue(true, { concurrency: 2 })
656 expect(pool.opts.enableTasksQueue).toBe(true)
657 expect(pool.opts.tasksQueueOptions).toStrictEqual({
658 concurrency: 2,
659 size: Math.pow(numberOfWorkers, 2),
660 taskStealing: true,
661 tasksStealingOnBackPressure: true
662 })
663 for (const workerNode of pool.workerNodes) {
664 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
665 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
666 }
667 pool.enableTasksQueue(false)
668 expect(pool.opts.enableTasksQueue).toBe(false)
669 expect(pool.opts.tasksQueueOptions).toBeUndefined()
670 for (const workerNode of pool.workerNodes) {
671 expect(workerNode.onEmptyQueue).toBeUndefined()
672 expect(workerNode.onBackPressure).toBeUndefined()
673 }
674 await pool.destroy()
675 })
676
677 it('Verify that pool tasks queue options can be set', async () => {
678 const pool = new FixedThreadPool(
679 numberOfWorkers,
680 './tests/worker-files/thread/testWorker.js',
681 { enableTasksQueue: true }
682 )
683 expect(pool.opts.tasksQueueOptions).toStrictEqual({
684 concurrency: 1,
685 size: Math.pow(numberOfWorkers, 2),
686 taskStealing: true,
687 tasksStealingOnBackPressure: true
688 })
689 for (const workerNode of pool.workerNodes) {
690 expect(workerNode.tasksQueueBackPressureSize).toBe(
691 pool.opts.tasksQueueOptions.size
692 )
693 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
694 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
695 }
696 pool.setTasksQueueOptions({
697 concurrency: 2,
698 size: 2,
699 taskStealing: false,
700 tasksStealingOnBackPressure: false
701 })
702 expect(pool.opts.tasksQueueOptions).toStrictEqual({
703 concurrency: 2,
704 size: 2,
705 taskStealing: false,
706 tasksStealingOnBackPressure: false
707 })
708 for (const workerNode of pool.workerNodes) {
709 expect(workerNode.tasksQueueBackPressureSize).toBe(
710 pool.opts.tasksQueueOptions.size
711 )
712 expect(workerNode.onEmptyQueue).toBeUndefined()
713 expect(workerNode.onBackPressure).toBeUndefined()
714 }
715 pool.setTasksQueueOptions({
716 concurrency: 1,
717 taskStealing: true,
718 tasksStealingOnBackPressure: true
719 })
720 expect(pool.opts.tasksQueueOptions).toStrictEqual({
721 concurrency: 1,
722 size: Math.pow(numberOfWorkers, 2),
723 taskStealing: true,
724 tasksStealingOnBackPressure: true
725 })
726 for (const workerNode of pool.workerNodes) {
727 expect(workerNode.tasksQueueBackPressureSize).toBe(
728 pool.opts.tasksQueueOptions.size
729 )
730 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
731 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
732 }
733 expect(() =>
734 pool.setTasksQueueOptions('invalidTasksQueueOptions')
735 ).toThrowError(
736 new TypeError('Invalid tasks queue options: must be a plain object')
737 )
738 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
739 new RangeError(
740 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
741 )
742 )
743 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
744 new RangeError(
745 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
746 )
747 )
748 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
749 new TypeError('Invalid worker node tasks concurrency: must be an integer')
750 )
751 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
752 new RangeError(
753 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
754 )
755 )
756 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError(
757 new RangeError(
758 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
759 )
760 )
761 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError(
762 new TypeError('Invalid worker node tasks queue size: must be an integer')
763 )
764 await pool.destroy()
765 })
766
767 it('Verify that pool info is set', async () => {
768 let pool = new FixedThreadPool(
769 numberOfWorkers,
770 './tests/worker-files/thread/testWorker.js'
771 )
772 expect(pool.info).toStrictEqual({
773 version,
774 type: PoolTypes.fixed,
775 worker: WorkerTypes.thread,
776 started: true,
777 ready: true,
778 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
779 minSize: numberOfWorkers,
780 maxSize: numberOfWorkers,
781 workerNodes: numberOfWorkers,
782 idleWorkerNodes: numberOfWorkers,
783 busyWorkerNodes: 0,
784 executedTasks: 0,
785 executingTasks: 0,
786 failedTasks: 0
787 })
788 await pool.destroy()
789 pool = new DynamicClusterPool(
790 Math.floor(numberOfWorkers / 2),
791 numberOfWorkers,
792 './tests/worker-files/cluster/testWorker.js'
793 )
794 expect(pool.info).toStrictEqual({
795 version,
796 type: PoolTypes.dynamic,
797 worker: WorkerTypes.cluster,
798 started: true,
799 ready: true,
800 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
801 minSize: Math.floor(numberOfWorkers / 2),
802 maxSize: numberOfWorkers,
803 workerNodes: Math.floor(numberOfWorkers / 2),
804 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
805 busyWorkerNodes: 0,
806 executedTasks: 0,
807 executingTasks: 0,
808 failedTasks: 0
809 })
810 await pool.destroy()
811 })
812
813 it('Verify that pool worker tasks usage are initialized', async () => {
814 const pool = new FixedClusterPool(
815 numberOfWorkers,
816 './tests/worker-files/cluster/testWorker.js'
817 )
818 for (const workerNode of pool.workerNodes) {
819 expect(workerNode).toBeInstanceOf(WorkerNode)
820 expect(workerNode.usage).toStrictEqual({
821 tasks: {
822 executed: 0,
823 executing: 0,
824 queued: 0,
825 maxQueued: 0,
826 stolen: 0,
827 failed: 0
828 },
829 runTime: {
830 history: new CircularArray()
831 },
832 waitTime: {
833 history: new CircularArray()
834 },
835 elu: {
836 idle: {
837 history: new CircularArray()
838 },
839 active: {
840 history: new CircularArray()
841 }
842 }
843 })
844 }
845 await pool.destroy()
846 })
847
848 it('Verify that pool worker tasks queue are initialized', async () => {
849 let pool = new FixedClusterPool(
850 numberOfWorkers,
851 './tests/worker-files/cluster/testWorker.js'
852 )
853 for (const workerNode of pool.workerNodes) {
854 expect(workerNode).toBeInstanceOf(WorkerNode)
855 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
856 expect(workerNode.tasksQueue.size).toBe(0)
857 expect(workerNode.tasksQueue.maxSize).toBe(0)
858 }
859 await pool.destroy()
860 pool = new DynamicThreadPool(
861 Math.floor(numberOfWorkers / 2),
862 numberOfWorkers,
863 './tests/worker-files/thread/testWorker.js'
864 )
865 for (const workerNode of pool.workerNodes) {
866 expect(workerNode).toBeInstanceOf(WorkerNode)
867 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
868 expect(workerNode.tasksQueue.size).toBe(0)
869 expect(workerNode.tasksQueue.maxSize).toBe(0)
870 }
871 await pool.destroy()
872 })
873
874 it('Verify that pool worker info are initialized', async () => {
875 let pool = new FixedClusterPool(
876 numberOfWorkers,
877 './tests/worker-files/cluster/testWorker.js'
878 )
879 for (const workerNode of pool.workerNodes) {
880 expect(workerNode).toBeInstanceOf(WorkerNode)
881 expect(workerNode.info).toStrictEqual({
882 id: expect.any(Number),
883 type: WorkerTypes.cluster,
884 dynamic: false,
885 ready: true
886 })
887 }
888 await pool.destroy()
889 pool = new DynamicThreadPool(
890 Math.floor(numberOfWorkers / 2),
891 numberOfWorkers,
892 './tests/worker-files/thread/testWorker.js'
893 )
894 for (const workerNode of pool.workerNodes) {
895 expect(workerNode).toBeInstanceOf(WorkerNode)
896 expect(workerNode.info).toStrictEqual({
897 id: expect.any(Number),
898 type: WorkerTypes.thread,
899 dynamic: false,
900 ready: true
901 })
902 }
903 await pool.destroy()
904 })
905
906 it('Verify that pool can be started after initialization', async () => {
907 const pool = new FixedClusterPool(
908 numberOfWorkers,
909 './tests/worker-files/cluster/testWorker.js',
910 {
911 startWorkers: false
912 }
913 )
914 expect(pool.info.started).toBe(false)
915 expect(pool.info.ready).toBe(false)
916 expect(pool.workerNodes).toStrictEqual([])
917 await expect(pool.execute()).rejects.toThrowError(
918 new Error('Cannot execute a task on not started pool')
919 )
920 pool.start()
921 expect(pool.info.started).toBe(true)
922 expect(pool.info.ready).toBe(true)
923 expect(pool.workerNodes.length).toBe(numberOfWorkers)
924 for (const workerNode of pool.workerNodes) {
925 expect(workerNode).toBeInstanceOf(WorkerNode)
926 }
927 await pool.destroy()
928 })
929
930 it('Verify that pool execute() arguments are checked', async () => {
931 const pool = new FixedClusterPool(
932 numberOfWorkers,
933 './tests/worker-files/cluster/testWorker.js'
934 )
935 await expect(pool.execute(undefined, 0)).rejects.toThrowError(
936 new TypeError('name argument must be a string')
937 )
938 await expect(pool.execute(undefined, '')).rejects.toThrowError(
939 new TypeError('name argument must not be an empty string')
940 )
941 await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
942 new TypeError('transferList argument must be an array')
943 )
944 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
945 "Task function 'unknown' not found"
946 )
947 await pool.destroy()
948 await expect(pool.execute()).rejects.toThrowError(
949 new Error('Cannot execute a task on not started pool')
950 )
951 })
952
953 it('Verify that pool worker tasks usage are computed', async () => {
954 const pool = new FixedClusterPool(
955 numberOfWorkers,
956 './tests/worker-files/cluster/testWorker.js'
957 )
958 const promises = new Set()
959 const maxMultiplier = 2
960 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
961 promises.add(pool.execute())
962 }
963 for (const workerNode of pool.workerNodes) {
964 expect(workerNode.usage).toStrictEqual({
965 tasks: {
966 executed: 0,
967 executing: maxMultiplier,
968 queued: 0,
969 maxQueued: 0,
970 stolen: 0,
971 failed: 0
972 },
973 runTime: {
974 history: expect.any(CircularArray)
975 },
976 waitTime: {
977 history: expect.any(CircularArray)
978 },
979 elu: {
980 idle: {
981 history: expect.any(CircularArray)
982 },
983 active: {
984 history: expect.any(CircularArray)
985 }
986 }
987 })
988 }
989 await Promise.all(promises)
990 for (const workerNode of pool.workerNodes) {
991 expect(workerNode.usage).toStrictEqual({
992 tasks: {
993 executed: maxMultiplier,
994 executing: 0,
995 queued: 0,
996 maxQueued: 0,
997 stolen: 0,
998 failed: 0
999 },
1000 runTime: {
1001 history: expect.any(CircularArray)
1002 },
1003 waitTime: {
1004 history: expect.any(CircularArray)
1005 },
1006 elu: {
1007 idle: {
1008 history: expect.any(CircularArray)
1009 },
1010 active: {
1011 history: expect.any(CircularArray)
1012 }
1013 }
1014 })
1015 }
1016 await pool.destroy()
1017 })
1018
1019 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1020 const pool = new DynamicThreadPool(
1021 Math.floor(numberOfWorkers / 2),
1022 numberOfWorkers,
1023 './tests/worker-files/thread/testWorker.js'
1024 )
1025 const promises = new Set()
1026 const maxMultiplier = 2
1027 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1028 promises.add(pool.execute())
1029 }
1030 await Promise.all(promises)
1031 for (const workerNode of pool.workerNodes) {
1032 expect(workerNode.usage).toStrictEqual({
1033 tasks: {
1034 executed: expect.any(Number),
1035 executing: 0,
1036 queued: 0,
1037 maxQueued: 0,
1038 stolen: 0,
1039 failed: 0
1040 },
1041 runTime: {
1042 history: expect.any(CircularArray)
1043 },
1044 waitTime: {
1045 history: expect.any(CircularArray)
1046 },
1047 elu: {
1048 idle: {
1049 history: expect.any(CircularArray)
1050 },
1051 active: {
1052 history: expect.any(CircularArray)
1053 }
1054 }
1055 })
1056 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1057 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1058 numberOfWorkers * maxMultiplier
1059 )
1060 expect(workerNode.usage.runTime.history.length).toBe(0)
1061 expect(workerNode.usage.waitTime.history.length).toBe(0)
1062 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1063 expect(workerNode.usage.elu.active.history.length).toBe(0)
1064 }
1065 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1066 for (const workerNode of pool.workerNodes) {
1067 expect(workerNode.usage).toStrictEqual({
1068 tasks: {
1069 executed: 0,
1070 executing: 0,
1071 queued: 0,
1072 maxQueued: 0,
1073 stolen: 0,
1074 failed: 0
1075 },
1076 runTime: {
1077 history: expect.any(CircularArray)
1078 },
1079 waitTime: {
1080 history: expect.any(CircularArray)
1081 },
1082 elu: {
1083 idle: {
1084 history: expect.any(CircularArray)
1085 },
1086 active: {
1087 history: expect.any(CircularArray)
1088 }
1089 }
1090 })
1091 expect(workerNode.usage.runTime.history.length).toBe(0)
1092 expect(workerNode.usage.waitTime.history.length).toBe(0)
1093 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1094 expect(workerNode.usage.elu.active.history.length).toBe(0)
1095 }
1096 await pool.destroy()
1097 })
1098
1099 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1100 const pool = new DynamicClusterPool(
1101 Math.floor(numberOfWorkers / 2),
1102 numberOfWorkers,
1103 './tests/worker-files/cluster/testWorker.js'
1104 )
1105 let poolInfo
1106 let poolReady = 0
1107 pool.emitter.on(PoolEvents.ready, info => {
1108 ++poolReady
1109 poolInfo = info
1110 })
1111 await waitPoolEvents(pool, PoolEvents.ready, 1)
1112 expect(poolReady).toBe(1)
1113 expect(poolInfo).toStrictEqual({
1114 version,
1115 type: PoolTypes.dynamic,
1116 worker: WorkerTypes.cluster,
1117 started: true,
1118 ready: true,
1119 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1120 minSize: expect.any(Number),
1121 maxSize: expect.any(Number),
1122 workerNodes: expect.any(Number),
1123 idleWorkerNodes: expect.any(Number),
1124 busyWorkerNodes: expect.any(Number),
1125 executedTasks: expect.any(Number),
1126 executingTasks: expect.any(Number),
1127 failedTasks: expect.any(Number)
1128 })
1129 await pool.destroy()
1130 })
1131
1132 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1133 const pool = new FixedThreadPool(
1134 numberOfWorkers,
1135 './tests/worker-files/thread/testWorker.js'
1136 )
1137 const promises = new Set()
1138 let poolBusy = 0
1139 let poolInfo
1140 pool.emitter.on(PoolEvents.busy, info => {
1141 ++poolBusy
1142 poolInfo = info
1143 })
1144 for (let i = 0; i < numberOfWorkers * 2; i++) {
1145 promises.add(pool.execute())
1146 }
1147 await Promise.all(promises)
1148 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1149 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1150 expect(poolBusy).toBe(numberOfWorkers + 1)
1151 expect(poolInfo).toStrictEqual({
1152 version,
1153 type: PoolTypes.fixed,
1154 worker: WorkerTypes.thread,
1155 started: true,
1156 ready: true,
1157 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1158 minSize: expect.any(Number),
1159 maxSize: expect.any(Number),
1160 workerNodes: expect.any(Number),
1161 idleWorkerNodes: expect.any(Number),
1162 busyWorkerNodes: expect.any(Number),
1163 executedTasks: expect.any(Number),
1164 executingTasks: expect.any(Number),
1165 failedTasks: expect.any(Number)
1166 })
1167 await pool.destroy()
1168 })
1169
1170 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1171 const pool = new DynamicThreadPool(
1172 Math.floor(numberOfWorkers / 2),
1173 numberOfWorkers,
1174 './tests/worker-files/thread/testWorker.js'
1175 )
1176 const promises = new Set()
1177 let poolFull = 0
1178 let poolInfo
1179 pool.emitter.on(PoolEvents.full, info => {
1180 ++poolFull
1181 poolInfo = info
1182 })
1183 for (let i = 0; i < numberOfWorkers * 2; i++) {
1184 promises.add(pool.execute())
1185 }
1186 await Promise.all(promises)
1187 expect(poolFull).toBe(1)
1188 expect(poolInfo).toStrictEqual({
1189 version,
1190 type: PoolTypes.dynamic,
1191 worker: WorkerTypes.thread,
1192 started: true,
1193 ready: true,
1194 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1195 minSize: expect.any(Number),
1196 maxSize: expect.any(Number),
1197 workerNodes: expect.any(Number),
1198 idleWorkerNodes: expect.any(Number),
1199 busyWorkerNodes: expect.any(Number),
1200 executedTasks: expect.any(Number),
1201 executingTasks: expect.any(Number),
1202 failedTasks: expect.any(Number)
1203 })
1204 await pool.destroy()
1205 })
1206
1207 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1208 const pool = new FixedThreadPool(
1209 numberOfWorkers,
1210 './tests/worker-files/thread/testWorker.js',
1211 {
1212 enableTasksQueue: true
1213 }
1214 )
1215 sinon.stub(pool, 'hasBackPressure').returns(true)
1216 const promises = new Set()
1217 let poolBackPressure = 0
1218 let poolInfo
1219 pool.emitter.on(PoolEvents.backPressure, info => {
1220 ++poolBackPressure
1221 poolInfo = info
1222 })
1223 for (let i = 0; i < numberOfWorkers + 1; i++) {
1224 promises.add(pool.execute())
1225 }
1226 await Promise.all(promises)
1227 expect(poolBackPressure).toBe(1)
1228 expect(poolInfo).toStrictEqual({
1229 version,
1230 type: PoolTypes.fixed,
1231 worker: WorkerTypes.thread,
1232 started: true,
1233 ready: true,
1234 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1235 minSize: expect.any(Number),
1236 maxSize: expect.any(Number),
1237 workerNodes: expect.any(Number),
1238 idleWorkerNodes: expect.any(Number),
1239 busyWorkerNodes: expect.any(Number),
1240 executedTasks: expect.any(Number),
1241 executingTasks: expect.any(Number),
1242 maxQueuedTasks: expect.any(Number),
1243 queuedTasks: expect.any(Number),
1244 backPressure: true,
1245 stolenTasks: expect.any(Number),
1246 failedTasks: expect.any(Number)
1247 })
1248 expect(pool.hasBackPressure.called).toBe(true)
1249 await pool.destroy()
1250 })
1251
1252 it('Verify that hasTaskFunction() is working', async () => {
1253 const dynamicThreadPool = new DynamicThreadPool(
1254 Math.floor(numberOfWorkers / 2),
1255 numberOfWorkers,
1256 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1257 )
1258 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1259 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1260 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1261 true
1262 )
1263 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1264 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1265 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1266 await dynamicThreadPool.destroy()
1267 const fixedClusterPool = new FixedClusterPool(
1268 numberOfWorkers,
1269 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1270 )
1271 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1272 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1273 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1274 true
1275 )
1276 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1277 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1278 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1279 await fixedClusterPool.destroy()
1280 })
1281
1282 it('Verify that addTaskFunction() is working', async () => {
1283 const dynamicThreadPool = new DynamicThreadPool(
1284 Math.floor(numberOfWorkers / 2),
1285 numberOfWorkers,
1286 './tests/worker-files/thread/testWorker.js'
1287 )
1288 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1289 await expect(
1290 dynamicThreadPool.addTaskFunction(0, () => {})
1291 ).rejects.toThrowError(new TypeError('name argument must be a string'))
1292 await expect(
1293 dynamicThreadPool.addTaskFunction('', () => {})
1294 ).rejects.toThrowError(
1295 new TypeError('name argument must not be an empty string')
1296 )
1297 await expect(
1298 dynamicThreadPool.addTaskFunction('test', 0)
1299 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1300 await expect(
1301 dynamicThreadPool.addTaskFunction('test', '')
1302 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1303 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1304 DEFAULT_TASK_NAME,
1305 'test'
1306 ])
1307 const echoTaskFunction = data => {
1308 return data
1309 }
1310 await expect(
1311 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1312 ).resolves.toBe(true)
1313 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1314 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1315 echoTaskFunction
1316 )
1317 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1318 DEFAULT_TASK_NAME,
1319 'test',
1320 'echo'
1321 ])
1322 const taskFunctionData = { test: 'test' }
1323 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1324 expect(echoResult).toStrictEqual(taskFunctionData)
1325 for (const workerNode of dynamicThreadPool.workerNodes) {
1326 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1327 tasks: {
1328 executed: expect.any(Number),
1329 executing: 0,
1330 queued: 0,
1331 stolen: 0,
1332 failed: 0
1333 },
1334 runTime: {
1335 history: new CircularArray()
1336 },
1337 waitTime: {
1338 history: new CircularArray()
1339 },
1340 elu: {
1341 idle: {
1342 history: new CircularArray()
1343 },
1344 active: {
1345 history: new CircularArray()
1346 }
1347 }
1348 })
1349 }
1350 await dynamicThreadPool.destroy()
1351 })
1352
1353 it('Verify that removeTaskFunction() is working', async () => {
1354 const dynamicThreadPool = new DynamicThreadPool(
1355 Math.floor(numberOfWorkers / 2),
1356 numberOfWorkers,
1357 './tests/worker-files/thread/testWorker.js'
1358 )
1359 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1360 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1361 DEFAULT_TASK_NAME,
1362 'test'
1363 ])
1364 await expect(
1365 dynamicThreadPool.removeTaskFunction('test')
1366 ).rejects.toThrowError(
1367 new Error('Cannot remove a task function not handled on the pool side')
1368 )
1369 const echoTaskFunction = data => {
1370 return data
1371 }
1372 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1373 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1374 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1375 echoTaskFunction
1376 )
1377 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1378 DEFAULT_TASK_NAME,
1379 'test',
1380 'echo'
1381 ])
1382 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1383 true
1384 )
1385 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1386 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1387 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1388 DEFAULT_TASK_NAME,
1389 'test'
1390 ])
1391 await dynamicThreadPool.destroy()
1392 })
1393
1394 it('Verify that listTaskFunctionNames() is working', async () => {
1395 const dynamicThreadPool = new DynamicThreadPool(
1396 Math.floor(numberOfWorkers / 2),
1397 numberOfWorkers,
1398 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1399 )
1400 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1401 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1402 DEFAULT_TASK_NAME,
1403 'jsonIntegerSerialization',
1404 'factorial',
1405 'fibonacci'
1406 ])
1407 await dynamicThreadPool.destroy()
1408 const fixedClusterPool = new FixedClusterPool(
1409 numberOfWorkers,
1410 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1411 )
1412 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1413 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1414 DEFAULT_TASK_NAME,
1415 'jsonIntegerSerialization',
1416 'factorial',
1417 'fibonacci'
1418 ])
1419 await fixedClusterPool.destroy()
1420 })
1421
1422 it('Verify that setDefaultTaskFunction() is working', async () => {
1423 const dynamicThreadPool = new DynamicThreadPool(
1424 Math.floor(numberOfWorkers / 2),
1425 numberOfWorkers,
1426 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1427 )
1428 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1429 await expect(
1430 dynamicThreadPool.setDefaultTaskFunction(0)
1431 ).rejects.toThrowError(
1432 new Error(
1433 "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
1434 )
1435 )
1436 await expect(
1437 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1438 ).rejects.toThrowError(
1439 new Error(
1440 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
1441 )
1442 )
1443 await expect(
1444 dynamicThreadPool.setDefaultTaskFunction('unknown')
1445 ).rejects.toThrowError(
1446 new Error(
1447 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
1448 )
1449 )
1450 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1451 DEFAULT_TASK_NAME,
1452 'jsonIntegerSerialization',
1453 'factorial',
1454 'fibonacci'
1455 ])
1456 await expect(
1457 dynamicThreadPool.setDefaultTaskFunction('factorial')
1458 ).resolves.toBe(true)
1459 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1460 DEFAULT_TASK_NAME,
1461 'factorial',
1462 'jsonIntegerSerialization',
1463 'fibonacci'
1464 ])
1465 await expect(
1466 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1467 ).resolves.toBe(true)
1468 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1469 DEFAULT_TASK_NAME,
1470 'fibonacci',
1471 'jsonIntegerSerialization',
1472 'factorial'
1473 ])
1474 })
1475
1476 it('Verify that multiple task functions worker is working', async () => {
1477 const pool = new DynamicClusterPool(
1478 Math.floor(numberOfWorkers / 2),
1479 numberOfWorkers,
1480 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1481 )
1482 const data = { n: 10 }
1483 const result0 = await pool.execute(data)
1484 expect(result0).toStrictEqual({ ok: 1 })
1485 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1486 expect(result1).toStrictEqual({ ok: 1 })
1487 const result2 = await pool.execute(data, 'factorial')
1488 expect(result2).toBe(3628800)
1489 const result3 = await pool.execute(data, 'fibonacci')
1490 expect(result3).toBe(55)
1491 expect(pool.info.executingTasks).toBe(0)
1492 expect(pool.info.executedTasks).toBe(4)
1493 for (const workerNode of pool.workerNodes) {
1494 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1495 DEFAULT_TASK_NAME,
1496 'jsonIntegerSerialization',
1497 'factorial',
1498 'fibonacci'
1499 ])
1500 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1501 for (const name of pool.listTaskFunctionNames()) {
1502 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1503 tasks: {
1504 executed: expect.any(Number),
1505 executing: 0,
1506 failed: 0,
1507 queued: 0,
1508 stolen: 0
1509 },
1510 runTime: {
1511 history: expect.any(CircularArray)
1512 },
1513 waitTime: {
1514 history: expect.any(CircularArray)
1515 },
1516 elu: {
1517 idle: {
1518 history: expect.any(CircularArray)
1519 },
1520 active: {
1521 history: expect.any(CircularArray)
1522 }
1523 }
1524 })
1525 expect(
1526 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1527 ).toBeGreaterThan(0)
1528 }
1529 expect(
1530 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1531 ).toStrictEqual(
1532 workerNode.getTaskFunctionWorkerUsage(
1533 workerNode.info.taskFunctionNames[1]
1534 )
1535 )
1536 }
1537 await pool.destroy()
1538 })
1539
1540 it('Verify sendKillMessageToWorker()', async () => {
1541 const pool = new DynamicClusterPool(
1542 Math.floor(numberOfWorkers / 2),
1543 numberOfWorkers,
1544 './tests/worker-files/cluster/testWorker.js'
1545 )
1546 const workerNodeKey = 0
1547 await expect(
1548 pool.sendKillMessageToWorker(workerNodeKey)
1549 ).resolves.toBeUndefined()
1550 await pool.destroy()
1551 })
1552
1553 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1554 const pool = new DynamicClusterPool(
1555 Math.floor(numberOfWorkers / 2),
1556 numberOfWorkers,
1557 './tests/worker-files/cluster/testWorker.js'
1558 )
1559 const workerNodeKey = 0
1560 await expect(
1561 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1562 taskFunctionOperation: 'add',
1563 taskFunctionName: 'empty',
1564 taskFunction: (() => {}).toString()
1565 })
1566 ).resolves.toBe(true)
1567 expect(
1568 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1569 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1570 await pool.destroy()
1571 })
1572
1573 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1574 const pool = new DynamicClusterPool(
1575 Math.floor(numberOfWorkers / 2),
1576 numberOfWorkers,
1577 './tests/worker-files/cluster/testWorker.js'
1578 )
1579 await expect(
1580 pool.sendTaskFunctionOperationToWorkers({
1581 taskFunctionOperation: 'add',
1582 taskFunctionName: 'empty',
1583 taskFunction: (() => {}).toString()
1584 })
1585 ).resolves.toBe(true)
1586 for (const workerNode of pool.workerNodes) {
1587 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1588 DEFAULT_TASK_NAME,
1589 'test',
1590 'empty'
1591 ])
1592 }
1593 await pool.destroy()
1594 })
1595 })