test: switch to ESM
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { readFileSync } from 'node:fs'
3 import { expect } from 'expect'
4 import { restore, stub } from 'sinon'
5 import {
6 DynamicClusterPool,
7 DynamicThreadPool,
8 FixedClusterPool,
9 FixedThreadPool,
10 PoolEvents,
11 PoolTypes,
12 WorkerChoiceStrategies,
13 WorkerTypes
14 } from '../../lib/index.js'
15 import { CircularArray } from '../../lib/circular-array.js'
16 import { Deque } from '../../lib/deque.js'
17 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
18 import { waitPoolEvents } from '../test-utils.js'
19 import { WorkerNode } from '../../lib/pools/worker-node.js'
20
21 describe('Abstract pool test suite', () => {
22 const version = JSON.parse(readFileSync('./package.json', 'utf8')).version
23 const numberOfWorkers = 2
24 class StubPoolWithIsMain extends FixedThreadPool {
25 isMain () {
26 return false
27 }
28 }
29
30 afterEach(() => {
31 restore()
32 })
33
34 it('Simulate pool creation from a non main thread/process', () => {
35 expect(
36 () =>
37 new StubPoolWithIsMain(
38 numberOfWorkers,
39 './tests/worker-files/thread/testWorker.js',
40 {
41 errorHandler: e => console.error(e)
42 }
43 )
44 ).toThrowError(
45 new Error(
46 'Cannot start a pool from a worker with the same type as the pool'
47 )
48 )
49 })
50
51 it('Verify that pool statuses properties are set', async () => {
52 const pool = new FixedThreadPool(
53 numberOfWorkers,
54 './tests/worker-files/thread/testWorker.js'
55 )
56 expect(pool.starting).toBe(false)
57 expect(pool.started).toBe(true)
58 await pool.destroy()
59 })
60
61 it('Verify that filePath is checked', () => {
62 const expectedError = new Error(
63 'Please specify a file with a worker implementation'
64 )
65 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
66 expectedError
67 )
68 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
69 expectedError
70 )
71 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError(
72 expectedError
73 )
74 expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError(
75 expectedError
76 )
77 expect(
78 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
79 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
80 })
81
82 it('Verify that numberOfWorkers is checked', () => {
83 expect(
84 () =>
85 new FixedThreadPool(
86 undefined,
87 './tests/worker-files/thread/testWorker.js'
88 )
89 ).toThrowError(
90 new Error(
91 'Cannot instantiate a pool without specifying the number of workers'
92 )
93 )
94 })
95
96 it('Verify that a negative number of workers is checked', () => {
97 expect(
98 () =>
99 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
100 ).toThrowError(
101 new RangeError(
102 'Cannot instantiate a pool with a negative number of workers'
103 )
104 )
105 })
106
107 it('Verify that a non integer number of workers is checked', () => {
108 expect(
109 () =>
110 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
111 ).toThrowError(
112 new TypeError(
113 'Cannot instantiate a pool with a non safe integer number of workers'
114 )
115 )
116 })
117
118 it('Verify that dynamic pool sizing is checked', () => {
119 expect(
120 () =>
121 new DynamicClusterPool(
122 1,
123 undefined,
124 './tests/worker-files/cluster/testWorker.js'
125 )
126 ).toThrowError(
127 new TypeError(
128 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
129 )
130 )
131 expect(
132 () =>
133 new DynamicThreadPool(
134 0.5,
135 1,
136 './tests/worker-files/thread/testWorker.js'
137 )
138 ).toThrowError(
139 new TypeError(
140 'Cannot instantiate a pool with a non safe integer number of workers'
141 )
142 )
143 expect(
144 () =>
145 new DynamicClusterPool(
146 0,
147 0.5,
148 './tests/worker-files/cluster/testWorker.js'
149 )
150 ).toThrowError(
151 new TypeError(
152 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
153 )
154 )
155 expect(
156 () =>
157 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
158 ).toThrowError(
159 new RangeError(
160 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
161 )
162 )
163 expect(
164 () =>
165 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
166 ).toThrowError(
167 new RangeError(
168 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
169 )
170 )
171 expect(
172 () =>
173 new DynamicClusterPool(
174 1,
175 1,
176 './tests/worker-files/cluster/testWorker.js'
177 )
178 ).toThrowError(
179 new RangeError(
180 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
181 )
182 )
183 })
184
185 it('Verify that pool options are checked', async () => {
186 let pool = new FixedThreadPool(
187 numberOfWorkers,
188 './tests/worker-files/thread/testWorker.js'
189 )
190 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
191 expect(pool.opts).toStrictEqual({
192 startWorkers: true,
193 enableEvents: true,
194 restartWorkerOnError: true,
195 enableTasksQueue: false,
196 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
197 workerChoiceStrategyOptions: {
198 retries: 6,
199 runTime: { median: false },
200 waitTime: { median: false },
201 elu: { median: false }
202 }
203 })
204 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
205 retries: 6,
206 runTime: { median: false },
207 waitTime: { median: false },
208 elu: { median: false }
209 })
210 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
211 .workerChoiceStrategies) {
212 expect(workerChoiceStrategy.opts).toStrictEqual({
213 retries: 6,
214 runTime: { median: false },
215 waitTime: { median: false },
216 elu: { median: false }
217 })
218 }
219 await pool.destroy()
220 const testHandler = () => console.info('test handler executed')
221 pool = new FixedThreadPool(
222 numberOfWorkers,
223 './tests/worker-files/thread/testWorker.js',
224 {
225 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
226 workerChoiceStrategyOptions: {
227 runTime: { median: true },
228 weights: { 0: 300, 1: 200 }
229 },
230 enableEvents: false,
231 restartWorkerOnError: false,
232 enableTasksQueue: true,
233 tasksQueueOptions: { concurrency: 2 },
234 messageHandler: testHandler,
235 errorHandler: testHandler,
236 onlineHandler: testHandler,
237 exitHandler: testHandler
238 }
239 )
240 expect(pool.emitter).toBeUndefined()
241 expect(pool.opts).toStrictEqual({
242 startWorkers: true,
243 enableEvents: false,
244 restartWorkerOnError: false,
245 enableTasksQueue: true,
246 tasksQueueOptions: {
247 concurrency: 2,
248 size: Math.pow(numberOfWorkers, 2),
249 taskStealing: true,
250 tasksStealingOnBackPressure: true
251 },
252 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
253 workerChoiceStrategyOptions: {
254 retries: 6,
255 runTime: { median: true },
256 waitTime: { median: false },
257 elu: { median: false },
258 weights: { 0: 300, 1: 200 }
259 },
260 onlineHandler: testHandler,
261 messageHandler: testHandler,
262 errorHandler: testHandler,
263 exitHandler: testHandler
264 })
265 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
266 retries: 6,
267 runTime: { median: true },
268 waitTime: { median: false },
269 elu: { median: false },
270 weights: { 0: 300, 1: 200 }
271 })
272 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
273 .workerChoiceStrategies) {
274 expect(workerChoiceStrategy.opts).toStrictEqual({
275 retries: 6,
276 runTime: { median: true },
277 waitTime: { median: false },
278 elu: { median: false },
279 weights: { 0: 300, 1: 200 }
280 })
281 }
282 await pool.destroy()
283 })
284
285 it('Verify that pool options are validated', async () => {
286 expect(
287 () =>
288 new FixedThreadPool(
289 numberOfWorkers,
290 './tests/worker-files/thread/testWorker.js',
291 {
292 workerChoiceStrategy: 'invalidStrategy'
293 }
294 )
295 ).toThrowError(
296 new Error("Invalid worker choice strategy 'invalidStrategy'")
297 )
298 expect(
299 () =>
300 new FixedThreadPool(
301 numberOfWorkers,
302 './tests/worker-files/thread/testWorker.js',
303 {
304 workerChoiceStrategyOptions: {
305 retries: 'invalidChoiceRetries'
306 }
307 }
308 )
309 ).toThrowError(
310 new TypeError(
311 'Invalid worker choice strategy options: retries must be an integer'
312 )
313 )
314 expect(
315 () =>
316 new FixedThreadPool(
317 numberOfWorkers,
318 './tests/worker-files/thread/testWorker.js',
319 {
320 workerChoiceStrategyOptions: {
321 retries: -1
322 }
323 }
324 )
325 ).toThrowError(
326 new RangeError(
327 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
328 )
329 )
330 expect(
331 () =>
332 new FixedThreadPool(
333 numberOfWorkers,
334 './tests/worker-files/thread/testWorker.js',
335 {
336 workerChoiceStrategyOptions: { weights: {} }
337 }
338 )
339 ).toThrowError(
340 new Error(
341 'Invalid worker choice strategy options: must have a weight for each worker node'
342 )
343 )
344 expect(
345 () =>
346 new FixedThreadPool(
347 numberOfWorkers,
348 './tests/worker-files/thread/testWorker.js',
349 {
350 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
351 }
352 )
353 ).toThrowError(
354 new Error(
355 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
356 )
357 )
358 expect(
359 () =>
360 new FixedThreadPool(
361 numberOfWorkers,
362 './tests/worker-files/thread/testWorker.js',
363 {
364 enableTasksQueue: true,
365 tasksQueueOptions: 'invalidTasksQueueOptions'
366 }
367 )
368 ).toThrowError(
369 new TypeError('Invalid tasks queue options: must be a plain object')
370 )
371 expect(
372 () =>
373 new FixedThreadPool(
374 numberOfWorkers,
375 './tests/worker-files/thread/testWorker.js',
376 {
377 enableTasksQueue: true,
378 tasksQueueOptions: { concurrency: 0 }
379 }
380 )
381 ).toThrowError(
382 new RangeError(
383 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
384 )
385 )
386 expect(
387 () =>
388 new FixedThreadPool(
389 numberOfWorkers,
390 './tests/worker-files/thread/testWorker.js',
391 {
392 enableTasksQueue: true,
393 tasksQueueOptions: { concurrency: -1 }
394 }
395 )
396 ).toThrowError(
397 new RangeError(
398 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
399 )
400 )
401 expect(
402 () =>
403 new FixedThreadPool(
404 numberOfWorkers,
405 './tests/worker-files/thread/testWorker.js',
406 {
407 enableTasksQueue: true,
408 tasksQueueOptions: { concurrency: 0.2 }
409 }
410 )
411 ).toThrowError(
412 new TypeError('Invalid worker node tasks concurrency: must be an integer')
413 )
414 expect(
415 () =>
416 new FixedThreadPool(
417 numberOfWorkers,
418 './tests/worker-files/thread/testWorker.js',
419 {
420 enableTasksQueue: true,
421 tasksQueueOptions: { size: 0 }
422 }
423 )
424 ).toThrowError(
425 new RangeError(
426 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
427 )
428 )
429 expect(
430 () =>
431 new FixedThreadPool(
432 numberOfWorkers,
433 './tests/worker-files/thread/testWorker.js',
434 {
435 enableTasksQueue: true,
436 tasksQueueOptions: { size: -1 }
437 }
438 )
439 ).toThrowError(
440 new RangeError(
441 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
442 )
443 )
444 expect(
445 () =>
446 new FixedThreadPool(
447 numberOfWorkers,
448 './tests/worker-files/thread/testWorker.js',
449 {
450 enableTasksQueue: true,
451 tasksQueueOptions: { size: 0.2 }
452 }
453 )
454 ).toThrowError(
455 new TypeError('Invalid worker node tasks queue size: must be an integer')
456 )
457 })
458
459 it('Verify that pool worker choice strategy options can be set', async () => {
460 const pool = new FixedThreadPool(
461 numberOfWorkers,
462 './tests/worker-files/thread/testWorker.js',
463 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
464 )
465 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
466 retries: 6,
467 runTime: { median: false },
468 waitTime: { median: false },
469 elu: { median: false }
470 })
471 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
472 retries: 6,
473 runTime: { median: false },
474 waitTime: { median: false },
475 elu: { median: false }
476 })
477 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
478 .workerChoiceStrategies) {
479 expect(workerChoiceStrategy.opts).toStrictEqual({
480 retries: 6,
481 runTime: { median: false },
482 waitTime: { median: false },
483 elu: { median: false }
484 })
485 }
486 expect(
487 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
488 ).toStrictEqual({
489 runTime: {
490 aggregate: true,
491 average: true,
492 median: false
493 },
494 waitTime: {
495 aggregate: false,
496 average: false,
497 median: false
498 },
499 elu: {
500 aggregate: true,
501 average: true,
502 median: false
503 }
504 })
505 pool.setWorkerChoiceStrategyOptions({
506 runTime: { median: true },
507 elu: { median: true }
508 })
509 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
510 retries: 6,
511 runTime: { median: true },
512 waitTime: { median: false },
513 elu: { median: true }
514 })
515 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
516 retries: 6,
517 runTime: { median: true },
518 waitTime: { median: false },
519 elu: { median: true }
520 })
521 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
522 .workerChoiceStrategies) {
523 expect(workerChoiceStrategy.opts).toStrictEqual({
524 retries: 6,
525 runTime: { median: true },
526 waitTime: { median: false },
527 elu: { median: true }
528 })
529 }
530 expect(
531 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
532 ).toStrictEqual({
533 runTime: {
534 aggregate: true,
535 average: false,
536 median: true
537 },
538 waitTime: {
539 aggregate: false,
540 average: false,
541 median: false
542 },
543 elu: {
544 aggregate: true,
545 average: false,
546 median: true
547 }
548 })
549 pool.setWorkerChoiceStrategyOptions({
550 runTime: { median: false },
551 elu: { median: false }
552 })
553 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
554 retries: 6,
555 runTime: { median: false },
556 waitTime: { median: false },
557 elu: { median: false }
558 })
559 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
560 retries: 6,
561 runTime: { median: false },
562 waitTime: { median: false },
563 elu: { median: false }
564 })
565 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
566 .workerChoiceStrategies) {
567 expect(workerChoiceStrategy.opts).toStrictEqual({
568 retries: 6,
569 runTime: { median: false },
570 waitTime: { median: false },
571 elu: { median: false }
572 })
573 }
574 expect(
575 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
576 ).toStrictEqual({
577 runTime: {
578 aggregate: true,
579 average: true,
580 median: false
581 },
582 waitTime: {
583 aggregate: false,
584 average: false,
585 median: false
586 },
587 elu: {
588 aggregate: true,
589 average: true,
590 median: false
591 }
592 })
593 expect(() =>
594 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
595 ).toThrowError(
596 new TypeError(
597 'Invalid worker choice strategy options: must be a plain object'
598 )
599 )
600 expect(() =>
601 pool.setWorkerChoiceStrategyOptions({
602 retries: 'invalidChoiceRetries'
603 })
604 ).toThrowError(
605 new TypeError(
606 'Invalid worker choice strategy options: retries must be an integer'
607 )
608 )
609 expect(() =>
610 pool.setWorkerChoiceStrategyOptions({ retries: -1 })
611 ).toThrowError(
612 new RangeError(
613 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
614 )
615 )
616 expect(() =>
617 pool.setWorkerChoiceStrategyOptions({ weights: {} })
618 ).toThrowError(
619 new Error(
620 'Invalid worker choice strategy options: must have a weight for each worker node'
621 )
622 )
623 expect(() =>
624 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
625 ).toThrowError(
626 new Error(
627 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
628 )
629 )
630 await pool.destroy()
631 })
632
633 it('Verify that pool tasks queue can be enabled/disabled', async () => {
634 const pool = new FixedThreadPool(
635 numberOfWorkers,
636 './tests/worker-files/thread/testWorker.js'
637 )
638 expect(pool.opts.enableTasksQueue).toBe(false)
639 expect(pool.opts.tasksQueueOptions).toBeUndefined()
640 for (const workerNode of pool.workerNodes) {
641 expect(workerNode.onEmptyQueue).toBeUndefined()
642 expect(workerNode.onBackPressure).toBeUndefined()
643 }
644 pool.enableTasksQueue(true)
645 expect(pool.opts.enableTasksQueue).toBe(true)
646 expect(pool.opts.tasksQueueOptions).toStrictEqual({
647 concurrency: 1,
648 size: Math.pow(numberOfWorkers, 2),
649 taskStealing: true,
650 tasksStealingOnBackPressure: true
651 })
652 for (const workerNode of pool.workerNodes) {
653 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
654 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
655 }
656 pool.enableTasksQueue(true, { concurrency: 2 })
657 expect(pool.opts.enableTasksQueue).toBe(true)
658 expect(pool.opts.tasksQueueOptions).toStrictEqual({
659 concurrency: 2,
660 size: Math.pow(numberOfWorkers, 2),
661 taskStealing: true,
662 tasksStealingOnBackPressure: true
663 })
664 for (const workerNode of pool.workerNodes) {
665 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
666 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
667 }
668 pool.enableTasksQueue(false)
669 expect(pool.opts.enableTasksQueue).toBe(false)
670 expect(pool.opts.tasksQueueOptions).toBeUndefined()
671 for (const workerNode of pool.workerNodes) {
672 expect(workerNode.onEmptyQueue).toBeUndefined()
673 expect(workerNode.onBackPressure).toBeUndefined()
674 }
675 await pool.destroy()
676 })
677
678 it('Verify that pool tasks queue options can be set', async () => {
679 const pool = new FixedThreadPool(
680 numberOfWorkers,
681 './tests/worker-files/thread/testWorker.js',
682 { enableTasksQueue: true }
683 )
684 expect(pool.opts.tasksQueueOptions).toStrictEqual({
685 concurrency: 1,
686 size: Math.pow(numberOfWorkers, 2),
687 taskStealing: true,
688 tasksStealingOnBackPressure: true
689 })
690 for (const workerNode of pool.workerNodes) {
691 expect(workerNode.tasksQueueBackPressureSize).toBe(
692 pool.opts.tasksQueueOptions.size
693 )
694 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
695 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
696 }
697 pool.setTasksQueueOptions({
698 concurrency: 2,
699 size: 2,
700 taskStealing: false,
701 tasksStealingOnBackPressure: false
702 })
703 expect(pool.opts.tasksQueueOptions).toStrictEqual({
704 concurrency: 2,
705 size: 2,
706 taskStealing: false,
707 tasksStealingOnBackPressure: false
708 })
709 for (const workerNode of pool.workerNodes) {
710 expect(workerNode.tasksQueueBackPressureSize).toBe(
711 pool.opts.tasksQueueOptions.size
712 )
713 expect(workerNode.onEmptyQueue).toBeUndefined()
714 expect(workerNode.onBackPressure).toBeUndefined()
715 }
716 pool.setTasksQueueOptions({
717 concurrency: 1,
718 taskStealing: true,
719 tasksStealingOnBackPressure: true
720 })
721 expect(pool.opts.tasksQueueOptions).toStrictEqual({
722 concurrency: 1,
723 size: Math.pow(numberOfWorkers, 2),
724 taskStealing: true,
725 tasksStealingOnBackPressure: true
726 })
727 for (const workerNode of pool.workerNodes) {
728 expect(workerNode.tasksQueueBackPressureSize).toBe(
729 pool.opts.tasksQueueOptions.size
730 )
731 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
732 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
733 }
734 expect(() =>
735 pool.setTasksQueueOptions('invalidTasksQueueOptions')
736 ).toThrowError(
737 new TypeError('Invalid tasks queue options: must be a plain object')
738 )
739 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
740 new RangeError(
741 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
742 )
743 )
744 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
745 new RangeError(
746 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
747 )
748 )
749 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
750 new TypeError('Invalid worker node tasks concurrency: must be an integer')
751 )
752 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
753 new RangeError(
754 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
755 )
756 )
757 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError(
758 new RangeError(
759 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
760 )
761 )
762 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError(
763 new TypeError('Invalid worker node tasks queue size: must be an integer')
764 )
765 await pool.destroy()
766 })
767
768 it('Verify that pool info is set', async () => {
769 let pool = new FixedThreadPool(
770 numberOfWorkers,
771 './tests/worker-files/thread/testWorker.js'
772 )
773 expect(pool.info).toStrictEqual({
774 version,
775 type: PoolTypes.fixed,
776 worker: WorkerTypes.thread,
777 started: true,
778 ready: true,
779 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
780 minSize: numberOfWorkers,
781 maxSize: numberOfWorkers,
782 workerNodes: numberOfWorkers,
783 idleWorkerNodes: numberOfWorkers,
784 busyWorkerNodes: 0,
785 executedTasks: 0,
786 executingTasks: 0,
787 failedTasks: 0
788 })
789 await pool.destroy()
790 pool = new DynamicClusterPool(
791 Math.floor(numberOfWorkers / 2),
792 numberOfWorkers,
793 './tests/worker-files/cluster/testWorker.js'
794 )
795 expect(pool.info).toStrictEqual({
796 version,
797 type: PoolTypes.dynamic,
798 worker: WorkerTypes.cluster,
799 started: true,
800 ready: true,
801 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
802 minSize: Math.floor(numberOfWorkers / 2),
803 maxSize: numberOfWorkers,
804 workerNodes: Math.floor(numberOfWorkers / 2),
805 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
806 busyWorkerNodes: 0,
807 executedTasks: 0,
808 executingTasks: 0,
809 failedTasks: 0
810 })
811 await pool.destroy()
812 })
813
814 it('Verify that pool worker tasks usage are initialized', async () => {
815 const pool = new FixedClusterPool(
816 numberOfWorkers,
817 './tests/worker-files/cluster/testWorker.js'
818 )
819 for (const workerNode of pool.workerNodes) {
820 expect(workerNode).toBeInstanceOf(WorkerNode)
821 expect(workerNode.usage).toStrictEqual({
822 tasks: {
823 executed: 0,
824 executing: 0,
825 queued: 0,
826 maxQueued: 0,
827 stolen: 0,
828 failed: 0
829 },
830 runTime: {
831 history: new CircularArray()
832 },
833 waitTime: {
834 history: new CircularArray()
835 },
836 elu: {
837 idle: {
838 history: new CircularArray()
839 },
840 active: {
841 history: new CircularArray()
842 }
843 }
844 })
845 }
846 await pool.destroy()
847 })
848
849 it('Verify that pool worker tasks queue are initialized', async () => {
850 let pool = new FixedClusterPool(
851 numberOfWorkers,
852 './tests/worker-files/cluster/testWorker.js'
853 )
854 for (const workerNode of pool.workerNodes) {
855 expect(workerNode).toBeInstanceOf(WorkerNode)
856 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
857 expect(workerNode.tasksQueue.size).toBe(0)
858 expect(workerNode.tasksQueue.maxSize).toBe(0)
859 }
860 await pool.destroy()
861 pool = new DynamicThreadPool(
862 Math.floor(numberOfWorkers / 2),
863 numberOfWorkers,
864 './tests/worker-files/thread/testWorker.js'
865 )
866 for (const workerNode of pool.workerNodes) {
867 expect(workerNode).toBeInstanceOf(WorkerNode)
868 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
869 expect(workerNode.tasksQueue.size).toBe(0)
870 expect(workerNode.tasksQueue.maxSize).toBe(0)
871 }
872 await pool.destroy()
873 })
874
875 it('Verify that pool worker info are initialized', async () => {
876 let pool = new FixedClusterPool(
877 numberOfWorkers,
878 './tests/worker-files/cluster/testWorker.js'
879 )
880 for (const workerNode of pool.workerNodes) {
881 expect(workerNode).toBeInstanceOf(WorkerNode)
882 expect(workerNode.info).toStrictEqual({
883 id: expect.any(Number),
884 type: WorkerTypes.cluster,
885 dynamic: false,
886 ready: true
887 })
888 }
889 await pool.destroy()
890 pool = new DynamicThreadPool(
891 Math.floor(numberOfWorkers / 2),
892 numberOfWorkers,
893 './tests/worker-files/thread/testWorker.js'
894 )
895 for (const workerNode of pool.workerNodes) {
896 expect(workerNode).toBeInstanceOf(WorkerNode)
897 expect(workerNode.info).toStrictEqual({
898 id: expect.any(Number),
899 type: WorkerTypes.thread,
900 dynamic: false,
901 ready: true
902 })
903 }
904 await pool.destroy()
905 })
906
907 it('Verify that pool can be started after initialization', async () => {
908 const pool = new FixedClusterPool(
909 numberOfWorkers,
910 './tests/worker-files/cluster/testWorker.js',
911 {
912 startWorkers: false
913 }
914 )
915 expect(pool.info.started).toBe(false)
916 expect(pool.info.ready).toBe(false)
917 expect(pool.workerNodes).toStrictEqual([])
918 await expect(pool.execute()).rejects.toThrowError(
919 new Error('Cannot execute a task on not started pool')
920 )
921 pool.start()
922 expect(pool.info.started).toBe(true)
923 expect(pool.info.ready).toBe(true)
924 expect(pool.workerNodes.length).toBe(numberOfWorkers)
925 for (const workerNode of pool.workerNodes) {
926 expect(workerNode).toBeInstanceOf(WorkerNode)
927 }
928 await pool.destroy()
929 })
930
931 it('Verify that pool execute() arguments are checked', async () => {
932 const pool = new FixedClusterPool(
933 numberOfWorkers,
934 './tests/worker-files/cluster/testWorker.js'
935 )
936 await expect(pool.execute(undefined, 0)).rejects.toThrowError(
937 new TypeError('name argument must be a string')
938 )
939 await expect(pool.execute(undefined, '')).rejects.toThrowError(
940 new TypeError('name argument must not be an empty string')
941 )
942 await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
943 new TypeError('transferList argument must be an array')
944 )
945 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
946 "Task function 'unknown' not found"
947 )
948 await pool.destroy()
949 await expect(pool.execute()).rejects.toThrowError(
950 new Error('Cannot execute a task on not started pool')
951 )
952 })
953
954 it('Verify that pool worker tasks usage are computed', async () => {
955 const pool = new FixedClusterPool(
956 numberOfWorkers,
957 './tests/worker-files/cluster/testWorker.js'
958 )
959 const promises = new Set()
960 const maxMultiplier = 2
961 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
962 promises.add(pool.execute())
963 }
964 for (const workerNode of pool.workerNodes) {
965 expect(workerNode.usage).toStrictEqual({
966 tasks: {
967 executed: 0,
968 executing: maxMultiplier,
969 queued: 0,
970 maxQueued: 0,
971 stolen: 0,
972 failed: 0
973 },
974 runTime: {
975 history: expect.any(CircularArray)
976 },
977 waitTime: {
978 history: expect.any(CircularArray)
979 },
980 elu: {
981 idle: {
982 history: expect.any(CircularArray)
983 },
984 active: {
985 history: expect.any(CircularArray)
986 }
987 }
988 })
989 }
990 await Promise.all(promises)
991 for (const workerNode of pool.workerNodes) {
992 expect(workerNode.usage).toStrictEqual({
993 tasks: {
994 executed: maxMultiplier,
995 executing: 0,
996 queued: 0,
997 maxQueued: 0,
998 stolen: 0,
999 failed: 0
1000 },
1001 runTime: {
1002 history: expect.any(CircularArray)
1003 },
1004 waitTime: {
1005 history: expect.any(CircularArray)
1006 },
1007 elu: {
1008 idle: {
1009 history: expect.any(CircularArray)
1010 },
1011 active: {
1012 history: expect.any(CircularArray)
1013 }
1014 }
1015 })
1016 }
1017 await pool.destroy()
1018 })
1019
1020 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1021 const pool = new DynamicThreadPool(
1022 Math.floor(numberOfWorkers / 2),
1023 numberOfWorkers,
1024 './tests/worker-files/thread/testWorker.js'
1025 )
1026 const promises = new Set()
1027 const maxMultiplier = 2
1028 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1029 promises.add(pool.execute())
1030 }
1031 await Promise.all(promises)
1032 for (const workerNode of pool.workerNodes) {
1033 expect(workerNode.usage).toStrictEqual({
1034 tasks: {
1035 executed: expect.any(Number),
1036 executing: 0,
1037 queued: 0,
1038 maxQueued: 0,
1039 stolen: 0,
1040 failed: 0
1041 },
1042 runTime: {
1043 history: expect.any(CircularArray)
1044 },
1045 waitTime: {
1046 history: expect.any(CircularArray)
1047 },
1048 elu: {
1049 idle: {
1050 history: expect.any(CircularArray)
1051 },
1052 active: {
1053 history: expect.any(CircularArray)
1054 }
1055 }
1056 })
1057 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1058 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1059 numberOfWorkers * maxMultiplier
1060 )
1061 expect(workerNode.usage.runTime.history.length).toBe(0)
1062 expect(workerNode.usage.waitTime.history.length).toBe(0)
1063 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1064 expect(workerNode.usage.elu.active.history.length).toBe(0)
1065 }
1066 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1067 for (const workerNode of pool.workerNodes) {
1068 expect(workerNode.usage).toStrictEqual({
1069 tasks: {
1070 executed: 0,
1071 executing: 0,
1072 queued: 0,
1073 maxQueued: 0,
1074 stolen: 0,
1075 failed: 0
1076 },
1077 runTime: {
1078 history: expect.any(CircularArray)
1079 },
1080 waitTime: {
1081 history: expect.any(CircularArray)
1082 },
1083 elu: {
1084 idle: {
1085 history: expect.any(CircularArray)
1086 },
1087 active: {
1088 history: expect.any(CircularArray)
1089 }
1090 }
1091 })
1092 expect(workerNode.usage.runTime.history.length).toBe(0)
1093 expect(workerNode.usage.waitTime.history.length).toBe(0)
1094 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1095 expect(workerNode.usage.elu.active.history.length).toBe(0)
1096 }
1097 await pool.destroy()
1098 })
1099
1100 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1101 const pool = new DynamicClusterPool(
1102 Math.floor(numberOfWorkers / 2),
1103 numberOfWorkers,
1104 './tests/worker-files/cluster/testWorker.js'
1105 )
1106 expect(pool.emitter.eventNames()).toStrictEqual([])
1107 let poolInfo
1108 let poolReady = 0
1109 pool.emitter.on(PoolEvents.ready, info => {
1110 ++poolReady
1111 poolInfo = info
1112 })
1113 await waitPoolEvents(pool, PoolEvents.ready, 1)
1114 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1115 expect(poolReady).toBe(1)
1116 expect(poolInfo).toStrictEqual({
1117 version,
1118 type: PoolTypes.dynamic,
1119 worker: WorkerTypes.cluster,
1120 started: true,
1121 ready: true,
1122 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1123 minSize: expect.any(Number),
1124 maxSize: expect.any(Number),
1125 workerNodes: expect.any(Number),
1126 idleWorkerNodes: expect.any(Number),
1127 busyWorkerNodes: expect.any(Number),
1128 executedTasks: expect.any(Number),
1129 executingTasks: expect.any(Number),
1130 failedTasks: expect.any(Number)
1131 })
1132 await pool.destroy()
1133 })
1134
1135 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1136 const pool = new FixedThreadPool(
1137 numberOfWorkers,
1138 './tests/worker-files/thread/testWorker.js'
1139 )
1140 expect(pool.emitter.eventNames()).toStrictEqual([])
1141 const promises = new Set()
1142 let poolBusy = 0
1143 let poolInfo
1144 pool.emitter.on(PoolEvents.busy, info => {
1145 ++poolBusy
1146 poolInfo = info
1147 })
1148 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1149 for (let i = 0; i < numberOfWorkers * 2; i++) {
1150 promises.add(pool.execute())
1151 }
1152 await Promise.all(promises)
1153 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1154 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1155 expect(poolBusy).toBe(numberOfWorkers + 1)
1156 expect(poolInfo).toStrictEqual({
1157 version,
1158 type: PoolTypes.fixed,
1159 worker: WorkerTypes.thread,
1160 started: true,
1161 ready: true,
1162 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1163 minSize: expect.any(Number),
1164 maxSize: expect.any(Number),
1165 workerNodes: expect.any(Number),
1166 idleWorkerNodes: expect.any(Number),
1167 busyWorkerNodes: expect.any(Number),
1168 executedTasks: expect.any(Number),
1169 executingTasks: expect.any(Number),
1170 failedTasks: expect.any(Number)
1171 })
1172 await pool.destroy()
1173 })
1174
1175 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1176 const pool = new DynamicThreadPool(
1177 Math.floor(numberOfWorkers / 2),
1178 numberOfWorkers,
1179 './tests/worker-files/thread/testWorker.js'
1180 )
1181 expect(pool.emitter.eventNames()).toStrictEqual([])
1182 const promises = new Set()
1183 let poolFull = 0
1184 let poolInfo
1185 pool.emitter.on(PoolEvents.full, info => {
1186 ++poolFull
1187 poolInfo = info
1188 })
1189 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1190 for (let i = 0; i < numberOfWorkers * 2; i++) {
1191 promises.add(pool.execute())
1192 }
1193 await Promise.all(promises)
1194 expect(poolFull).toBe(1)
1195 expect(poolInfo).toStrictEqual({
1196 version,
1197 type: PoolTypes.dynamic,
1198 worker: WorkerTypes.thread,
1199 started: true,
1200 ready: true,
1201 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1202 minSize: expect.any(Number),
1203 maxSize: expect.any(Number),
1204 workerNodes: expect.any(Number),
1205 idleWorkerNodes: expect.any(Number),
1206 busyWorkerNodes: expect.any(Number),
1207 executedTasks: expect.any(Number),
1208 executingTasks: expect.any(Number),
1209 failedTasks: expect.any(Number)
1210 })
1211 await pool.destroy()
1212 })
1213
1214 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1215 const pool = new FixedThreadPool(
1216 numberOfWorkers,
1217 './tests/worker-files/thread/testWorker.js',
1218 {
1219 enableTasksQueue: true
1220 }
1221 )
1222 stub(pool, 'hasBackPressure').returns(true)
1223 expect(pool.emitter.eventNames()).toStrictEqual([])
1224 const promises = new Set()
1225 let poolBackPressure = 0
1226 let poolInfo
1227 pool.emitter.on(PoolEvents.backPressure, info => {
1228 ++poolBackPressure
1229 poolInfo = info
1230 })
1231 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1232 for (let i = 0; i < numberOfWorkers + 1; i++) {
1233 promises.add(pool.execute())
1234 }
1235 await Promise.all(promises)
1236 expect(poolBackPressure).toBe(1)
1237 expect(poolInfo).toStrictEqual({
1238 version,
1239 type: PoolTypes.fixed,
1240 worker: WorkerTypes.thread,
1241 started: true,
1242 ready: true,
1243 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1244 minSize: expect.any(Number),
1245 maxSize: expect.any(Number),
1246 workerNodes: expect.any(Number),
1247 idleWorkerNodes: expect.any(Number),
1248 busyWorkerNodes: expect.any(Number),
1249 executedTasks: expect.any(Number),
1250 executingTasks: expect.any(Number),
1251 maxQueuedTasks: expect.any(Number),
1252 queuedTasks: expect.any(Number),
1253 backPressure: true,
1254 stolenTasks: expect.any(Number),
1255 failedTasks: expect.any(Number)
1256 })
1257 expect(pool.hasBackPressure.called).toBe(true)
1258 await pool.destroy()
1259 })
1260
1261 it('Verify that hasTaskFunction() is working', async () => {
1262 const dynamicThreadPool = new DynamicThreadPool(
1263 Math.floor(numberOfWorkers / 2),
1264 numberOfWorkers,
1265 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1266 )
1267 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1268 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1269 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1270 true
1271 )
1272 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1273 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1274 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1275 await dynamicThreadPool.destroy()
1276 const fixedClusterPool = new FixedClusterPool(
1277 numberOfWorkers,
1278 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1279 )
1280 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1281 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1282 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1283 true
1284 )
1285 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1286 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1287 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1288 await fixedClusterPool.destroy()
1289 })
1290
1291 it('Verify that addTaskFunction() is working', async () => {
1292 const dynamicThreadPool = new DynamicThreadPool(
1293 Math.floor(numberOfWorkers / 2),
1294 numberOfWorkers,
1295 './tests/worker-files/thread/testWorker.js'
1296 )
1297 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1298 await expect(
1299 dynamicThreadPool.addTaskFunction(0, () => {})
1300 ).rejects.toThrowError(new TypeError('name argument must be a string'))
1301 await expect(
1302 dynamicThreadPool.addTaskFunction('', () => {})
1303 ).rejects.toThrowError(
1304 new TypeError('name argument must not be an empty string')
1305 )
1306 await expect(
1307 dynamicThreadPool.addTaskFunction('test', 0)
1308 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1309 await expect(
1310 dynamicThreadPool.addTaskFunction('test', '')
1311 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1312 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1313 DEFAULT_TASK_NAME,
1314 'test'
1315 ])
1316 const echoTaskFunction = data => {
1317 return data
1318 }
1319 await expect(
1320 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1321 ).resolves.toBe(true)
1322 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1323 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1324 echoTaskFunction
1325 )
1326 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1327 DEFAULT_TASK_NAME,
1328 'test',
1329 'echo'
1330 ])
1331 const taskFunctionData = { test: 'test' }
1332 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1333 expect(echoResult).toStrictEqual(taskFunctionData)
1334 for (const workerNode of dynamicThreadPool.workerNodes) {
1335 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1336 tasks: {
1337 executed: expect.any(Number),
1338 executing: 0,
1339 queued: 0,
1340 stolen: 0,
1341 failed: 0
1342 },
1343 runTime: {
1344 history: new CircularArray()
1345 },
1346 waitTime: {
1347 history: new CircularArray()
1348 },
1349 elu: {
1350 idle: {
1351 history: new CircularArray()
1352 },
1353 active: {
1354 history: new CircularArray()
1355 }
1356 }
1357 })
1358 }
1359 await dynamicThreadPool.destroy()
1360 })
1361
1362 it('Verify that removeTaskFunction() is working', async () => {
1363 const dynamicThreadPool = new DynamicThreadPool(
1364 Math.floor(numberOfWorkers / 2),
1365 numberOfWorkers,
1366 './tests/worker-files/thread/testWorker.js'
1367 )
1368 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1369 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1370 DEFAULT_TASK_NAME,
1371 'test'
1372 ])
1373 await expect(
1374 dynamicThreadPool.removeTaskFunction('test')
1375 ).rejects.toThrowError(
1376 new Error('Cannot remove a task function not handled on the pool side')
1377 )
1378 const echoTaskFunction = data => {
1379 return data
1380 }
1381 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1382 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1383 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1384 echoTaskFunction
1385 )
1386 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1387 DEFAULT_TASK_NAME,
1388 'test',
1389 'echo'
1390 ])
1391 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1392 true
1393 )
1394 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1395 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1396 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1397 DEFAULT_TASK_NAME,
1398 'test'
1399 ])
1400 await dynamicThreadPool.destroy()
1401 })
1402
1403 it('Verify that listTaskFunctionNames() is working', async () => {
1404 const dynamicThreadPool = new DynamicThreadPool(
1405 Math.floor(numberOfWorkers / 2),
1406 numberOfWorkers,
1407 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1408 )
1409 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1410 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1411 DEFAULT_TASK_NAME,
1412 'jsonIntegerSerialization',
1413 'factorial',
1414 'fibonacci'
1415 ])
1416 await dynamicThreadPool.destroy()
1417 const fixedClusterPool = new FixedClusterPool(
1418 numberOfWorkers,
1419 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1420 )
1421 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1422 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1423 DEFAULT_TASK_NAME,
1424 'jsonIntegerSerialization',
1425 'factorial',
1426 'fibonacci'
1427 ])
1428 await fixedClusterPool.destroy()
1429 })
1430
1431 it('Verify that setDefaultTaskFunction() is working', async () => {
1432 const dynamicThreadPool = new DynamicThreadPool(
1433 Math.floor(numberOfWorkers / 2),
1434 numberOfWorkers,
1435 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1436 )
1437 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1438 await expect(
1439 dynamicThreadPool.setDefaultTaskFunction(0)
1440 ).rejects.toThrowError(
1441 new Error(
1442 "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
1443 )
1444 )
1445 await expect(
1446 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1447 ).rejects.toThrowError(
1448 new Error(
1449 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
1450 )
1451 )
1452 await expect(
1453 dynamicThreadPool.setDefaultTaskFunction('unknown')
1454 ).rejects.toThrowError(
1455 new Error(
1456 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
1457 )
1458 )
1459 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1460 DEFAULT_TASK_NAME,
1461 'jsonIntegerSerialization',
1462 'factorial',
1463 'fibonacci'
1464 ])
1465 await expect(
1466 dynamicThreadPool.setDefaultTaskFunction('factorial')
1467 ).resolves.toBe(true)
1468 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1469 DEFAULT_TASK_NAME,
1470 'factorial',
1471 'jsonIntegerSerialization',
1472 'fibonacci'
1473 ])
1474 await expect(
1475 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1476 ).resolves.toBe(true)
1477 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1478 DEFAULT_TASK_NAME,
1479 'fibonacci',
1480 'jsonIntegerSerialization',
1481 'factorial'
1482 ])
1483 })
1484
1485 it('Verify that multiple task functions worker is working', async () => {
1486 const pool = new DynamicClusterPool(
1487 Math.floor(numberOfWorkers / 2),
1488 numberOfWorkers,
1489 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1490 )
1491 const data = { n: 10 }
1492 const result0 = await pool.execute(data)
1493 expect(result0).toStrictEqual({ ok: 1 })
1494 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1495 expect(result1).toStrictEqual({ ok: 1 })
1496 const result2 = await pool.execute(data, 'factorial')
1497 expect(result2).toBe(3628800)
1498 const result3 = await pool.execute(data, 'fibonacci')
1499 expect(result3).toBe(55)
1500 expect(pool.info.executingTasks).toBe(0)
1501 expect(pool.info.executedTasks).toBe(4)
1502 for (const workerNode of pool.workerNodes) {
1503 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1504 DEFAULT_TASK_NAME,
1505 'jsonIntegerSerialization',
1506 'factorial',
1507 'fibonacci'
1508 ])
1509 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1510 for (const name of pool.listTaskFunctionNames()) {
1511 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1512 tasks: {
1513 executed: expect.any(Number),
1514 executing: 0,
1515 failed: 0,
1516 queued: 0,
1517 stolen: 0
1518 },
1519 runTime: {
1520 history: expect.any(CircularArray)
1521 },
1522 waitTime: {
1523 history: expect.any(CircularArray)
1524 },
1525 elu: {
1526 idle: {
1527 history: expect.any(CircularArray)
1528 },
1529 active: {
1530 history: expect.any(CircularArray)
1531 }
1532 }
1533 })
1534 expect(
1535 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1536 ).toBeGreaterThan(0)
1537 }
1538 expect(
1539 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1540 ).toStrictEqual(
1541 workerNode.getTaskFunctionWorkerUsage(
1542 workerNode.info.taskFunctionNames[1]
1543 )
1544 )
1545 }
1546 await pool.destroy()
1547 })
1548
1549 it('Verify sendKillMessageToWorker()', async () => {
1550 const pool = new DynamicClusterPool(
1551 Math.floor(numberOfWorkers / 2),
1552 numberOfWorkers,
1553 './tests/worker-files/cluster/testWorker.js'
1554 )
1555 const workerNodeKey = 0
1556 await expect(
1557 pool.sendKillMessageToWorker(workerNodeKey)
1558 ).resolves.toBeUndefined()
1559 await pool.destroy()
1560 })
1561
1562 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1563 const pool = new DynamicClusterPool(
1564 Math.floor(numberOfWorkers / 2),
1565 numberOfWorkers,
1566 './tests/worker-files/cluster/testWorker.js'
1567 )
1568 const workerNodeKey = 0
1569 await expect(
1570 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1571 taskFunctionOperation: 'add',
1572 taskFunctionName: 'empty',
1573 taskFunction: (() => {}).toString()
1574 })
1575 ).resolves.toBe(true)
1576 expect(
1577 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1578 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1579 await pool.destroy()
1580 })
1581
1582 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1583 const pool = new DynamicClusterPool(
1584 Math.floor(numberOfWorkers / 2),
1585 numberOfWorkers,
1586 './tests/worker-files/cluster/testWorker.js'
1587 )
1588 await expect(
1589 pool.sendTaskFunctionOperationToWorkers({
1590 taskFunctionOperation: 'add',
1591 taskFunctionName: 'empty',
1592 taskFunction: (() => {}).toString()
1593 })
1594 ).resolves.toBe(true)
1595 for (const workerNode of pool.workerNodes) {
1596 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1597 DEFAULT_TASK_NAME,
1598 'test',
1599 'empty'
1600 ])
1601 }
1602 await pool.destroy()
1603 })
1604 })