refactor: code cleanup
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { expect } from 'expect'
6 import { restore, stub } from 'sinon'
7 import {
8 DynamicClusterPool,
9 DynamicThreadPool,
10 FixedClusterPool,
11 FixedThreadPool,
12 PoolEvents,
13 PoolTypes,
14 WorkerChoiceStrategies,
15 WorkerTypes
16 } from '../../lib/index.js'
17 import { CircularArray } from '../../lib/circular-array.js'
18 import { Deque } from '../../lib/deque.js'
19 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20 import { waitPoolEvents } from '../test-utils.js'
21 import { WorkerNode } from '../../lib/pools/worker-node.js'
22
23 describe('Abstract pool test suite', () => {
24 const version = JSON.parse(
25 readFileSync(
26 join(dirname(fileURLToPath(import.meta.url)), '../../package.json'),
27 'utf8'
28 )
29 ).version
30 const numberOfWorkers = 2
31 class StubPoolWithIsMain extends FixedThreadPool {
32 isMain () {
33 return false
34 }
35 }
36
37 afterEach(() => {
38 restore()
39 })
40
41 it('Simulate pool creation from a non main thread/process', () => {
42 expect(
43 () =>
44 new StubPoolWithIsMain(
45 numberOfWorkers,
46 './tests/worker-files/thread/testWorker.mjs',
47 {
48 errorHandler: e => console.error(e)
49 }
50 )
51 ).toThrowError(
52 new Error(
53 'Cannot start a pool from a worker with the same type as the pool'
54 )
55 )
56 })
57
58 it('Verify that pool statuses properties are set', async () => {
59 const pool = new FixedThreadPool(
60 numberOfWorkers,
61 './tests/worker-files/thread/testWorker.mjs'
62 )
63 expect(pool.starting).toBe(false)
64 expect(pool.started).toBe(true)
65 await pool.destroy()
66 })
67
68 it('Verify that filePath is checked', () => {
69 const expectedError = new Error(
70 'Please specify a file with a worker implementation'
71 )
72 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
73 expectedError
74 )
75 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
76 expectedError
77 )
78 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError(
79 expectedError
80 )
81 expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError(
82 expectedError
83 )
84 expect(
85 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
86 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
87 })
88
89 it('Verify that numberOfWorkers is checked', () => {
90 expect(
91 () =>
92 new FixedThreadPool(
93 undefined,
94 './tests/worker-files/thread/testWorker.mjs'
95 )
96 ).toThrowError(
97 new Error(
98 'Cannot instantiate a pool without specifying the number of workers'
99 )
100 )
101 })
102
103 it('Verify that a negative number of workers is checked', () => {
104 expect(
105 () =>
106 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
107 ).toThrowError(
108 new RangeError(
109 'Cannot instantiate a pool with a negative number of workers'
110 )
111 )
112 })
113
114 it('Verify that a non integer number of workers is checked', () => {
115 expect(
116 () =>
117 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
118 ).toThrowError(
119 new TypeError(
120 'Cannot instantiate a pool with a non safe integer number of workers'
121 )
122 )
123 })
124
125 it('Verify that dynamic pool sizing is checked', () => {
126 expect(
127 () =>
128 new DynamicClusterPool(
129 1,
130 undefined,
131 './tests/worker-files/cluster/testWorker.js'
132 )
133 ).toThrowError(
134 new TypeError(
135 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
136 )
137 )
138 expect(
139 () =>
140 new DynamicThreadPool(
141 0.5,
142 1,
143 './tests/worker-files/thread/testWorker.mjs'
144 )
145 ).toThrowError(
146 new TypeError(
147 'Cannot instantiate a pool with a non safe integer number of workers'
148 )
149 )
150 expect(
151 () =>
152 new DynamicClusterPool(
153 0,
154 0.5,
155 './tests/worker-files/cluster/testWorker.js'
156 )
157 ).toThrowError(
158 new TypeError(
159 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
160 )
161 )
162 expect(
163 () =>
164 new DynamicThreadPool(
165 2,
166 1,
167 './tests/worker-files/thread/testWorker.mjs'
168 )
169 ).toThrowError(
170 new RangeError(
171 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
172 )
173 )
174 expect(
175 () =>
176 new DynamicThreadPool(
177 0,
178 0,
179 './tests/worker-files/thread/testWorker.mjs'
180 )
181 ).toThrowError(
182 new RangeError(
183 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
184 )
185 )
186 expect(
187 () =>
188 new DynamicClusterPool(
189 1,
190 1,
191 './tests/worker-files/cluster/testWorker.js'
192 )
193 ).toThrowError(
194 new RangeError(
195 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
196 )
197 )
198 })
199
200 it('Verify that pool options are checked', async () => {
201 let pool = new FixedThreadPool(
202 numberOfWorkers,
203 './tests/worker-files/thread/testWorker.mjs'
204 )
205 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
206 expect(pool.opts).toStrictEqual({
207 startWorkers: true,
208 enableEvents: true,
209 restartWorkerOnError: true,
210 enableTasksQueue: false,
211 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
212 workerChoiceStrategyOptions: {
213 retries: 6,
214 runTime: { median: false },
215 waitTime: { median: false },
216 elu: { median: false }
217 }
218 })
219 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
220 retries: 6,
221 runTime: { median: false },
222 waitTime: { median: false },
223 elu: { median: false }
224 })
225 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
226 .workerChoiceStrategies) {
227 expect(workerChoiceStrategy.opts).toStrictEqual({
228 retries: 6,
229 runTime: { median: false },
230 waitTime: { median: false },
231 elu: { median: false }
232 })
233 }
234 await pool.destroy()
235 const testHandler = () => console.info('test handler executed')
236 pool = new FixedThreadPool(
237 numberOfWorkers,
238 './tests/worker-files/thread/testWorker.mjs',
239 {
240 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
241 workerChoiceStrategyOptions: {
242 runTime: { median: true },
243 weights: { 0: 300, 1: 200 }
244 },
245 enableEvents: false,
246 restartWorkerOnError: false,
247 enableTasksQueue: true,
248 tasksQueueOptions: { concurrency: 2 },
249 messageHandler: testHandler,
250 errorHandler: testHandler,
251 onlineHandler: testHandler,
252 exitHandler: testHandler
253 }
254 )
255 expect(pool.emitter).toBeUndefined()
256 expect(pool.opts).toStrictEqual({
257 startWorkers: true,
258 enableEvents: false,
259 restartWorkerOnError: false,
260 enableTasksQueue: true,
261 tasksQueueOptions: {
262 concurrency: 2,
263 size: Math.pow(numberOfWorkers, 2),
264 taskStealing: true,
265 tasksStealingOnBackPressure: true
266 },
267 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
268 workerChoiceStrategyOptions: {
269 retries: 6,
270 runTime: { median: true },
271 waitTime: { median: false },
272 elu: { median: false },
273 weights: { 0: 300, 1: 200 }
274 },
275 onlineHandler: testHandler,
276 messageHandler: testHandler,
277 errorHandler: testHandler,
278 exitHandler: testHandler
279 })
280 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
281 retries: 6,
282 runTime: { median: true },
283 waitTime: { median: false },
284 elu: { median: false },
285 weights: { 0: 300, 1: 200 }
286 })
287 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
288 .workerChoiceStrategies) {
289 expect(workerChoiceStrategy.opts).toStrictEqual({
290 retries: 6,
291 runTime: { median: true },
292 waitTime: { median: false },
293 elu: { median: false },
294 weights: { 0: 300, 1: 200 }
295 })
296 }
297 await pool.destroy()
298 })
299
300 it('Verify that pool options are validated', async () => {
301 expect(
302 () =>
303 new FixedThreadPool(
304 numberOfWorkers,
305 './tests/worker-files/thread/testWorker.mjs',
306 {
307 workerChoiceStrategy: 'invalidStrategy'
308 }
309 )
310 ).toThrowError(
311 new Error("Invalid worker choice strategy 'invalidStrategy'")
312 )
313 expect(
314 () =>
315 new FixedThreadPool(
316 numberOfWorkers,
317 './tests/worker-files/thread/testWorker.mjs',
318 {
319 workerChoiceStrategyOptions: {
320 retries: 'invalidChoiceRetries'
321 }
322 }
323 )
324 ).toThrowError(
325 new TypeError(
326 'Invalid worker choice strategy options: retries must be an integer'
327 )
328 )
329 expect(
330 () =>
331 new FixedThreadPool(
332 numberOfWorkers,
333 './tests/worker-files/thread/testWorker.mjs',
334 {
335 workerChoiceStrategyOptions: {
336 retries: -1
337 }
338 }
339 )
340 ).toThrowError(
341 new RangeError(
342 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
343 )
344 )
345 expect(
346 () =>
347 new FixedThreadPool(
348 numberOfWorkers,
349 './tests/worker-files/thread/testWorker.mjs',
350 {
351 workerChoiceStrategyOptions: { weights: {} }
352 }
353 )
354 ).toThrowError(
355 new Error(
356 'Invalid worker choice strategy options: must have a weight for each worker node'
357 )
358 )
359 expect(
360 () =>
361 new FixedThreadPool(
362 numberOfWorkers,
363 './tests/worker-files/thread/testWorker.mjs',
364 {
365 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
366 }
367 )
368 ).toThrowError(
369 new Error(
370 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
371 )
372 )
373 expect(
374 () =>
375 new FixedThreadPool(
376 numberOfWorkers,
377 './tests/worker-files/thread/testWorker.mjs',
378 {
379 enableTasksQueue: true,
380 tasksQueueOptions: 'invalidTasksQueueOptions'
381 }
382 )
383 ).toThrowError(
384 new TypeError('Invalid tasks queue options: must be a plain object')
385 )
386 expect(
387 () =>
388 new FixedThreadPool(
389 numberOfWorkers,
390 './tests/worker-files/thread/testWorker.mjs',
391 {
392 enableTasksQueue: true,
393 tasksQueueOptions: { concurrency: 0 }
394 }
395 )
396 ).toThrowError(
397 new RangeError(
398 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
399 )
400 )
401 expect(
402 () =>
403 new FixedThreadPool(
404 numberOfWorkers,
405 './tests/worker-files/thread/testWorker.mjs',
406 {
407 enableTasksQueue: true,
408 tasksQueueOptions: { concurrency: -1 }
409 }
410 )
411 ).toThrowError(
412 new RangeError(
413 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
414 )
415 )
416 expect(
417 () =>
418 new FixedThreadPool(
419 numberOfWorkers,
420 './tests/worker-files/thread/testWorker.mjs',
421 {
422 enableTasksQueue: true,
423 tasksQueueOptions: { concurrency: 0.2 }
424 }
425 )
426 ).toThrowError(
427 new TypeError('Invalid worker node tasks concurrency: must be an integer')
428 )
429 expect(
430 () =>
431 new FixedThreadPool(
432 numberOfWorkers,
433 './tests/worker-files/thread/testWorker.mjs',
434 {
435 enableTasksQueue: true,
436 tasksQueueOptions: { size: 0 }
437 }
438 )
439 ).toThrowError(
440 new RangeError(
441 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
442 )
443 )
444 expect(
445 () =>
446 new FixedThreadPool(
447 numberOfWorkers,
448 './tests/worker-files/thread/testWorker.mjs',
449 {
450 enableTasksQueue: true,
451 tasksQueueOptions: { size: -1 }
452 }
453 )
454 ).toThrowError(
455 new RangeError(
456 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
457 )
458 )
459 expect(
460 () =>
461 new FixedThreadPool(
462 numberOfWorkers,
463 './tests/worker-files/thread/testWorker.mjs',
464 {
465 enableTasksQueue: true,
466 tasksQueueOptions: { size: 0.2 }
467 }
468 )
469 ).toThrowError(
470 new TypeError('Invalid worker node tasks queue size: must be an integer')
471 )
472 })
473
474 it('Verify that pool worker choice strategy options can be set', async () => {
475 const pool = new FixedThreadPool(
476 numberOfWorkers,
477 './tests/worker-files/thread/testWorker.mjs',
478 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
479 )
480 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
481 retries: 6,
482 runTime: { median: false },
483 waitTime: { median: false },
484 elu: { median: false }
485 })
486 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
487 retries: 6,
488 runTime: { median: false },
489 waitTime: { median: false },
490 elu: { median: false }
491 })
492 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
493 .workerChoiceStrategies) {
494 expect(workerChoiceStrategy.opts).toStrictEqual({
495 retries: 6,
496 runTime: { median: false },
497 waitTime: { median: false },
498 elu: { median: false }
499 })
500 }
501 expect(
502 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
503 ).toStrictEqual({
504 runTime: {
505 aggregate: true,
506 average: true,
507 median: false
508 },
509 waitTime: {
510 aggregate: false,
511 average: false,
512 median: false
513 },
514 elu: {
515 aggregate: true,
516 average: true,
517 median: false
518 }
519 })
520 pool.setWorkerChoiceStrategyOptions({
521 runTime: { median: true },
522 elu: { median: true }
523 })
524 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
525 retries: 6,
526 runTime: { median: true },
527 waitTime: { median: false },
528 elu: { median: true }
529 })
530 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
531 retries: 6,
532 runTime: { median: true },
533 waitTime: { median: false },
534 elu: { median: true }
535 })
536 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
537 .workerChoiceStrategies) {
538 expect(workerChoiceStrategy.opts).toStrictEqual({
539 retries: 6,
540 runTime: { median: true },
541 waitTime: { median: false },
542 elu: { median: true }
543 })
544 }
545 expect(
546 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
547 ).toStrictEqual({
548 runTime: {
549 aggregate: true,
550 average: false,
551 median: true
552 },
553 waitTime: {
554 aggregate: false,
555 average: false,
556 median: false
557 },
558 elu: {
559 aggregate: true,
560 average: false,
561 median: true
562 }
563 })
564 pool.setWorkerChoiceStrategyOptions({
565 runTime: { median: false },
566 elu: { median: false }
567 })
568 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
569 retries: 6,
570 runTime: { median: false },
571 waitTime: { median: false },
572 elu: { median: false }
573 })
574 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
575 retries: 6,
576 runTime: { median: false },
577 waitTime: { median: false },
578 elu: { median: false }
579 })
580 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
581 .workerChoiceStrategies) {
582 expect(workerChoiceStrategy.opts).toStrictEqual({
583 retries: 6,
584 runTime: { median: false },
585 waitTime: { median: false },
586 elu: { median: false }
587 })
588 }
589 expect(
590 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
591 ).toStrictEqual({
592 runTime: {
593 aggregate: true,
594 average: true,
595 median: false
596 },
597 waitTime: {
598 aggregate: false,
599 average: false,
600 median: false
601 },
602 elu: {
603 aggregate: true,
604 average: true,
605 median: false
606 }
607 })
608 expect(() =>
609 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
610 ).toThrowError(
611 new TypeError(
612 'Invalid worker choice strategy options: must be a plain object'
613 )
614 )
615 expect(() =>
616 pool.setWorkerChoiceStrategyOptions({
617 retries: 'invalidChoiceRetries'
618 })
619 ).toThrowError(
620 new TypeError(
621 'Invalid worker choice strategy options: retries must be an integer'
622 )
623 )
624 expect(() =>
625 pool.setWorkerChoiceStrategyOptions({ retries: -1 })
626 ).toThrowError(
627 new RangeError(
628 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
629 )
630 )
631 expect(() =>
632 pool.setWorkerChoiceStrategyOptions({ weights: {} })
633 ).toThrowError(
634 new Error(
635 'Invalid worker choice strategy options: must have a weight for each worker node'
636 )
637 )
638 expect(() =>
639 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
640 ).toThrowError(
641 new Error(
642 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
643 )
644 )
645 await pool.destroy()
646 })
647
648 it('Verify that pool tasks queue can be enabled/disabled', async () => {
649 const pool = new FixedThreadPool(
650 numberOfWorkers,
651 './tests/worker-files/thread/testWorker.mjs'
652 )
653 expect(pool.opts.enableTasksQueue).toBe(false)
654 expect(pool.opts.tasksQueueOptions).toBeUndefined()
655 for (const workerNode of pool.workerNodes) {
656 expect(workerNode.onEmptyQueue).toBeUndefined()
657 expect(workerNode.onBackPressure).toBeUndefined()
658 }
659 pool.enableTasksQueue(true)
660 expect(pool.opts.enableTasksQueue).toBe(true)
661 expect(pool.opts.tasksQueueOptions).toStrictEqual({
662 concurrency: 1,
663 size: Math.pow(numberOfWorkers, 2),
664 taskStealing: true,
665 tasksStealingOnBackPressure: true
666 })
667 for (const workerNode of pool.workerNodes) {
668 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
669 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
670 }
671 pool.enableTasksQueue(true, { concurrency: 2 })
672 expect(pool.opts.enableTasksQueue).toBe(true)
673 expect(pool.opts.tasksQueueOptions).toStrictEqual({
674 concurrency: 2,
675 size: Math.pow(numberOfWorkers, 2),
676 taskStealing: true,
677 tasksStealingOnBackPressure: true
678 })
679 for (const workerNode of pool.workerNodes) {
680 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
681 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
682 }
683 pool.enableTasksQueue(false)
684 expect(pool.opts.enableTasksQueue).toBe(false)
685 expect(pool.opts.tasksQueueOptions).toBeUndefined()
686 for (const workerNode of pool.workerNodes) {
687 expect(workerNode.onEmptyQueue).toBeUndefined()
688 expect(workerNode.onBackPressure).toBeUndefined()
689 }
690 await pool.destroy()
691 })
692
693 it('Verify that pool tasks queue options can be set', async () => {
694 const pool = new FixedThreadPool(
695 numberOfWorkers,
696 './tests/worker-files/thread/testWorker.mjs',
697 { enableTasksQueue: true }
698 )
699 expect(pool.opts.tasksQueueOptions).toStrictEqual({
700 concurrency: 1,
701 size: Math.pow(numberOfWorkers, 2),
702 taskStealing: true,
703 tasksStealingOnBackPressure: true
704 })
705 for (const workerNode of pool.workerNodes) {
706 expect(workerNode.tasksQueueBackPressureSize).toBe(
707 pool.opts.tasksQueueOptions.size
708 )
709 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
710 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
711 }
712 pool.setTasksQueueOptions({
713 concurrency: 2,
714 size: 2,
715 taskStealing: false,
716 tasksStealingOnBackPressure: false
717 })
718 expect(pool.opts.tasksQueueOptions).toStrictEqual({
719 concurrency: 2,
720 size: 2,
721 taskStealing: false,
722 tasksStealingOnBackPressure: false
723 })
724 for (const workerNode of pool.workerNodes) {
725 expect(workerNode.tasksQueueBackPressureSize).toBe(
726 pool.opts.tasksQueueOptions.size
727 )
728 expect(workerNode.onEmptyQueue).toBeUndefined()
729 expect(workerNode.onBackPressure).toBeUndefined()
730 }
731 pool.setTasksQueueOptions({
732 concurrency: 1,
733 taskStealing: true,
734 tasksStealingOnBackPressure: true
735 })
736 expect(pool.opts.tasksQueueOptions).toStrictEqual({
737 concurrency: 1,
738 size: Math.pow(numberOfWorkers, 2),
739 taskStealing: true,
740 tasksStealingOnBackPressure: true
741 })
742 for (const workerNode of pool.workerNodes) {
743 expect(workerNode.tasksQueueBackPressureSize).toBe(
744 pool.opts.tasksQueueOptions.size
745 )
746 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
747 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
748 }
749 expect(() =>
750 pool.setTasksQueueOptions('invalidTasksQueueOptions')
751 ).toThrowError(
752 new TypeError('Invalid tasks queue options: must be a plain object')
753 )
754 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
755 new RangeError(
756 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
757 )
758 )
759 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
760 new RangeError(
761 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
762 )
763 )
764 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
765 new TypeError('Invalid worker node tasks concurrency: must be an integer')
766 )
767 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
768 new RangeError(
769 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
770 )
771 )
772 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError(
773 new RangeError(
774 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
775 )
776 )
777 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError(
778 new TypeError('Invalid worker node tasks queue size: must be an integer')
779 )
780 await pool.destroy()
781 })
782
783 it('Verify that pool info is set', async () => {
784 let pool = new FixedThreadPool(
785 numberOfWorkers,
786 './tests/worker-files/thread/testWorker.mjs'
787 )
788 expect(pool.info).toStrictEqual({
789 version,
790 type: PoolTypes.fixed,
791 worker: WorkerTypes.thread,
792 started: true,
793 ready: true,
794 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
795 minSize: numberOfWorkers,
796 maxSize: numberOfWorkers,
797 workerNodes: numberOfWorkers,
798 idleWorkerNodes: numberOfWorkers,
799 busyWorkerNodes: 0,
800 executedTasks: 0,
801 executingTasks: 0,
802 failedTasks: 0
803 })
804 await pool.destroy()
805 pool = new DynamicClusterPool(
806 Math.floor(numberOfWorkers / 2),
807 numberOfWorkers,
808 './tests/worker-files/cluster/testWorker.js'
809 )
810 expect(pool.info).toStrictEqual({
811 version,
812 type: PoolTypes.dynamic,
813 worker: WorkerTypes.cluster,
814 started: true,
815 ready: true,
816 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
817 minSize: Math.floor(numberOfWorkers / 2),
818 maxSize: numberOfWorkers,
819 workerNodes: Math.floor(numberOfWorkers / 2),
820 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
821 busyWorkerNodes: 0,
822 executedTasks: 0,
823 executingTasks: 0,
824 failedTasks: 0
825 })
826 await pool.destroy()
827 })
828
829 it('Verify that pool worker tasks usage are initialized', async () => {
830 const pool = new FixedClusterPool(
831 numberOfWorkers,
832 './tests/worker-files/cluster/testWorker.js'
833 )
834 for (const workerNode of pool.workerNodes) {
835 expect(workerNode).toBeInstanceOf(WorkerNode)
836 expect(workerNode.usage).toStrictEqual({
837 tasks: {
838 executed: 0,
839 executing: 0,
840 queued: 0,
841 maxQueued: 0,
842 stolen: 0,
843 failed: 0
844 },
845 runTime: {
846 history: new CircularArray()
847 },
848 waitTime: {
849 history: new CircularArray()
850 },
851 elu: {
852 idle: {
853 history: new CircularArray()
854 },
855 active: {
856 history: new CircularArray()
857 }
858 }
859 })
860 }
861 await pool.destroy()
862 })
863
864 it('Verify that pool worker tasks queue are initialized', async () => {
865 let pool = new FixedClusterPool(
866 numberOfWorkers,
867 './tests/worker-files/cluster/testWorker.js'
868 )
869 for (const workerNode of pool.workerNodes) {
870 expect(workerNode).toBeInstanceOf(WorkerNode)
871 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
872 expect(workerNode.tasksQueue.size).toBe(0)
873 expect(workerNode.tasksQueue.maxSize).toBe(0)
874 }
875 await pool.destroy()
876 pool = new DynamicThreadPool(
877 Math.floor(numberOfWorkers / 2),
878 numberOfWorkers,
879 './tests/worker-files/thread/testWorker.mjs'
880 )
881 for (const workerNode of pool.workerNodes) {
882 expect(workerNode).toBeInstanceOf(WorkerNode)
883 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
884 expect(workerNode.tasksQueue.size).toBe(0)
885 expect(workerNode.tasksQueue.maxSize).toBe(0)
886 }
887 await pool.destroy()
888 })
889
890 it('Verify that pool worker info are initialized', async () => {
891 let pool = new FixedClusterPool(
892 numberOfWorkers,
893 './tests/worker-files/cluster/testWorker.js'
894 )
895 for (const workerNode of pool.workerNodes) {
896 expect(workerNode).toBeInstanceOf(WorkerNode)
897 expect(workerNode.info).toStrictEqual({
898 id: expect.any(Number),
899 type: WorkerTypes.cluster,
900 dynamic: false,
901 ready: true
902 })
903 }
904 await pool.destroy()
905 pool = new DynamicThreadPool(
906 Math.floor(numberOfWorkers / 2),
907 numberOfWorkers,
908 './tests/worker-files/thread/testWorker.mjs'
909 )
910 for (const workerNode of pool.workerNodes) {
911 expect(workerNode).toBeInstanceOf(WorkerNode)
912 expect(workerNode.info).toStrictEqual({
913 id: expect.any(Number),
914 type: WorkerTypes.thread,
915 dynamic: false,
916 ready: true
917 })
918 }
919 await pool.destroy()
920 })
921
922 it('Verify that pool can be started after initialization', async () => {
923 const pool = new FixedClusterPool(
924 numberOfWorkers,
925 './tests/worker-files/cluster/testWorker.js',
926 {
927 startWorkers: false
928 }
929 )
930 expect(pool.info.started).toBe(false)
931 expect(pool.info.ready).toBe(false)
932 expect(pool.workerNodes).toStrictEqual([])
933 await expect(pool.execute()).rejects.toThrowError(
934 new Error('Cannot execute a task on not started pool')
935 )
936 pool.start()
937 expect(pool.info.started).toBe(true)
938 expect(pool.info.ready).toBe(true)
939 expect(pool.workerNodes.length).toBe(numberOfWorkers)
940 for (const workerNode of pool.workerNodes) {
941 expect(workerNode).toBeInstanceOf(WorkerNode)
942 }
943 await pool.destroy()
944 })
945
946 it('Verify that pool execute() arguments are checked', async () => {
947 const pool = new FixedClusterPool(
948 numberOfWorkers,
949 './tests/worker-files/cluster/testWorker.js'
950 )
951 await expect(pool.execute(undefined, 0)).rejects.toThrowError(
952 new TypeError('name argument must be a string')
953 )
954 await expect(pool.execute(undefined, '')).rejects.toThrowError(
955 new TypeError('name argument must not be an empty string')
956 )
957 await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
958 new TypeError('transferList argument must be an array')
959 )
960 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
961 "Task function 'unknown' not found"
962 )
963 await pool.destroy()
964 await expect(pool.execute()).rejects.toThrowError(
965 new Error('Cannot execute a task on not started pool')
966 )
967 })
968
969 it('Verify that pool worker tasks usage are computed', async () => {
970 const pool = new FixedClusterPool(
971 numberOfWorkers,
972 './tests/worker-files/cluster/testWorker.js'
973 )
974 const promises = new Set()
975 const maxMultiplier = 2
976 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
977 promises.add(pool.execute())
978 }
979 for (const workerNode of pool.workerNodes) {
980 expect(workerNode.usage).toStrictEqual({
981 tasks: {
982 executed: 0,
983 executing: maxMultiplier,
984 queued: 0,
985 maxQueued: 0,
986 stolen: 0,
987 failed: 0
988 },
989 runTime: {
990 history: expect.any(CircularArray)
991 },
992 waitTime: {
993 history: expect.any(CircularArray)
994 },
995 elu: {
996 idle: {
997 history: expect.any(CircularArray)
998 },
999 active: {
1000 history: expect.any(CircularArray)
1001 }
1002 }
1003 })
1004 }
1005 await Promise.all(promises)
1006 for (const workerNode of pool.workerNodes) {
1007 expect(workerNode.usage).toStrictEqual({
1008 tasks: {
1009 executed: maxMultiplier,
1010 executing: 0,
1011 queued: 0,
1012 maxQueued: 0,
1013 stolen: 0,
1014 failed: 0
1015 },
1016 runTime: {
1017 history: expect.any(CircularArray)
1018 },
1019 waitTime: {
1020 history: expect.any(CircularArray)
1021 },
1022 elu: {
1023 idle: {
1024 history: expect.any(CircularArray)
1025 },
1026 active: {
1027 history: expect.any(CircularArray)
1028 }
1029 }
1030 })
1031 }
1032 await pool.destroy()
1033 })
1034
1035 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1036 const pool = new DynamicThreadPool(
1037 Math.floor(numberOfWorkers / 2),
1038 numberOfWorkers,
1039 './tests/worker-files/thread/testWorker.mjs'
1040 )
1041 const promises = new Set()
1042 const maxMultiplier = 2
1043 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1044 promises.add(pool.execute())
1045 }
1046 await Promise.all(promises)
1047 for (const workerNode of pool.workerNodes) {
1048 expect(workerNode.usage).toStrictEqual({
1049 tasks: {
1050 executed: expect.any(Number),
1051 executing: 0,
1052 queued: 0,
1053 maxQueued: 0,
1054 stolen: 0,
1055 failed: 0
1056 },
1057 runTime: {
1058 history: expect.any(CircularArray)
1059 },
1060 waitTime: {
1061 history: expect.any(CircularArray)
1062 },
1063 elu: {
1064 idle: {
1065 history: expect.any(CircularArray)
1066 },
1067 active: {
1068 history: expect.any(CircularArray)
1069 }
1070 }
1071 })
1072 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1073 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1074 numberOfWorkers * maxMultiplier
1075 )
1076 expect(workerNode.usage.runTime.history.length).toBe(0)
1077 expect(workerNode.usage.waitTime.history.length).toBe(0)
1078 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1079 expect(workerNode.usage.elu.active.history.length).toBe(0)
1080 }
1081 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1082 for (const workerNode of pool.workerNodes) {
1083 expect(workerNode.usage).toStrictEqual({
1084 tasks: {
1085 executed: 0,
1086 executing: 0,
1087 queued: 0,
1088 maxQueued: 0,
1089 stolen: 0,
1090 failed: 0
1091 },
1092 runTime: {
1093 history: expect.any(CircularArray)
1094 },
1095 waitTime: {
1096 history: expect.any(CircularArray)
1097 },
1098 elu: {
1099 idle: {
1100 history: expect.any(CircularArray)
1101 },
1102 active: {
1103 history: expect.any(CircularArray)
1104 }
1105 }
1106 })
1107 expect(workerNode.usage.runTime.history.length).toBe(0)
1108 expect(workerNode.usage.waitTime.history.length).toBe(0)
1109 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1110 expect(workerNode.usage.elu.active.history.length).toBe(0)
1111 }
1112 await pool.destroy()
1113 })
1114
1115 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1116 const pool = new DynamicClusterPool(
1117 Math.floor(numberOfWorkers / 2),
1118 numberOfWorkers,
1119 './tests/worker-files/cluster/testWorker.js'
1120 )
1121 expect(pool.emitter.eventNames()).toStrictEqual([])
1122 let poolInfo
1123 let poolReady = 0
1124 pool.emitter.on(PoolEvents.ready, info => {
1125 ++poolReady
1126 poolInfo = info
1127 })
1128 await waitPoolEvents(pool, PoolEvents.ready, 1)
1129 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1130 expect(poolReady).toBe(1)
1131 expect(poolInfo).toStrictEqual({
1132 version,
1133 type: PoolTypes.dynamic,
1134 worker: WorkerTypes.cluster,
1135 started: true,
1136 ready: true,
1137 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1138 minSize: expect.any(Number),
1139 maxSize: expect.any(Number),
1140 workerNodes: expect.any(Number),
1141 idleWorkerNodes: expect.any(Number),
1142 busyWorkerNodes: expect.any(Number),
1143 executedTasks: expect.any(Number),
1144 executingTasks: expect.any(Number),
1145 failedTasks: expect.any(Number)
1146 })
1147 await pool.destroy()
1148 })
1149
1150 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1151 const pool = new FixedThreadPool(
1152 numberOfWorkers,
1153 './tests/worker-files/thread/testWorker.mjs'
1154 )
1155 expect(pool.emitter.eventNames()).toStrictEqual([])
1156 const promises = new Set()
1157 let poolBusy = 0
1158 let poolInfo
1159 pool.emitter.on(PoolEvents.busy, info => {
1160 ++poolBusy
1161 poolInfo = info
1162 })
1163 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1164 for (let i = 0; i < numberOfWorkers * 2; i++) {
1165 promises.add(pool.execute())
1166 }
1167 await Promise.all(promises)
1168 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1169 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1170 expect(poolBusy).toBe(numberOfWorkers + 1)
1171 expect(poolInfo).toStrictEqual({
1172 version,
1173 type: PoolTypes.fixed,
1174 worker: WorkerTypes.thread,
1175 started: true,
1176 ready: true,
1177 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1178 minSize: expect.any(Number),
1179 maxSize: expect.any(Number),
1180 workerNodes: expect.any(Number),
1181 idleWorkerNodes: expect.any(Number),
1182 busyWorkerNodes: expect.any(Number),
1183 executedTasks: expect.any(Number),
1184 executingTasks: expect.any(Number),
1185 failedTasks: expect.any(Number)
1186 })
1187 await pool.destroy()
1188 })
1189
1190 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1191 const pool = new DynamicThreadPool(
1192 Math.floor(numberOfWorkers / 2),
1193 numberOfWorkers,
1194 './tests/worker-files/thread/testWorker.mjs'
1195 )
1196 expect(pool.emitter.eventNames()).toStrictEqual([])
1197 const promises = new Set()
1198 let poolFull = 0
1199 let poolInfo
1200 pool.emitter.on(PoolEvents.full, info => {
1201 ++poolFull
1202 poolInfo = info
1203 })
1204 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1205 for (let i = 0; i < numberOfWorkers * 2; i++) {
1206 promises.add(pool.execute())
1207 }
1208 await Promise.all(promises)
1209 expect(poolFull).toBe(1)
1210 expect(poolInfo).toStrictEqual({
1211 version,
1212 type: PoolTypes.dynamic,
1213 worker: WorkerTypes.thread,
1214 started: true,
1215 ready: true,
1216 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1217 minSize: expect.any(Number),
1218 maxSize: expect.any(Number),
1219 workerNodes: expect.any(Number),
1220 idleWorkerNodes: expect.any(Number),
1221 busyWorkerNodes: expect.any(Number),
1222 executedTasks: expect.any(Number),
1223 executingTasks: expect.any(Number),
1224 failedTasks: expect.any(Number)
1225 })
1226 await pool.destroy()
1227 })
1228
1229 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1230 const pool = new FixedThreadPool(
1231 numberOfWorkers,
1232 './tests/worker-files/thread/testWorker.mjs',
1233 {
1234 enableTasksQueue: true
1235 }
1236 )
1237 stub(pool, 'hasBackPressure').returns(true)
1238 expect(pool.emitter.eventNames()).toStrictEqual([])
1239 const promises = new Set()
1240 let poolBackPressure = 0
1241 let poolInfo
1242 pool.emitter.on(PoolEvents.backPressure, info => {
1243 ++poolBackPressure
1244 poolInfo = info
1245 })
1246 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1247 for (let i = 0; i < numberOfWorkers + 1; i++) {
1248 promises.add(pool.execute())
1249 }
1250 await Promise.all(promises)
1251 expect(poolBackPressure).toBe(1)
1252 expect(poolInfo).toStrictEqual({
1253 version,
1254 type: PoolTypes.fixed,
1255 worker: WorkerTypes.thread,
1256 started: true,
1257 ready: true,
1258 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1259 minSize: expect.any(Number),
1260 maxSize: expect.any(Number),
1261 workerNodes: expect.any(Number),
1262 idleWorkerNodes: expect.any(Number),
1263 busyWorkerNodes: expect.any(Number),
1264 executedTasks: expect.any(Number),
1265 executingTasks: expect.any(Number),
1266 maxQueuedTasks: expect.any(Number),
1267 queuedTasks: expect.any(Number),
1268 backPressure: true,
1269 stolenTasks: expect.any(Number),
1270 failedTasks: expect.any(Number)
1271 })
1272 expect(pool.hasBackPressure.called).toBe(true)
1273 await pool.destroy()
1274 })
1275
1276 it('Verify that hasTaskFunction() is working', async () => {
1277 const dynamicThreadPool = new DynamicThreadPool(
1278 Math.floor(numberOfWorkers / 2),
1279 numberOfWorkers,
1280 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1281 )
1282 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1283 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1284 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1285 true
1286 )
1287 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1288 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1289 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1290 await dynamicThreadPool.destroy()
1291 const fixedClusterPool = new FixedClusterPool(
1292 numberOfWorkers,
1293 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1294 )
1295 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1296 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1297 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1298 true
1299 )
1300 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1301 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1302 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1303 await fixedClusterPool.destroy()
1304 })
1305
1306 it('Verify that addTaskFunction() is working', async () => {
1307 const dynamicThreadPool = new DynamicThreadPool(
1308 Math.floor(numberOfWorkers / 2),
1309 numberOfWorkers,
1310 './tests/worker-files/thread/testWorker.mjs'
1311 )
1312 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1313 await expect(
1314 dynamicThreadPool.addTaskFunction(0, () => {})
1315 ).rejects.toThrowError(new TypeError('name argument must be a string'))
1316 await expect(
1317 dynamicThreadPool.addTaskFunction('', () => {})
1318 ).rejects.toThrowError(
1319 new TypeError('name argument must not be an empty string')
1320 )
1321 await expect(
1322 dynamicThreadPool.addTaskFunction('test', 0)
1323 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1324 await expect(
1325 dynamicThreadPool.addTaskFunction('test', '')
1326 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1327 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1328 DEFAULT_TASK_NAME,
1329 'test'
1330 ])
1331 const echoTaskFunction = data => {
1332 return data
1333 }
1334 await expect(
1335 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1336 ).resolves.toBe(true)
1337 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1338 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1339 echoTaskFunction
1340 )
1341 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1342 DEFAULT_TASK_NAME,
1343 'test',
1344 'echo'
1345 ])
1346 const taskFunctionData = { test: 'test' }
1347 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1348 expect(echoResult).toStrictEqual(taskFunctionData)
1349 for (const workerNode of dynamicThreadPool.workerNodes) {
1350 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1351 tasks: {
1352 executed: expect.any(Number),
1353 executing: 0,
1354 queued: 0,
1355 stolen: 0,
1356 failed: 0
1357 },
1358 runTime: {
1359 history: new CircularArray()
1360 },
1361 waitTime: {
1362 history: new CircularArray()
1363 },
1364 elu: {
1365 idle: {
1366 history: new CircularArray()
1367 },
1368 active: {
1369 history: new CircularArray()
1370 }
1371 }
1372 })
1373 }
1374 await dynamicThreadPool.destroy()
1375 })
1376
1377 it('Verify that removeTaskFunction() is working', async () => {
1378 const dynamicThreadPool = new DynamicThreadPool(
1379 Math.floor(numberOfWorkers / 2),
1380 numberOfWorkers,
1381 './tests/worker-files/thread/testWorker.mjs'
1382 )
1383 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1384 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1385 DEFAULT_TASK_NAME,
1386 'test'
1387 ])
1388 await expect(
1389 dynamicThreadPool.removeTaskFunction('test')
1390 ).rejects.toThrowError(
1391 new Error('Cannot remove a task function not handled on the pool side')
1392 )
1393 const echoTaskFunction = data => {
1394 return data
1395 }
1396 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1397 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1398 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1399 echoTaskFunction
1400 )
1401 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1402 DEFAULT_TASK_NAME,
1403 'test',
1404 'echo'
1405 ])
1406 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1407 true
1408 )
1409 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1410 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1411 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1412 DEFAULT_TASK_NAME,
1413 'test'
1414 ])
1415 await dynamicThreadPool.destroy()
1416 })
1417
1418 it('Verify that listTaskFunctionNames() is working', async () => {
1419 const dynamicThreadPool = new DynamicThreadPool(
1420 Math.floor(numberOfWorkers / 2),
1421 numberOfWorkers,
1422 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1423 )
1424 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1425 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1426 DEFAULT_TASK_NAME,
1427 'jsonIntegerSerialization',
1428 'factorial',
1429 'fibonacci'
1430 ])
1431 await dynamicThreadPool.destroy()
1432 const fixedClusterPool = new FixedClusterPool(
1433 numberOfWorkers,
1434 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1435 )
1436 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1437 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1438 DEFAULT_TASK_NAME,
1439 'jsonIntegerSerialization',
1440 'factorial',
1441 'fibonacci'
1442 ])
1443 await fixedClusterPool.destroy()
1444 })
1445
1446 it('Verify that setDefaultTaskFunction() is working', async () => {
1447 const dynamicThreadPool = new DynamicThreadPool(
1448 Math.floor(numberOfWorkers / 2),
1449 numberOfWorkers,
1450 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1451 )
1452 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1453 await expect(
1454 dynamicThreadPool.setDefaultTaskFunction(0)
1455 ).rejects.toThrowError(
1456 new Error(
1457 "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
1458 )
1459 )
1460 await expect(
1461 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1462 ).rejects.toThrowError(
1463 new Error(
1464 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
1465 )
1466 )
1467 await expect(
1468 dynamicThreadPool.setDefaultTaskFunction('unknown')
1469 ).rejects.toThrowError(
1470 new Error(
1471 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
1472 )
1473 )
1474 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1475 DEFAULT_TASK_NAME,
1476 'jsonIntegerSerialization',
1477 'factorial',
1478 'fibonacci'
1479 ])
1480 await expect(
1481 dynamicThreadPool.setDefaultTaskFunction('factorial')
1482 ).resolves.toBe(true)
1483 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1484 DEFAULT_TASK_NAME,
1485 'factorial',
1486 'jsonIntegerSerialization',
1487 'fibonacci'
1488 ])
1489 await expect(
1490 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1491 ).resolves.toBe(true)
1492 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1493 DEFAULT_TASK_NAME,
1494 'fibonacci',
1495 'jsonIntegerSerialization',
1496 'factorial'
1497 ])
1498 })
1499
1500 it('Verify that multiple task functions worker is working', async () => {
1501 const pool = new DynamicClusterPool(
1502 Math.floor(numberOfWorkers / 2),
1503 numberOfWorkers,
1504 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1505 )
1506 const data = { n: 10 }
1507 const result0 = await pool.execute(data)
1508 expect(result0).toStrictEqual({ ok: 1 })
1509 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1510 expect(result1).toStrictEqual({ ok: 1 })
1511 const result2 = await pool.execute(data, 'factorial')
1512 expect(result2).toBe(3628800)
1513 const result3 = await pool.execute(data, 'fibonacci')
1514 expect(result3).toBe(55)
1515 expect(pool.info.executingTasks).toBe(0)
1516 expect(pool.info.executedTasks).toBe(4)
1517 for (const workerNode of pool.workerNodes) {
1518 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1519 DEFAULT_TASK_NAME,
1520 'jsonIntegerSerialization',
1521 'factorial',
1522 'fibonacci'
1523 ])
1524 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1525 for (const name of pool.listTaskFunctionNames()) {
1526 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1527 tasks: {
1528 executed: expect.any(Number),
1529 executing: 0,
1530 failed: 0,
1531 queued: 0,
1532 stolen: 0
1533 },
1534 runTime: {
1535 history: expect.any(CircularArray)
1536 },
1537 waitTime: {
1538 history: expect.any(CircularArray)
1539 },
1540 elu: {
1541 idle: {
1542 history: expect.any(CircularArray)
1543 },
1544 active: {
1545 history: expect.any(CircularArray)
1546 }
1547 }
1548 })
1549 expect(
1550 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1551 ).toBeGreaterThan(0)
1552 }
1553 expect(
1554 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1555 ).toStrictEqual(
1556 workerNode.getTaskFunctionWorkerUsage(
1557 workerNode.info.taskFunctionNames[1]
1558 )
1559 )
1560 }
1561 await pool.destroy()
1562 })
1563
1564 it('Verify sendKillMessageToWorker()', async () => {
1565 const pool = new DynamicClusterPool(
1566 Math.floor(numberOfWorkers / 2),
1567 numberOfWorkers,
1568 './tests/worker-files/cluster/testWorker.js'
1569 )
1570 const workerNodeKey = 0
1571 await expect(
1572 pool.sendKillMessageToWorker(workerNodeKey)
1573 ).resolves.toBeUndefined()
1574 await pool.destroy()
1575 })
1576
1577 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1578 const pool = new DynamicClusterPool(
1579 Math.floor(numberOfWorkers / 2),
1580 numberOfWorkers,
1581 './tests/worker-files/cluster/testWorker.js'
1582 )
1583 const workerNodeKey = 0
1584 await expect(
1585 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1586 taskFunctionOperation: 'add',
1587 taskFunctionName: 'empty',
1588 taskFunction: (() => {}).toString()
1589 })
1590 ).resolves.toBe(true)
1591 expect(
1592 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1593 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1594 await pool.destroy()
1595 })
1596
1597 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1598 const pool = new DynamicClusterPool(
1599 Math.floor(numberOfWorkers / 2),
1600 numberOfWorkers,
1601 './tests/worker-files/cluster/testWorker.js'
1602 )
1603 await expect(
1604 pool.sendTaskFunctionOperationToWorkers({
1605 taskFunctionOperation: 'add',
1606 taskFunctionName: 'empty',
1607 taskFunction: (() => {}).toString()
1608 })
1609 ).resolves.toBe(true)
1610 for (const workerNode of pool.workerNodes) {
1611 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1612 DEFAULT_TASK_NAME,
1613 'test',
1614 'empty'
1615 ])
1616 }
1617 await pool.destroy()
1618 })
1619 })