Merge dependabot/npm_and_yarn/examples/typescript/http-server-pool/express-hybrid...
[poolifier.git] / tests / pools / abstract-pool.test.js
1 const { EventEmitterAsyncResource } = require('node:events')
2 const { expect } = require('expect')
3 const sinon = require('sinon')
4 const {
5 DynamicClusterPool,
6 DynamicThreadPool,
7 FixedClusterPool,
8 FixedThreadPool,
9 PoolEvents,
10 PoolTypes,
11 WorkerChoiceStrategies,
12 WorkerTypes
13 } = require('../../lib')
14 const { CircularArray } = require('../../lib/circular-array')
15 const { Deque } = require('../../lib/deque')
16 const { DEFAULT_TASK_NAME } = require('../../lib/utils')
17 const { version } = require('../../package.json')
18 const { waitPoolEvents } = require('../test-utils')
19 const { WorkerNode } = require('../../lib/pools/worker-node')
20
21 describe('Abstract pool test suite', () => {
22 const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`.
 * The first test of this suite uses it to simulate creating a pool from a
 * non main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
28
// Restore sinon after every test of the suite.
afterEach(function restoreSinon () {
  sinon.restore()
})
32
// Building the stubbed pool (isMain() === false) must be refused.
it('Simulate pool creation from a non main thread/process', () => {
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPoolFromWorker).toThrowError(expectedError)
})
49
// Right after construction the pool reports started and no longer starting.
it('Verify that pool statuses properties are set', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.starting).toBe(false)
  expect(pool.started).toBe(true)
  await pool.destroy()
})
59
// A missing or invalid worker file path must be rejected at construction.
it('Verify that filePath is checked', () => {
  const missingFileError = new Error(
    'Please specify a file with a worker implementation'
  )
  // File path argument omitted altogether.
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
    missingFileError
  )
  // File path argument given, but with an invalid value.
  for (const invalidFilePath of ['', 0, true]) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, invalidFilePath)
    ).toThrowError(missingFileError)
  }
  // File path is a string that does not point to an existing file.
  const createPoolWithUnknownFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createPoolWithUnknownFile).toThrowError(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
80
// An undefined number of workers must be rejected at construction.
it('Verify that numberOfWorkers is checked', () => {
  const createPoolWithoutSize = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.js')
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPoolWithoutSize).toThrowError(expectedError)
})
94
// A negative number of workers must be rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createNegativeSizedPool).toThrowError(expectedError)
})
105
// A fractional number of workers must be rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  expect(createFractionalSizedPool).toThrowError(expectedError)
})
116
// Every invalid (min, max) sizing of a dynamic pool must be rejected.
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.js'
  // Each entry pairs a pool factory with the error it is expected to throw.
  const invalidSizings = [
    [
      () => new DynamicClusterPool(1, undefined, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(0.5, 1, threadWorkerFile),
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      () => new DynamicClusterPool(0, 0.5, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(2, 1, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(0, 0, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      () => new DynamicClusterPool(1, 1, clusterWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [createPool, expectedError] of invalidSizings) {
    expect(createPool).toThrowError(expectedError)
  }
})
183
// Checks the options exposed by a pool built without options, then by a pool
// built with custom options.
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'

  // Pool built without options.
  const defaultStrategyOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    workerChoiceStrategyOptions: defaultStrategyOptions
  })
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
    defaultStrategyOptions
  )
  for (const [, strategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual(defaultStrategyOptions)
  }
  await pool.destroy()

  // Pool built with custom options.
  const testHandler = () => console.info('test handler executed')
  const customStrategyOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  pool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: customStrategyOptions,
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
    customStrategyOptions
  )
  for (const [, strategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual(customStrategyOptions)
  }
  await pool.destroy()
})
283
// Every invalid pool option must make the pool constructor throw.
it('Verify that pool options are validated', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Each entry pairs invalid pool options with the expected error.
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrowError(expectedError)
  }
})
457
// Toggles the median options at runtime and checks that the pool options, the
// strategy context, every strategy and the task statistics requirements follow;
// then checks that invalid options are rejected.
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Asserts the same options on the pool, the context and each strategy.
  const expectStrategyOptions = expectedOptions => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(expectedOptions)
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
  }
  // Asserts the statistics requirements; runTime and elu share the same flags.
  const expectStatisticsRequirements = (average, median) => {
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average, median }
    })
  }
  const medianDisabledOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  const medianEnabledOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: true }
  }

  // Initial state.
  expectStrategyOptions(medianDisabledOptions)
  expectStatisticsRequirements(true, false)

  // Switch runTime and elu to median.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategyOptions(medianEnabledOptions)
  expectStatisticsRequirements(false, true)

  // Switch runTime and elu back to average.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategyOptions(medianDisabledOptions)
  expectStatisticsRequirements(true, false)

  // Each entry pairs invalid options with the expected error.
  const invalidOptions = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { retries: 'invalidChoiceRetries' },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { retries: -1 },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [strategyOptions, expectedError] of invalidOptions) {
    expect(() =>
      pool.setWorkerChoiceStrategyOptions(strategyOptions)
    ).toThrowError(expectedError)
  }
  await pool.destroy()
})
631
// Enables, reconfigures and disables the tasks queue at runtime, checking the
// pool options and the worker node callbacks after each step.
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Asserts that no worker node has queue callbacks.
  const expectNoQueueCallbacks = () => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.onEmptyQueue).toBeUndefined()
      expect(workerNode.onBackPressure).toBeUndefined()
    }
  }
  // Asserts that every worker node has both queue callbacks.
  const expectQueueCallbacks = () => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
    }
  }
  // Expected queue options for a given concurrency, other values at default.
  const queueOptionsWith = concurrency => ({
    concurrency,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })

  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  expectNoQueueCallbacks()

  pool.enableTasksQueue(true)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptionsWith(1))
  expectQueueCallbacks()

  pool.enableTasksQueue(true, { concurrency: 2 })
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptionsWith(2))
  expectQueueCallbacks()

  pool.enableTasksQueue(false)
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  expectNoQueueCallbacks()

  await pool.destroy()
})
676
// Changes the tasks queue options at runtime, checking the pool options and the
// worker nodes after each change; then checks that invalid options are rejected.
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  // Asserts the back pressure size and the presence/absence of the queue
  // callbacks on every worker node.
  const expectWorkerNodes = callbacksExpected => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
      if (callbacksExpected) {
        expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
        expect(workerNode.onBackPressure).toBeInstanceOf(Function)
      } else {
        expect(workerNode.onEmptyQueue).toBeUndefined()
        expect(workerNode.onBackPressure).toBeUndefined()
      }
    }
  }
  const defaultQueueOptions = {
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }

  expect(pool.opts.tasksQueueOptions).toStrictEqual(defaultQueueOptions)
  expectWorkerNodes(true)

  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  expectWorkerNodes(false)

  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual(defaultQueueOptions)
  expectWorkerNodes(true)

  // Each entry pairs invalid options with the expected error.
  const invalidOptions = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [queueOptions, expectedError] of invalidOptions) {
    expect(() => pool.setTasksQueueOptions(queueOptions)).toThrowError(
      expectedError
    )
  }
  await pool.destroy()
})
766
// Checks `pool.info` for a fixed thread pool and for a dynamic cluster pool.
it('Verify that pool info is set', async () => {
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(fixedPool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })
  await fixedPool.destroy()

  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })
  await dynamicPool.destroy()
})
812
// Every worker node of a fresh pool must expose zeroed task counters and empty
// measurement histories.
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const initialUsage = {
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  }
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
    expect(node.usage).toStrictEqual(initialUsage)
  }
  await pool.destroy()
})
847
// Every worker node of a fresh pool must own an empty Deque tasks queue.
it('Verify that pool worker tasks queue are initialized', async () => {
  // Asserts that each worker node of the given pool has an empty tasks queue.
  const expectEmptyTasksQueues = pool => {
    for (const node of pool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.tasksQueue).toBeInstanceOf(Deque)
      expect(node.tasksQueue.size).toBe(0)
      expect(node.tasksQueue.maxSize).toBe(0)
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(fixedPool)
  await fixedPool.destroy()

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expectEmptyTasksQueues(dynamicPool)
  await dynamicPool.destroy()
})
873
// Every worker node of a fresh pool must report a ready, non dynamic worker of
// the pool's worker type.
it('Verify that pool worker info are initialized', async () => {
  // Asserts the info of each worker node of the given pool.
  const expectWorkerInfo = (pool, workerType) => {
    for (const node of pool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerInfo(fixedPool, WorkerTypes.cluster)
  await fixedPool.destroy()

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expectWorkerInfo(dynamicPool, WorkerTypes.thread)
  await dynamicPool.destroy()
})
905
// A pool built with `startWorkers: false` has no worker node and refuses tasks
// until `start()` is called.
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )

  // Before start.
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  await expect(pool.execute()).rejects.toThrowError(
    new Error('Cannot execute a task on not started pool')
  )

  // After start.
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
929
// `execute()` must reject on invalid arguments, on an unknown task function
// name, and once the pool has been destroyed.
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )

  const nonStringNameError = new TypeError('name argument must be a string')
  await expect(pool.execute(undefined, 0)).rejects.toThrowError(
    nonStringNameError
  )

  const emptyNameError = new TypeError(
    'name argument must not be an empty string'
  )
  await expect(pool.execute(undefined, '')).rejects.toThrowError(emptyNameError)

  const transferListError = new TypeError(
    'transferList argument must be an array'
  )
  await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
    transferListError
  )

  // Here the rejection value is compared to a plain string, not to an Error.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )

  await pool.destroy()
  await expect(pool.execute()).rejects.toThrowError(
    new Error('Cannot execute a task on not started pool')
  )
})
952
// Submits `numberOfWorkers * maxMultiplier` tasks and checks the worker node
// task counters before and after the tasks settle.
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Asserts the usage of every worker node for the given task counters.
  const expectUsage = (executed, executing) => {
    for (const node of pool.workerNodes) {
      expect(node.usage).toStrictEqual({
        tasks: {
          executed,
          executing,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: { history: expect.any(CircularArray) },
        waitTime: { history: expect.any(CircularArray) },
        elu: {
          idle: { history: expect.any(CircularArray) },
          active: { history: expect.any(CircularArray) }
        }
      })
    }
  }

  const pendingTasks = new Set()
  let remainingTasks = numberOfWorkers * maxMultiplier
  while (remainingTasks > 0) {
    pendingTasks.add(pool.execute())
    --remainingTasks
  }
  // Checked synchronously after submission, before any task is awaited.
  expectUsage(0, maxMultiplier)
  await Promise.all(pendingTasks)
  expectUsage(maxMultiplier, 0)
  await pool.destroy()
})
1018
// Runs tasks, then switches the worker choice strategy and checks that the
// worker node task counters are back to zero.
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  // Expected usage for a given `executed` value or matcher.
  const usageWithExecuted = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that all measurement histories of a worker node are empty.
  const expectEmptyHistories = node => {
    expect(node.usage.runTime.history.length).toBe(0)
    expect(node.usage.waitTime.history.length).toBe(0)
    expect(node.usage.elu.idle.history.length).toBe(0)
    expect(node.usage.elu.active.history.length).toBe(0)
  }

  const pendingTasks = new Set()
  let remainingTasks = submittedTasks
  while (remainingTasks > 0) {
    pendingTasks.add(pool.execute())
    --remainingTasks
  }
  await Promise.all(pendingTasks)

  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWithExecuted(expect.any(Number)))
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(node)
  }

  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWithExecuted(0))
    expectEmptyHistories(node)
  }
  await pool.destroy()
})
1098
// A listener registered on the pool emitter must receive the 'ready' event
// once, together with the pool info.
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])

  let readyCount = 0
  let receivedInfo
  pool.emitter.on(PoolEvents.ready, info => {
    readyCount += 1
    receivedInfo = info
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)

  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(receivedInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1133
// A listener registered on the pool emitter must receive the 'busy' events,
// together with the pool info, while tasks are submitted to a fixed pool.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])

  let busyCount = 0
  let receivedInfo
  pool.emitter.on(PoolEvents.busy, info => {
    busyCount += 1
    receivedInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])

  const pendingTasks = new Set()
  let remainingTasks = numberOfWorkers * 2
  while (remainingTasks > 0) {
    pendingTasks.add(pool.execute())
    --remainingTasks
  }
  await Promise.all(pendingTasks)

  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyCount).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(receivedInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1173
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicThreadPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Collect every payload delivered with the 'full' event.
  const fullInfos = []
  pool.emitter.on(PoolEvents.full, info => {
    fullInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  // Submit twice as many tasks as the maximum pool size, all at once.
  const submittedTasks = numberOfWorkers * 2
  const pendingTasks = Array.from({ length: submittedTasks }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // Exactly one 'full' event is expected for this burst.
  expect(fullInfos.length).toBe(1)
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(fullInfos[fullInfos.length - 1]).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1212
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    {
      enableTasksQueue: true
    }
  )
  // Force the pool to always report back pressure.
  sinon.stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Collect every payload delivered with the 'backPressure' event.
  const backPressureInfos = []
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  // Submit one task more than there are workers, all at once.
  const submittedTasks = numberOfWorkers + 1
  const pendingTasks = Array.from({ length: submittedTasks }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(backPressureInfos.length).toBe(1)
  // With the tasks queue enabled the pool information also carries queue fields.
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(backPressureInfos[backPressureInfos.length - 1]).toStrictEqual(
    expectedInfo
  )
  // The stub must have been consulted by the pool.
  expect(pool.hasBackPressure.called).toBe(true)
  await pool.destroy()
})
1259
it('Verify that hasTaskFunction() is working', async () => {
  // Pools under test, built lazily so each one is created only after the
  // previous one has been destroyed.
  const poolFactories = [
    () =>
      new DynamicThreadPool(
        Math.floor(numberOfWorkers / 2),
        numberOfWorkers,
        './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
      ),
    () =>
      new FixedClusterPool(
        numberOfWorkers,
        './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
      )
  ]
  // Each entry: [task function name, expected hasTaskFunction() result].
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [taskFunctionName, expected] of expectations) {
      expect(pool.hasTaskFunction(taskFunctionName)).toBe(expected)
    }
    await pool.destroy()
  }
})
1289
it('Verify that addTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Each entry: [name, fn, message of the TypeError the call must reject with].
  const rejectedRegistrations = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of rejectedRegistrations) {
    await expect(pool.addTaskFunction(name, fn)).rejects.toThrowError(
      new TypeError(message)
    )
  }
  // The rejected calls must leave the list of task function names untouched.
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => {
    return data
  }
  await expect(
    pool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  // The added task function must be executable and return its input.
  const payload = { test: 'test' }
  expect(await pool.execute(payload, 'echo')).toStrictEqual(payload)
  // Builds a statistics entry holding a fresh, empty history.
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const workerNode of pool.workerNodes) {
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    }
    expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual(
      expectedUsage
    )
  }
  await pool.destroy()
})
1360
it('Verify that removeTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Names listed by the pool before and after the 'echo' round trip.
  const initialTaskFunctionNames = [DEFAULT_TASK_NAME, 'test']
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialTaskFunctionNames)
  // 'test' was not added through the pool, so removing it must be refused.
  await expect(pool.removeTaskFunction('test')).rejects.toThrowError(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => {
    return data
  }
  await pool.addTaskFunction('echo', echoTaskFunction)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  // Removing the pool-side task function must restore the initial state.
  await expect(pool.removeTaskFunction('echo')).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(0)
  expect(pool.taskFunctions.get('echo')).toBeUndefined()
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialTaskFunctionNames)
  await pool.destroy()
})
1401
it('Verify that listTaskFunctionNames() is working', async () => {
  // Names both pool flavours are expected to list, in this order.
  const expectedTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Pools under test, built lazily so each one is created only after the
  // previous one has been destroyed.
  const poolFactories = [
    () =>
      new DynamicThreadPool(
        Math.floor(numberOfWorkers / 2),
        numberOfWorkers,
        './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
      ),
    () =>
      new FixedClusterPool(
        numberOfWorkers,
        './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual(
      expectedTaskFunctionNames
    )
    await pool.destroy()
  }
})
1429
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // The worker id embedded in the error message is assigned at run time
  // (presumably the worker thread id - confirm against the worker side), so it
  // varies with how many workers were created before this test runs. Match any
  // numeric id instead of hard-coding one, while still pinning the full message.
  const defaultOperationFailure = reason =>
    new RegExp(
      `^Task function operation 'default' failed on worker \\d+ with error: '${reason}'$`
    )
  // A non-string name must be rejected.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(0)
  ).rejects.toThrowError(
    defaultOperationFailure('TypeError: name parameter is not a string')
  )
  // The reserved default task function name must be rejected.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrowError(
    defaultOperationFailure(
      'Error: Cannot set the default task function reserved name as the default task function'
    )
  )
  // An unknown task function name must be rejected.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrowError(
    defaultOperationFailure(
      'Error: Cannot set the default task function to a non-existing task function'
    )
  )
  // The failed attempts must leave the task function names order untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // The new default task function is listed right after DEFAULT_TASK_NAME.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  // Release the worker threads, as every sibling test does.
  await dynamicThreadPool.destroy()
})
1483
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const input = { n: 10 }
  // Run the default task function, then each named one, strictly in sequence.
  expect(await pool.execute(input)).toStrictEqual({ ok: 1 })
  expect(
    await pool.execute(input, 'jsonIntegerSerialization')
  ).toStrictEqual({ ok: 1 })
  expect(await pool.execute(input, 'factorial')).toBe(3628800)
  expect(await pool.execute(input, 'fibonacci')).toBe(55)
  // All four tasks are done and accounted for.
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  const expectedTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Builds a statistics entry matcher accepting any CircularArray history.
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual(
      expectedTaskFunctionNames
    )
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      const expectedUsage = {
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          stolen: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      }
      expect(
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName)
      ).toStrictEqual(expectedUsage)
      expect(
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The usage reported for DEFAULT_TASK_NAME must equal the usage of the
    // task function listed right after it.
    const firstNamedTaskFunction = workerNode.info.taskFunctionNames[1]
    expect(
      workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(
      workerNode.getTaskFunctionWorkerUsage(firstNamedTaskFunction)
    )
  }
  await pool.destroy()
})
1547
it('Verify sendKillMessageToWorker()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the worker node stored at key 0.
  const targetWorkerNodeKey = 0
  // The kill request must settle without a value.
  const killRequest = pool.sendKillMessageToWorker(targetWorkerNodeKey)
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
1560
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the worker node stored at key 0.
  const targetWorkerNodeKey = 0
  // Operation message asking the worker to add a no-op task function named
  // 'empty', shipped as its source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(
      targetWorkerNodeKey,
      addEmptyTaskFunction
    )
  ).resolves.toBe(true)
  // The targeted worker node must now list the added task function last.
  const { taskFunctionNames } = pool.workerNodes[targetWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1580
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Operation message asking the workers to add a no-op task function named
  // 'empty', shipped as its source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addEmptyTaskFunction)
  ).resolves.toBe(true)
  // Every worker node must now list the added task function last.
  const expectedTaskFunctionNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual(
      expectedTaskFunctionNames
    )
  }
  await pool.destroy()
})
1603 })