refactor: factor out worker helpers
[poolifier.git] / tests / pools / abstract-pool.test.js
1 const { EventEmitter } = require('node:events')
2 const { expect } = require('expect')
3 const sinon = require('sinon')
4 const {
5 DynamicClusterPool,
6 DynamicThreadPool,
7 FixedClusterPool,
8 FixedThreadPool,
9 PoolEvents,
10 PoolTypes,
11 WorkerChoiceStrategies,
12 WorkerTypes
13 } = require('../../lib')
14 const { CircularArray } = require('../../lib/circular-array')
15 const { Deque } = require('../../lib/deque')
16 const { DEFAULT_TASK_NAME } = require('../../lib/utils')
17 const { version } = require('../../package.json')
18 const { waitPoolEvents } = require('../test-utils')
19 const { WorkerNode } = require('../../lib/pools/worker-node')
20
21 describe('Abstract pool test suite', () => {
22 const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`.
 * The suite uses it to simulate a pool being created from a non main
 * thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /** @returns {boolean} Always `false`. */
  isMain () { return false }
}
28
// Undo whatever a test faked through sinon so that stubs never leak from one
// test to the next.
afterEach(function restoreSinonFakes () {
  sinon.restore()
})
32
// Creating a pool whose isMain() reports false must be refused.
it('Simulate pool creation from a non main thread/process', () => {
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPoolFromWorker).toThrowError(expectedError)
})
49
// Right after construction the pool must no longer be starting and must
// already be started.
it('Verify that pool statuses properties are set', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)
  const expectedStatuses = [
    ['starting', false],
    ['started', true]
  ]
  for (const [statusName, expectedValue] of expectedStatuses) {
    expect(pool[statusName]).toBe(expectedValue)
  }
  await pool.destroy()
})
59
// A missing, empty or non string file path, as well as a path to a file that
// does not exist, must make the pool constructor throw.
it('Verify that filePath is checked', () => {
  const missingFilePathError = new Error(
    'Please specify a file with a worker implementation'
  )
  // Constructor argument lists carrying a missing or invalid file path.
  const invalidArgumentLists = [
    [numberOfWorkers],
    [numberOfWorkers, ''],
    [numberOfWorkers, 0],
    [numberOfWorkers, true]
  ]
  for (const constructorArgs of invalidArgumentLists) {
    expect(() => new FixedThreadPool(...constructorArgs)).toThrowError(
      missingFilePathError
    )
  }
  const createPoolWithUnknownFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createPoolWithUnknownFile).toThrowError(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
80
// Constructing a pool without any argument must be rejected.
it('Verify that numberOfWorkers is checked', () => {
  const createPoolWithoutSize = () => new FixedThreadPool()
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPoolWithoutSize).toThrowError(expectedError)
})
88
// A negative pool size must be rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createNegativeSizedPool).toThrowError(expectedError)
})
99
// A fractional pool size must be rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createFractionalSizedPool).toThrowError(expectedError)
})
110
// Every invalid (minimum, maximum) size combination of a dynamic pool must
// make its constructor throw the matching error.
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.js'
  // One entry per invalid sizing, checked in declaration order.
  const invalidSizings = [
    {
      Pool: DynamicClusterPool,
      min: 1,
      max: undefined,
      file: clusterWorkerFile,
      error: new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    },
    {
      Pool: DynamicThreadPool,
      min: 0.5,
      max: 1,
      file: threadWorkerFile,
      error: new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    },
    {
      Pool: DynamicClusterPool,
      min: 0,
      max: 0.5,
      file: clusterWorkerFile,
      error: new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    },
    {
      Pool: DynamicThreadPool,
      min: 2,
      max: 1,
      file: threadWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    },
    {
      Pool: DynamicThreadPool,
      min: 0,
      max: 0,
      file: threadWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    },
    {
      Pool: DynamicClusterPool,
      min: 1,
      max: 1,
      file: clusterWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    }
  ]
  for (const { Pool, min, max, file, error } of invalidSizings) {
    expect(() => new Pool(min, max, file)).toThrowError(error)
  }
})
177
// Checks the options exposed by a pool built without options, then by a pool
// built with explicit options, including the worker choice strategy options
// seen by the strategy context and by each strategy.
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Asserts that the strategy context and every strategy it holds expose
  // exactly the given worker choice strategy options.
  const expectStrategyOptions = (checkedPool, expectedOptions) => {
    const { workerChoiceStrategyContext } = checkedPool
    expect(workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    for (const [, strategy] of workerChoiceStrategyContext.workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
  }

  // Pool created without any option.
  const defaultStrategyOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeInstanceOf(EventEmitter)
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    workerChoiceStrategyOptions: defaultStrategyOptions
  })
  expectStrategyOptions(pool, defaultStrategyOptions)
  await pool.destroy()

  // Pool created with explicit options.
  const testHandler = () => console.info('test handler executed')
  const customStrategyOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  pool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: numberOfWorkers ** 2,
      taskStealing: true,
      tasksStealingOnBackPressure: true
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: customStrategyOptions,
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  expectStrategyOptions(pool, customStrategyOptions)
  await pool.destroy()
})
277
// Each invalid pool options object must make the constructor throw the
// matching error.
it('Verify that pool options are validated', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Wrap the given value as worker choice strategy options.
  const withStrategyOptions = workerChoiceStrategyOptions => ({
    workerChoiceStrategyOptions
  })
  // Wrap the given value as tasks queue options, with the tasks queue enabled.
  const withTasksQueueOptions = tasksQueueOptions => ({
    enableTasksQueue: true,
    tasksQueueOptions
  })
  // [invalid pool options, expected error] pairs, checked in declaration order.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      withStrategyOptions({ retries: 'invalidChoiceRetries' }),
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      withStrategyOptions({ retries: -1 }),
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      withStrategyOptions({ weights: {} }),
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      withStrategyOptions({ measurement: 'invalidMeasurement' }),
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      withTasksQueueOptions('invalidTasksQueueOptions'),
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      withTasksQueueOptions({ concurrency: 0 }),
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      withTasksQueueOptions({ concurrency: -1 }),
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      withTasksQueueOptions({ concurrency: 0.2 }),
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      withTasksQueueOptions({ size: 0 }),
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      withTasksQueueOptions({ size: -1 }),
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      withTasksQueueOptions({ size: 0.2 }),
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, invalidOptions)
    ).toThrowError(expectedError)
  }
})
451
// Toggles the runTime and elu `median` options at runtime and checks, after
// each change, the options seen by the pool, the strategy context and every
// strategy, plus the resulting task statistics requirements. Invalid options
// must be rejected.
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Expected strategy options when runTime and elu share the given median flag.
  const strategyOptionsFor = median => ({
    retries: 6,
    runTime: { median },
    waitTime: { median: false },
    elu: { median }
  })
  // Expected statistics requirements for the same flag: average and median
  // are mutually exclusive for runTime and elu, waitTime is never required.
  const statisticsRequirementsFor = median => ({
    runTime: { aggregate: true, average: !median, median },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: true, average: !median, median }
  })
  // Runs the whole set of expectations for the given median flag.
  const expectPoolStateFor = median => {
    const expectedOptions = strategyOptionsFor(median)
    const context = pool.workerChoiceStrategyContext
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions
    )
    expect(context.opts).toStrictEqual(expectedOptions)
    for (const [, strategy] of context.workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
    expect(context.getTaskStatisticsRequirements()).toStrictEqual(
      statisticsRequirementsFor(median)
    )
  }

  expectPoolStateFor(false)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectPoolStateFor(true)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectPoolStateFor(false)

  // [invalid strategy options, expected error] pairs, checked in order.
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { retries: 'invalidChoiceRetries' },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { retries: -1 },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() =>
      pool.setWorkerChoiceStrategyOptions(invalidOptions)
    ).toThrowError(expectedError)
  }
  await pool.destroy()
})
625
// Enables, reconfigures and disables the tasks queue at runtime, checking the
// pool options and the worker node callbacks after each step.
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Expectations while the tasks queue is disabled: no queue options and no
  // queue callbacks on the worker nodes.
  const expectTasksQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.onEmptyQueue).toBeUndefined()
      expect(workerNode.onBackPressure).toBeUndefined()
    }
  }
  // Expectations while the tasks queue is enabled with the given concurrency.
  const expectTasksQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency,
      size: numberOfWorkers ** 2,
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
    }
  }

  expectTasksQueueDisabled()
  pool.enableTasksQueue(true)
  expectTasksQueueEnabled(1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueEnabled(2)
  pool.enableTasksQueue(false)
  expectTasksQueueDisabled()
  await pool.destroy()
})
670
// Changes the tasks queue options at runtime, checking the pool options, the
// worker node back pressure size and the worker node callbacks after each
// change. Invalid options must be rejected.
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  // Checks the pool queue options and, on each worker node, the back pressure
  // size and whether the queue callbacks are expected to be registered.
  const expectTasksQueueState = (expectedOptions, callbacksExpected) => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
      if (callbacksExpected) {
        expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
        expect(workerNode.onBackPressure).toBeInstanceOf(Function)
      } else {
        expect(workerNode.onEmptyQueue).toBeUndefined()
        expect(workerNode.onBackPressure).toBeUndefined()
      }
    }
  }
  // Options expected right after creation and after task stealing is enabled
  // again without giving a size.
  const stealingQueueOptions = () => ({
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })

  expectTasksQueueState(stealingQueueOptions(), true)
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  expectTasksQueueState(
    {
      concurrency: 2,
      size: 2,
      taskStealing: false,
      tasksStealingOnBackPressure: false
    },
    false
  )
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectTasksQueueState(stealingQueueOptions(), true)

  // [invalid tasks queue options, expected error] pairs, checked in order.
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(invalidOptions)).toThrowError(
      expectedError
    )
  }
  await pool.destroy()
})
760
// Checks the info reported by a freshly created fixed thread pool, then by a
// freshly created dynamic cluster pool.
it('Verify that pool info is set', async () => {
  // Expected info of a pool that has not executed any task yet: the test
  // expects the worker nodes count to equal the minimum size, all idle.
  const idlePoolInfo = (type, worker, minSize, maxSize) => ({
    version,
    type,
    worker,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize,
    maxSize,
    workerNodes: minSize,
    idleWorkerNodes: minSize,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })

  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(pool.info).toStrictEqual(
    idlePoolInfo(
      PoolTypes.fixed,
      WorkerTypes.thread,
      numberOfWorkers,
      numberOfWorkers
    )
  )
  await pool.destroy()

  const minimumSize = Math.floor(numberOfWorkers / 2)
  pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.info).toStrictEqual(
    idlePoolInfo(
      PoolTypes.dynamic,
      WorkerTypes.cluster,
      minimumSize,
      numberOfWorkers
    )
  )
  await pool.destroy()
})
806
// Every worker node of a new pool must start with zeroed task counters and
// empty measurement histories.
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Builds the usage expected on a worker node that has never run a task.
  const pristineUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.usage).toStrictEqual(pristineUsage())
  }
  await pool.destroy()
})
841
// Every worker node of a new fixed or dynamic pool must own an empty Deque
// tasks queue.
it('Verify that pool worker tasks queue are initialized', async () => {
  // Asserts that each worker node of the given pool has an empty tasks queue
  // whose recorded maximum size is still zero.
  const expectEmptyTasksQueues = checkedPool => {
    for (const workerNode of checkedPool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      const { tasksQueue } = workerNode
      expect(tasksQueue).toBeInstanceOf(Deque)
      expect(tasksQueue.size).toBe(0)
      expect(tasksQueue.maxSize).toBe(0)
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()

  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()
})
867
// Every worker node of a new fixed or dynamic pool must report a numeric id,
// the pool worker type, and be flagged as ready and not dynamic.
it('Verify that pool worker info are initialized', async () => {
  // Asserts the info of each worker node of the given pool.
  const expectWorkerNodesInfo = (checkedPool, workerType) => {
    for (const workerNode of checkedPool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerNodesInfo(pool, WorkerTypes.cluster)
  await pool.destroy()

  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expectWorkerNodesInfo(pool, WorkerTypes.thread)
  await pool.destroy()
})
899
// A pool created with `startWorkers: false` must own no worker node and
// refuse tasks until start() is called.
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )
  // Asserts that both the started and ready info flags equal the given value.
  const expectStartedAndReady = expectedFlag => {
    expect(pool.info.started).toBe(expectedFlag)
    expect(pool.info.ready).toBe(expectedFlag)
  }

  expectStartedAndReady(false)
  expect(pool.workerNodes).toStrictEqual([])
  await expect(pool.execute()).rejects.toThrowError(
    new Error('Cannot execute a task on not started pool')
  )

  pool.start()
  expectStartedAndReady(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const startedWorkerNode of pool.workerNodes) {
    expect(startedWorkerNode).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
923
// execute() must reject on an invalid task name or transfer list, on an
// unknown task function, and once the pool has been destroyed.
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // [execute() arguments, expected error] pairs, checked in order.
  const invalidCalls = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [executeArgs, expectedError] of invalidCalls) {
    await expect(pool.execute(...executeArgs)).rejects.toThrowError(
      expectedError
    )
  }
  // The unknown task function rejection value is a plain string.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrowError(notStartedError)
})
946
// Submits `numberOfWorkers * maxMultiplier` tasks and checks the task
// counters of every worker node while the tasks run, then once they are done.
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Expected usage for the given executed/executing counters; the histories
  // only have to be CircularArray instances.
  const usageWith = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that every worker node reports the given counters.
  const expectEveryWorkerUsage = (executed, executing) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(usageWith(executed, executing))
    }
  }

  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  // Nothing has completed yet: each worker node is expected to be executing
  // maxMultiplier tasks.
  expectEveryWorkerUsage(0, maxMultiplier)
  await Promise.all(pendingTasks)
  expectEveryWorkerUsage(maxMultiplier, 0)
  await pool.destroy()
})
1012
// Runs tasks on a dynamic pool, then switches the worker choice strategy and
// checks that the executed counter of every worker node is back to zero.
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  // Expected usage with the given `executed` expectation, every other counter
  // being zero and the histories being CircularArray instances.
  const usageWithExecuted = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that none of the measurement histories holds an entry.
  const expectEmptyHistories = workerNode => {
    expect(workerNode.usage.runTime.history.length).toBe(0)
    expect(workerNode.usage.waitTime.history.length).toBe(0)
    expect(workerNode.usage.elu.idle.history.length).toBe(0)
    expect(workerNode.usage.elu.active.history.length).toBe(0)
  }

  const pendingTasks = Array.from({ length: submittedTasks }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(
      usageWithExecuted(expect.any(Number))
    )
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(workerNode)
  }

  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(usageWithExecuted(0))
    expectEmptyHistories(workerNode)
  }
  await pool.destroy()
})
1092
// A listener registered on the pool emitter must receive the `ready` event
// exactly once, together with the pool info.
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  let readyEventsCount = 0
  let lastReadyInfo
  pool.emitter.on(PoolEvents.ready, info => {
    readyEventsCount += 1
    lastReadyInfo = info
  })
  // NOTE(review): waitPoolEvents presumably resolves once one `ready` event
  // has been emitted - confirm against tests/test-utils.
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(readyEventsCount).toBe(1)
  expect(lastReadyInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
1125
// A listener registered on the pool emitter must receive the `busy` events
// emitted while tasks are submitted, together with the pool info.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let busyEventsCount = 0
  let lastBusyInfo
  pool.emitter.on(PoolEvents.busy, info => {
    busyEventsCount += 1
    lastBusyInfo = info
  })
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once
  // reaches the number of fixed pool workers. So it is expected
  // numberOfWorkers + 1 times in total for a loop submitting up to
  // numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyEventsCount).toBe(numberOfWorkers + 1)
  expect(lastBusyInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
1163
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  // Submits twice as many tasks as the dynamic pool maximum size and expects
  // exactly one 'full' event carrying the pool information.
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let fullCount = 0
  let lastFullInfo
  pool.emitter.on(PoolEvents.full, info => {
    lastFullInfo = info
    fullCount += 1
  })
  const submittedTasks = numberOfWorkers * 2
  const pendingTasks = []
  for (let submitted = 0; submitted < submittedTasks; submitted += 1) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  expect(fullCount).toBe(1)
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastFullInfo).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1200
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  // Forces hasBackPressure() to report true through a sinon stub on a pool
  // with the tasks queue enabled, then checks the 'backPressure' event.
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  sinon.stub(pool, 'hasBackPressure').returns(true)
  let backPressureCount = 0
  let lastBackPressureInfo
  pool.emitter.on(PoolEvents.backPressure, info => {
    lastBackPressureInfo = info
    backPressureCount += 1
  })
  const submittedTasks = numberOfWorkers + 1
  const pendingTasks = []
  for (let submitted = 0; submitted < submittedTasks; submitted += 1) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  expect(backPressureCount).toBe(1)
  // With the tasks queue enabled the payload also carries queue related fields.
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastBackPressureInfo).toStrictEqual(expectedInfo)
  expect(pool.hasBackPressure.called).toBe(true)
  await pool.destroy()
})
1245
it('Verify that hasTaskFunction() is working', async () => {
  // Runs the same hasTaskFunction() checks against a dynamic thread pool and
  // a fixed cluster pool, both backed by the multiple task functions worker.
  const knownTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const checkTaskFunctions = pool => {
    for (const taskFunctionName of knownTaskFunctionNames) {
      expect(pool.hasTaskFunction(taskFunctionName)).toBe(true)
    }
    expect(pool.hasTaskFunction('unknown')).toBe(false)
  }

  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  checkTaskFunctions(dynamicThreadPool)
  await dynamicThreadPool.destroy()

  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  checkTaskFunctions(fixedClusterPool)
  await fixedClusterPool.destroy()
})
1275
it('Verify that addTaskFunction() is working', async () => {
  // Covers argument validation, registration of a new task function on the
  // pool side, its execution and the resulting per worker node usage entry.
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)

  // [name, fn, expected TypeError message], checked in order.
  const invalidCalls = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidCalls) {
    await expect(
      dynamicThreadPool.addTaskFunction(name, fn)
    ).rejects.toThrowError(new TypeError(message))
  }
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])

  const echoTaskFunction = data => data
  await expect(
    dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])

  const taskFunctionData = { test: 'test' }
  const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)

  // Builds a fresh measurement entry holding an empty history.
  const emptyMeasurement = () => ({ history: new CircularArray() })
  for (const workerNode of dynamicThreadPool.workerNodes) {
    expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyMeasurement(),
      waitTime: emptyMeasurement(),
      elu: {
        idle: emptyMeasurement(),
        active: emptyMeasurement()
      }
    })
  }
  await dynamicThreadPool.destroy()
})
1346
it('Verify that removeTaskFunction() is working', async () => {
  // Checks that a task function not registered on the pool side cannot be
  // removed, and that one added through the pool can be removed again.
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Names listed before and after the 'echo' task function round trip.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    initialNames
  )
  await expect(
    dynamicThreadPool.removeTaskFunction('test')
  ).rejects.toThrowError(
    new Error('Cannot remove a task function not handled on the pool side')
  )

  const echoTaskFunction = data => data
  await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    ...initialNames,
    'echo'
  ])

  await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
    true
  )
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    initialNames
  )
  await dynamicThreadPool.destroy()
})
1387
it('Verify that listTaskFunctionNames() is working', async () => {
  // Both pool flavours use the multiple task functions worker and are
  // expected to list the same task function names in the same order.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]

  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    expectedNames
  )
  await dynamicThreadPool.destroy()

  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual(
    expectedNames
  )
  await fixedClusterPool.destroy()
})
1415
it('Verify that setDefaultTaskFunction() is working', async () => {
  // Checks the rejections for invalid names, then that setting a new default
  // task function moves it right after DEFAULT_TASK_NAME in the listed names.
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // NOTE(review): the expected messages below hard-code worker id 31, which
  // presumably depends on how many workers earlier tests in this run have
  // spawned — confirm before reordering or adding tests above this one.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(0)
  ).rejects.toThrowError(
    new Error(
      "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrowError(
    new Error(
      "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrowError(
    new Error(
      "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
    )
  )
  // Failed attempts must leave the listed names untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  // Release the pool workers like every sibling test does; without this the
  // worker threads outlive the test.
  await dynamicThreadPool.destroy()
})
1469
it('Verify that multiple task functions worker is working', async () => {
  // Executes every task function exposed by the multiple task functions
  // worker once, then inspects the pool and per worker node usage statistics.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'jsonIntegerSerialization')).toStrictEqual({
    ok: 1
  })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)

  const expectedTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Matcher for a measurement entry whose history is any CircularArray.
  const anyMeasurement = () => ({ history: expect.any(CircularArray) })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual(
      expectedTaskFunctionNames
    )
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      expect(
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName)
      ).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          stolen: 0
        },
        runTime: anyMeasurement(),
        waitTime: anyMeasurement(),
        elu: {
          idle: anyMeasurement(),
          active: anyMeasurement()
        }
      })
      expect(
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name usage must equal the usage of the task function
    // listed right after it.
    expect(
      workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(
      workerNode.getTaskFunctionWorkerUsage(
        workerNode.info.taskFunctionNames[1]
      )
    )
  }
  await pool.destroy()
})
1533
it('Verify sendKillMessageToWorker()', async () => {
  // Sends the kill message to the first worker node and expects the returned
  // promise to resolve with undefined.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const firstWorkerNodeKey = 0
  const killRequest = pool.sendKillMessageToWorker(firstWorkerNodeKey)
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
1546
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  // Sends an 'add' task function operation to the first worker node only and
  // checks that the node reports the new task function name.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const firstWorkerNodeKey = 0
  const addEmptyOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(
      firstWorkerNodeKey,
      addEmptyOperation
    )
  ).resolves.toBe(true)
  const { taskFunctionNames } = pool.workerNodes[firstWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1566
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  // Sends an 'add' task function operation through the pool wide helper and
  // checks that every worker node reports the new task function name.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const addEmptyOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addEmptyOperation)
  ).resolves.toBe(true)
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
1589 })