fix: fix pool ready event emission
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { expect } from 'expect'
6 import { restore, stub } from 'sinon'
7 import {
8 DynamicClusterPool,
9 DynamicThreadPool,
10 FixedClusterPool,
11 FixedThreadPool,
12 PoolEvents,
13 PoolTypes,
14 WorkerChoiceStrategies,
15 WorkerTypes
16 } from '../../lib/index.js'
17 import { CircularArray } from '../../lib/circular-array.js'
18 import { Deque } from '../../lib/deque.js'
19 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20 import { waitPoolEvents } from '../test-utils.js'
21 import { WorkerNode } from '../../lib/pools/worker-node.js'
22
23 describe('Abstract pool test suite', () => {
// Package version, read from the package.json found two directories above this test file.
const packageJsonPath = join(
  dirname(fileURLToPath(import.meta.url)),
  '../..',
  'package.json'
)
const { version } = JSON.parse(readFileSync(packageJsonPath, 'utf8'))
// Worker count passed to the pools created by the tests of this suite.
const numberOfWorkers = 2
// FixedThreadPool subclass whose isMain() always answers false; used by the
// test checking that a pool cannot be created from a non main thread/process.
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    const runsInMain = false
    return runsInMain
  }
}
36
// Call sinon's restore() once each test has finished.
afterEach(function restoreSinon () {
  restore()
})
40
// A fixed thread pool can be instantiated and then destroyed.
it('Verify that pool can be created and destroyed', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const threadPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(threadPool).toBeInstanceOf(FixedThreadPool)
  await threadPool.destroy()
})
49
// Constructing the stubbed pool (isMain() returns false) must throw.
it('Verify that pool cannot be created from a non main thread/process', () => {
  const poolOptions = {
    errorHandler: e => console.error(e)
  }
  const createStubPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      poolOptions
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createStubPool).toThrow(expectedError)
})
66
// Checks the started/starting/destroying flags right after construction.
it('Verify that pool statuses properties are set', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const threadPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(threadPool.started).toBe(true)
  expect(threadPool.starting).toBe(false)
  expect(threadPool.destroying).toBe(false)
  await threadPool.destroy()
})
77
// A missing or unknown worker file must be rejected by the constructor.
it('Verify that filePath is checked', () => {
  const createWithoutFile = () => new FixedThreadPool(numberOfWorkers)
  const createWithUnknownFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createWithoutFile).toThrow(
    new Error("Cannot find the worker file 'undefined'")
  )
  expect(createWithUnknownFile).toThrow(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
86
// An undefined number of workers must be rejected by the constructor.
it('Verify that numberOfWorkers is checked', () => {
  const createWithoutSize = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createWithoutSize).toThrow(expectedError)
})
100
// A negative number of workers must be rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  const createWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createWithNegativeSize).toThrow(expectedError)
})
111
// A fractional number of workers must be rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  const createWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createWithFractionalSize).toThrow(expectedError)
})
122
// Each invalid (min, max) sizing of a dynamic pool must throw the matching error.
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Pairs of [pool factory, error the factory is expected to throw], checked in order.
  const invalidSizings = [
    [
      () => new DynamicClusterPool(1, undefined, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(0.5, 1, threadWorkerFile),
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      () => new DynamicClusterPool(0, 0.5, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(2, 1, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(0, 0, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      () => new DynamicClusterPool(1, 1, clusterWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [createPool, expectedError] of invalidSizings) {
    expect(createPool).toThrow(expectedError)
  }
})
197
// Checks the resolved options of a pool built with defaults, then of a pool
// built with explicit options.
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Asserts the strategy options held by the context and by each of its strategies.
  const expectStrategyOptions = (checkedPool, expectedOptions) => {
    const { workerChoiceStrategyContext } = checkedPool
    expect(workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    for (const [, strategy] of workerChoiceStrategyContext.workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
  }

  // Pool created without an options argument.
  const defaultStrategyOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(defaultPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    workerChoiceStrategyOptions: defaultStrategyOptions
  })
  expectStrategyOptions(defaultPool, defaultStrategyOptions)
  await defaultPool.destroy()

  // Pool created with explicit options.
  const testHandler = () => console.info('test handler executed')
  const customStrategyOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: customStrategyOptions,
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  expectStrategyOptions(customPool, customStrategyOptions)
  await customPool.destroy()
})
297
// Each invalid options object must make the constructor throw the matching error.
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Pairs of [pool options, error the constructor is expected to throw], checked in order.
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrow(expectedError)
  }
})
469
// Toggles the runTime/elu median options through setWorkerChoiceStrategyOptions()
// and checks the pool, its strategy context, every strategy and the task
// statistics requirements after each change; then checks invalid arguments.
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Strategy options expected when runTime and elu share the given median flag.
  const optionsWithMedian = median => ({
    retries: 6,
    runTime: { median },
    waitTime: { median: false },
    elu: { median }
  })
  // Task statistics requirements expected for the same median flag.
  const requirementsWithMedian = median => ({
    runTime: {
      aggregate: true,
      average: !median,
      median
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: !median,
      median
    }
  })
  // Runs the whole set of assertions for one median flag value.
  const expectPoolState = median => {
    const expectedOptions = optionsWithMedian(median)
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(expectedOptions)
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(requirementsWithMedian(median))
  }

  expectPoolState(false)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectPoolState(true)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectPoolState(false)

  // Pairs of [invalid argument, error expected from the setter], checked in order.
  const invalidArguments = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { retries: 'invalidChoiceRetries' },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { retries: -1 },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [invalidArgument, expectedError] of invalidArguments) {
    expect(() => pool.setWorkerChoiceStrategyOptions(invalidArgument)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
639
// Switches the tasks queue on and off and checks the resulting pool options.
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Tasks queue options expected for the given concurrency.
  const queueOptionsWith = concurrency => ({
    concurrency,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  const expectQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }
  const expectQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual(
      queueOptionsWith(concurrency)
    )
  }

  expectQueueDisabled()
  pool.enableTasksQueue(true)
  expectQueueEnabled(1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectQueueEnabled(2)
  pool.enableTasksQueue(false)
  expectQueueDisabled()
  await pool.destroy()
})
668
// Changes the tasks queue options through setTasksQueueOptions(), checking the
// pool options and each worker node back pressure size; then checks invalid arguments.
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Asserts the pool options, then that every worker node back pressure size
  // equals the configured queue size.
  const expectQueueOptions = expectedOptions => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }
  const initialQueueOptions = {
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }

  expectQueueOptions(initialQueueOptions)
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  expectQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectQueueOptions(initialQueueOptions)

  // Pairs of [invalid argument, error expected from the setter], checked in order.
  const invalidArguments = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [invalidArgument, expectedError] of invalidArguments) {
    expect(() => pool.setTasksQueueOptions(invalidArgument)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
750
// Checks the info object of a fixed thread pool, then of a dynamic cluster pool.
it('Verify that pool info is set', async () => {
  // Counters expected on a pool that has not run any task.
  const idleCounters = {
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(fixedPool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    ...idleCounters
  })
  await fixedPool.destroy()

  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize,
    ...idleCounters
  })
  await dynamicPool.destroy()
})
796
// Every worker node of a fresh pool exposes zeroed task counters and empty histories.
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Builds a new expected usage object on each call.
  const buildInitialUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  })
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
    expect(node.usage).toStrictEqual(buildInitialUsage())
  }
  await pool.destroy()
})
831
// Every worker node of a fresh pool owns an empty Deque as tasks queue.
it('Verify that pool worker tasks queue are initialized', async () => {
  // Asserts the tasks queue state of each worker node of the given pool.
  const expectEmptyTasksQueues = checkedPool => {
    for (const node of checkedPool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.tasksQueue).toBeInstanceOf(Deque)
      expect(node.tasksQueue.size).toBe(0)
      expect(node.tasksQueue.maxSize).toBe(0)
    }
  }

  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(clusterPool)
  await clusterPool.destroy()

  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(threadPool)
  await threadPool.destroy()
})
857
// Checks the info object of every worker node for a fixed cluster pool and a
// dynamic thread pool.
it('Verify that pool worker info are initialized', async () => {
  // Asserts the info of each worker node of the given pool for the given worker type.
  const expectWorkerInfo = (checkedPool, workerType) => {
    for (const node of checkedPool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerInfo(clusterPool, WorkerTypes.cluster)
  await clusterPool.destroy()

  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkerInfo(threadPool, WorkerTypes.thread)
  await threadPool.destroy()
})
889
// start() on a started pool throws; destroy() on a destroyed pool rejects.
it('Verify that pool statuses are checked at start or destroy', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const threadPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(threadPool.info.started).toBe(true)
  expect(threadPool.info.ready).toBe(true)
  const startAgain = () => threadPool.start()
  expect(startAgain).toThrow(new Error('Cannot start an already started pool'))
  await threadPool.destroy()
  expect(threadPool.info.started).toBe(false)
  expect(threadPool.info.ready).toBe(false)
  const secondDestroy = threadPool.destroy()
  await expect(secondDestroy).rejects.toThrow(
    new Error('Cannot destroy an already destroyed pool')
  )
})
907
// A pool built with startWorkers: false has no worker node and rejects tasks
// until start() is called.
it('Verify that pool can be started after initialization', async () => {
  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )
  // State before start().
  expect(clusterPool.info.started).toBe(false)
  expect(clusterPool.info.ready).toBe(false)
  expect(clusterPool.readyEventEmitted).toBe(false)
  expect(clusterPool.workerNodes).toStrictEqual([])
  await expect(clusterPool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
  // State after start().
  clusterPool.start()
  expect(clusterPool.info.started).toBe(true)
  expect(clusterPool.info.ready).toBe(true)
  await waitPoolEvents(clusterPool, PoolEvents.ready, 1)
  expect(clusterPool.readyEventEmitted).toBe(true)
  expect(clusterPool.workerNodes.length).toBe(numberOfWorkers)
  for (const node of clusterPool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
  }
  await clusterPool.destroy()
})
934
// Invalid execute() arguments reject, as does execute() on a destroyed pool.
it('Verify that pool execute() arguments are checked', async () => {
  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const nonStringName = clusterPool.execute(undefined, 0)
  await expect(nonStringName).rejects.toThrow(
    new TypeError('name argument must be a string')
  )
  const emptyName = clusterPool.execute(undefined, '')
  await expect(emptyName).rejects.toThrow(
    new TypeError('name argument must not be an empty string')
  )
  const nonArrayTransferList = clusterPool.execute(undefined, undefined, {})
  await expect(nonArrayTransferList).rejects.toThrow(
    new TypeError('transferList argument must be an array')
  )
  // This rejection value is compared as a plain string, not as an Error.
  const unknownTaskFunction = clusterPool.execute(undefined, 'unknown')
  await expect(unknownTaskFunction).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await clusterPool.destroy()
  await expect(clusterPool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
957
// Submits numberOfWorkers * maxMultiplier tasks and checks each worker node
// counters while the tasks are pending, then once they have all settled.
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Expected usage for the given executed/executing task counters.
  const usageWith = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })

  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(maxMultiplier, 0))
  }
  await pool.destroy()
})
1023
// After tasks have run, setWorkerChoiceStrategy() must leave every worker node
// with zeroed task counters and empty histories.
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  // Expected usage for the given executed counter (a value or a matcher).
  const usageWithExecuted = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that all four measurement histories of a worker node are empty.
  const expectEmptyHistories = ({ usage }) => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }

  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWithExecuted(expect.any(Number)))
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(node)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWithExecuted(0))
    expectEmptyHistories(node)
  }
  await pool.destroy()
})
1103
// A listener registered on the 'ready' event is called once with the pool info.
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Every info payload received by the listener, in arrival order.
  const receivedInfos = []
  pool.emitter.on(PoolEvents.ready, info => {
    receivedInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(receivedInfos.length).toBe(1)
  const lastInfo = receivedInfos[receivedInfos.length - 1]
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
1138
// A listener registered on the 'busy' event receives the pool info each time
// the event is emitted.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Every info payload received by the listener, in arrival order.
  const receivedInfos = []
  pool.emitter.on(PoolEvents.busy, info => {
    receivedInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So numberOfWorkers + 1 times in total when submitting numberOfWorkers * 2 tasks to the fixed pool.
  expect(receivedInfos.length).toBe(numberOfWorkers + 1)
  const lastInfo = receivedInfos[receivedInfos.length - 1]
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
1178
// Checks that a listener registered for the 'full' pool event is invoked once
// when more tasks than the dynamic pool maximum size are submitted, and that
// it receives the pool information object.
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(pool.emitter.eventNames()).toStrictEqual([])
    const promises = new Set()
    let poolFull = 0
    let poolInfo
    pool.emitter.on(PoolEvents.full, info => {
      ++poolFull
      poolInfo = info
    })
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    expect(poolFull).toBe(1)
    expect(poolInfo).toStrictEqual({
      version,
      type: PoolTypes.dynamic,
      worker: WorkerTypes.thread,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: expect.any(Number),
      maxSize: expect.any(Number),
      workerNodes: expect.any(Number),
      idleWorkerNodes: expect.any(Number),
      busyWorkerNodes: expect.any(Number),
      executedTasks: expect.any(Number),
      executingTasks: expect.any(Number),
      failedTasks: expect.any(Number)
    })
  } finally {
    // Destroy the pool even when an assertion above throws, so a failing
    // test does not leave a started pool behind.
    await pool.destroy()
  }
})
1217
// Checks that a listener registered for the 'backPressure' pool event is
// invoked once and receives the pool information object, with
// `hasBackPressure()` stubbed to always report back pressure.
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    {
      enableTasksQueue: true
    }
  )
  try {
    // Force the back pressure condition instead of filling the tasks queues.
    stub(pool, 'hasBackPressure').returns(true)
    expect(pool.emitter.eventNames()).toStrictEqual([])
    const promises = new Set()
    let poolBackPressure = 0
    let poolInfo
    pool.emitter.on(PoolEvents.backPressure, info => {
      ++poolBackPressure
      poolInfo = info
    })
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
    for (let i = 0; i < numberOfWorkers + 1; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    expect(poolBackPressure).toBe(1)
    expect(poolInfo).toStrictEqual({
      version,
      type: PoolTypes.fixed,
      worker: WorkerTypes.thread,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: expect.any(Number),
      maxSize: expect.any(Number),
      workerNodes: expect.any(Number),
      idleWorkerNodes: expect.any(Number),
      busyWorkerNodes: expect.any(Number),
      executedTasks: expect.any(Number),
      executingTasks: expect.any(Number),
      maxQueuedTasks: expect.any(Number),
      queuedTasks: expect.any(Number),
      backPressure: true,
      stolenTasks: expect.any(Number),
      failedTasks: expect.any(Number)
    })
    expect(pool.hasBackPressure.called).toBe(true)
  } finally {
    // Destroy the pool even when an assertion above throws, so a failing
    // test does not leave a started pool behind.
    await pool.destroy()
  }
})
1264
// Checks hasTaskFunction() against a dynamic thread pool and a fixed cluster
// pool whose workers declare several task functions.
it('Verify that hasTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  try {
    await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
    expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
    expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
      true
    )
    expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
    expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
    expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
  } finally {
    // Destroy the pool even when an assertion above throws.
    await dynamicThreadPool.destroy()
  }
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  try {
    await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
    expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
    expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
      true
    )
    expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
    expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
    expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
  } finally {
    // Destroy the pool even when an assertion above throws.
    await fixedClusterPool.destroy()
  }
})
1294
// Checks addTaskFunction(): argument validation, registration of a new task
// function on the pool, its execution, and the per-worker usage it reports.
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
    // Invalid arguments must be rejected.
    await expect(
      dynamicThreadPool.addTaskFunction(0, () => {})
    ).rejects.toThrow(new TypeError('name argument must be a string'))
    await expect(
      dynamicThreadPool.addTaskFunction('', () => {})
    ).rejects.toThrow(
      new TypeError('name argument must not be an empty string')
    )
    await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
      new TypeError('fn argument must be a function')
    )
    await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
      new TypeError('fn argument must be a function')
    )
    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'test'
    ])
    const echoTaskFunction = data => {
      return data
    }
    await expect(
      dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
    ).resolves.toBe(true)
    expect(dynamicThreadPool.taskFunctions.size).toBe(1)
    expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
      echoTaskFunction
    )
    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'test',
      'echo'
    ])
    // The newly added task function must be executable by name.
    const taskFunctionData = { test: 'test' }
    const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
    expect(echoResult).toStrictEqual(taskFunctionData)
    for (const workerNode of dynamicThreadPool.workerNodes) {
      expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
    }
  } finally {
    // Destroy the pool even when an assertion above throws, so a failing
    // test does not leave a started pool behind.
    await dynamicThreadPool.destroy()
  }
})
1365
// Checks removeTaskFunction(): a task function not registered through the
// pool cannot be removed, while one added through the pool can.
it('Verify that removeTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'test'
    ])
    await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
      new Error('Cannot remove a task function not handled on the pool side')
    )
    const echoTaskFunction = data => {
      return data
    }
    await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
    expect(dynamicThreadPool.taskFunctions.size).toBe(1)
    expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
      echoTaskFunction
    )
    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'test',
      'echo'
    ])
    await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
      true
    )
    expect(dynamicThreadPool.taskFunctions.size).toBe(0)
    expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'test'
    ])
  } finally {
    // Destroy the pool even when an assertion above throws, so a failing
    // test does not leave a started pool behind.
    await dynamicThreadPool.destroy()
  }
})
1404
// Checks listTaskFunctionNames() against a dynamic thread pool and a fixed
// cluster pool whose workers declare several task functions.
it('Verify that listTaskFunctionNames() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  try {
    await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
  } finally {
    // Destroy the pool even when an assertion above throws.
    await dynamicThreadPool.destroy()
  }
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  try {
    await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
    expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
  } finally {
    // Destroy the pool even when an assertion above throws.
    await fixedClusterPool.destroy()
  }
})
1432
// Checks setDefaultTaskFunction(): invalid names are rejected with the error
// reported by the worker, and a valid name changes the listed order.
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  try {
    await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
    // The expected error messages embed the id of the first worker node.
    const workerId = dynamicThreadPool.workerNodes[0].info.id
    await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
      new Error(
        `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
      )
    )
    await expect(
      dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
    ).rejects.toThrow(
      new Error(
        `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
      )
    )
    await expect(
      dynamicThreadPool.setDefaultTaskFunction('unknown')
    ).rejects.toThrow(
      new Error(
        `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
      )
    )
    // Failed operations must leave the task function names untouched.
    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    // After a successful call the chosen name is expected right after
    // DEFAULT_TASK_NAME in the list.
    await expect(
      dynamicThreadPool.setDefaultTaskFunction('factorial')
    ).resolves.toBe(true)
    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'factorial',
      'jsonIntegerSerialization',
      'fibonacci'
    ])
    await expect(
      dynamicThreadPool.setDefaultTaskFunction('fibonacci')
    ).resolves.toBe(true)
    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'fibonacci',
      'jsonIntegerSerialization',
      'factorial'
    ])
  } finally {
    // Destroy the pool even when an assertion above throws, so a failing
    // test does not leave a started pool behind.
    await dynamicThreadPool.destroy()
  }
})
1486
// Executes each task function declared by a multiple task functions worker
// and checks the results, the pool counters and the per-task-function usage.
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  try {
    const data = { n: 10 }
    const result0 = await pool.execute(data)
    expect(result0).toStrictEqual({ ok: 1 })
    const result1 = await pool.execute(data, 'jsonIntegerSerialization')
    expect(result1).toStrictEqual({ ok: 1 })
    const result2 = await pool.execute(data, 'factorial')
    expect(result2).toBe(3628800)
    const result3 = await pool.execute(data, 'fibonacci')
    expect(result3).toBe(55)
    expect(pool.info.executingTasks).toBe(0)
    expect(pool.info.executedTasks).toBe(4)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.info.taskFunctionNames).toStrictEqual([
        DEFAULT_TASK_NAME,
        'jsonIntegerSerialization',
        'factorial',
        'fibonacci'
      ])
      expect(workerNode.taskFunctionsUsage.size).toBe(3)
      for (const name of pool.listTaskFunctionNames()) {
        expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
          tasks: {
            executed: expect.any(Number),
            executing: 0,
            failed: 0,
            queued: 0,
            stolen: 0
          },
          runTime: {
            history: expect.any(CircularArray)
          },
          waitTime: {
            history: expect.any(CircularArray)
          },
          elu: {
            idle: {
              history: expect.any(CircularArray)
            },
            active: {
              history: expect.any(CircularArray)
            }
          }
        })
        expect(
          workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
        ).toBeGreaterThan(0)
      }
      // The usage reported for DEFAULT_TASK_NAME is expected to equal the
      // usage of the second listed task function name.
      expect(
        workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
      ).toStrictEqual(
        workerNode.getTaskFunctionWorkerUsage(
          workerNode.info.taskFunctionNames[1]
        )
      )
    }
  } finally {
    // Destroy the pool even when an assertion above throws, so a failing
    // test does not leave a started pool behind.
    await pool.destroy()
  }
})
1550
// Checks that sendKillMessageToWorker() resolves with undefined for the
// first worker node of a dynamic cluster pool.
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    const workerNodeKey = 0
    await expect(
      pool.sendKillMessageToWorker(workerNodeKey)
    ).resolves.toBeUndefined()
  } finally {
    // Destroy the pool even when the assertion above throws.
    await pool.destroy()
  }
})
1563
// Checks that sendTaskFunctionOperationToWorker() can add a task function on
// a single worker node and that the worker node info reflects it.
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    const workerNodeKey = 0
    await expect(
      pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
        taskFunctionOperation: 'add',
        taskFunctionName: 'empty',
        // The task function is sent to the worker as its source string.
        taskFunction: (() => {}).toString()
      })
    ).resolves.toBe(true)
    expect(
      pool.workerNodes[workerNodeKey].info.taskFunctionNames
    ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  } finally {
    // Destroy the pool even when an assertion above throws.
    await pool.destroy()
  }
})
1583
// Checks that sendTaskFunctionOperationToWorkers() can add a task function
// and that every worker node info reflects it.
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    await expect(
      pool.sendTaskFunctionOperationToWorkers({
        taskFunctionOperation: 'add',
        taskFunctionName: 'empty',
        // The task function is sent to the workers as its source string.
        taskFunction: (() => {}).toString()
      })
    ).resolves.toBe(true)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.info.taskFunctionNames).toStrictEqual([
        DEFAULT_TASK_NAME,
        'test',
        'empty'
      ])
    }
  } finally {
    // Destroy the pool even when an assertion above throws.
    await pool.destroy()
  }
})
1606 })