test: add missing pool destroy()
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { expect } from 'expect'
6 import { restore, stub } from 'sinon'
7 import {
8 DynamicClusterPool,
9 DynamicThreadPool,
10 FixedClusterPool,
11 FixedThreadPool,
12 PoolEvents,
13 PoolTypes,
14 WorkerChoiceStrategies,
15 WorkerTypes
16 } from '../../lib/index.js'
17 import { CircularArray } from '../../lib/circular-array.js'
18 import { Deque } from '../../lib/deque.js'
19 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20 import { waitPoolEvents } from '../test-utils.js'
21 import { WorkerNode } from '../../lib/pools/worker-node.js'
22
23 describe('Abstract pool test suite', () => {
// Version read from the package.json located two levels above this file's directory.
const { version } = JSON.parse(
  readFileSync(
    join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
    'utf8'
  )
)
// Pool size shared by the tests of this suite.
const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`.
 * Used below to simulate a pool created from a non main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
36
// Call sinon's `restore()` once each test of the suite has run.
afterEach(() => {
  restore()
})
40
it('Simulate pool creation from a non main thread/process', () => {
  // The stub reports `isMain() === false`, so its construction must be refused.
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPoolFromWorker).toThrow(expectedError)
})
57
it('Verify that pool statuses properties are set', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)
  // Right after construction the pool is no longer starting and is already started.
  expect(pool.starting).toBe(false)
  expect(pool.started).toBe(true)
  await pool.destroy()
})
67
it('Verify that filePath is checked', () => {
  const createWithoutFilePath = () => new FixedThreadPool(numberOfWorkers)
  const createWithUnknownFilePath = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createWithoutFilePath).toThrow(
    new Error("Cannot find the worker file 'undefined'")
  )
  expect(createWithUnknownFilePath).toThrow(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
76
it('Verify that numberOfWorkers is checked', () => {
  // The number of workers is mandatory: `undefined` must be refused.
  const createWithoutSize = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createWithoutSize).toThrow(expectedError)
})
90
it('Verify that a negative number of workers is checked', () => {
  const createWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createWithNegativeSize).toThrow(expectedError)
})
101
it('Verify that a non integer number of workers is checked', () => {
  const createWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createWithFractionalSize).toThrow(expectedError)
})
112
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry pairs an invalid (min, max) sizing with the error its constructor must throw.
  const invalidSizings = [
    [
      () => new DynamicClusterPool(1, undefined, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(0.5, 1, threadWorkerFile),
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      () => new DynamicClusterPool(0, 0.5, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(2, 1, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(0, 0, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      () => new DynamicClusterPool(1, 1, clusterWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [createPool, expectedError] of invalidSizings) {
    expect(createPool).toThrow(expectedError)
  }
})
187
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Asserts the options held by the strategy context and by each of its strategies.
  const expectStrategyContextOptions = (pool, expectedOptions) => {
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(expectedOptions)
    }
  }

  // First pool: no options given, so the defaults are expected everywhere.
  const defaultStrategyOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(defaultPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    workerChoiceStrategyOptions: defaultStrategyOptions
  })
  expectStrategyContextOptions(defaultPool, defaultStrategyOptions)
  await defaultPool.destroy()

  // Second pool: explicit options, expected back merged with the remaining defaults.
  const testHandler = () => console.info('test handler executed')
  const customStrategyOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: customStrategyOptions,
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  expectStrategyContextOptions(customPool, customStrategyOptions)
  await customPool.destroy()
})
287
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry pairs invalid pool options with the error the constructor must throw.
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrow(expectedError)
  }
})
459
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Asserts the options on the pool, on the strategy context and on every strategy.
  const expectStrategyOptions = expectedOptions => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(expectedOptions)
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(expectedOptions)
    }
  }
  // Asserts the statistics requirements; runTime and elu share the same average/median flags.
  const expectStatisticsRequirements = ({ average, median }) => {
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average, median }
    })
  }
  const averageBasedOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  const medianBasedOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: true }
  }

  // Initial state.
  expectStrategyOptions(averageBasedOptions)
  expectStatisticsRequirements({ average: true, median: false })

  // Switch runTime and elu to median.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategyOptions(medianBasedOptions)
  expectStatisticsRequirements({ average: false, median: true })

  // Switch runTime and elu back to average.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategyOptions(averageBasedOptions)
  expectStatisticsRequirements({ average: true, median: false })

  // Invalid options must be refused by the setter.
  const invalidOptions = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { retries: 'invalidChoiceRetries' },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { retries: -1 },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [strategyOptions, expectedError] of invalidOptions) {
    expect(() => pool.setWorkerChoiceStrategyOptions(strategyOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
629
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Queue disabled: no queue options and no queue callbacks on the worker nodes.
  const expectTasksQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.onEmptyQueue).toBeUndefined()
      expect(workerNode.onBackPressure).toBeUndefined()
    }
  }
  // Queue enabled: options with the given concurrency, callbacks set on the worker nodes.
  const expectTasksQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
    }
  }

  expectTasksQueueDisabled()
  pool.enableTasksQueue(true)
  expectTasksQueueEnabled(1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueEnabled(2)
  pool.enableTasksQueue(false)
  expectTasksQueueDisabled()
  await pool.destroy()
})
674
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  const stealingQueueOptions = {
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  // Asserts the back pressure size and the queue callbacks on each worker node.
  const expectWorkerNodes = callbacksRegistered => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
      if (callbacksRegistered) {
        expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
        expect(workerNode.onBackPressure).toBeInstanceOf(Function)
      } else {
        expect(workerNode.onEmptyQueue).toBeUndefined()
        expect(workerNode.onBackPressure).toBeUndefined()
      }
    }
  }

  // Initial state.
  expect(pool.opts.tasksQueueOptions).toStrictEqual(stealingQueueOptions)
  expectWorkerNodes(true)

  // Task stealing switched off.
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  expectWorkerNodes(false)

  // Task stealing switched back on, size left unspecified.
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual(stealingQueueOptions)
  expectWorkerNodes(true)

  // Invalid options must be refused by the setter.
  const invalidOptions = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [tasksQueueOptions, expectedError] of invalidOptions) {
    expect(() => pool.setTasksQueueOptions(tasksQueueOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
762
it('Verify that pool info is set', async () => {
  // Fields expected to be identical for both freshly created pools.
  const commonInfo = {
    version,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(fixedPool.info).toStrictEqual({
    ...commonInfo,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers
  })
  await fixedPool.destroy()

  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    ...commonInfo,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize
  })
  await dynamicPool.destroy()
})
808
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Builds the usage expected on a worker node that has not run any task yet.
  const pristineUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.usage).toStrictEqual(pristineUsage())
  }
  await pool.destroy()
})
843
it('Verify that pool worker tasks queue are initialized', async () => {
  // Pools are created, checked and destroyed one after the other.
  const poolFactories = [
    () =>
      new FixedClusterPool(
        numberOfWorkers,
        './tests/worker-files/cluster/testWorker.js'
      ),
    () =>
      new DynamicThreadPool(
        Math.floor(numberOfWorkers / 2),
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
    await pool.destroy()
  }
})
869
it('Verify that pool worker info are initialized', async () => {
  // Each entry pairs a pool factory with the worker type its nodes must report.
  const scenarios = [
    [
      () =>
        new FixedClusterPool(
          numberOfWorkers,
          './tests/worker-files/cluster/testWorker.js'
        ),
      WorkerTypes.cluster
    ],
    [
      () =>
        new DynamicThreadPool(
          Math.floor(numberOfWorkers / 2),
          numberOfWorkers,
          './tests/worker-files/thread/testWorker.mjs'
        ),
      WorkerTypes.thread
    ]
  ]
  for (const [createPool, expectedWorkerType] of scenarios) {
    const pool = createPool()
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: expectedWorkerType,
        dynamic: false,
        ready: true
      })
    }
    await pool.destroy()
  }
})
901
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )

  // Before start(): nothing is running and tasks are refused.
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrow(notStartedError)

  // After start(): the pool is started, ready and has all its worker nodes.
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
925
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Each entry pairs an invalid execute() argument list with the expected rejection.
  const invalidCalls = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [executeArguments, expectedError] of invalidCalls) {
    await expect(pool.execute(...executeArguments)).rejects.toThrow(
      expectedError
    )
  }
  // An unknown task function name is rejected with a plain string.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  // A destroyed pool no longer accepts tasks.
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
948
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Builds the expected usage for the given executed/executing task counters.
  const usageWith = ({ executed, executing }) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  const maxMultiplier = 2
  const promises = new Set()
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    promises.add(pool.execute())
  }
  // Tasks submitted but not awaited yet: all of them count as executing.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(
      usageWith({ executed: 0, executing: maxMultiplier })
    )
  }
  await Promise.all(promises)
  // Once settled, the same tasks count as executed.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(
      usageWith({ executed: maxMultiplier, executing: 0 })
    )
  }
  await pool.destroy()
})
1014
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Builds the expected usage of an idle worker node for the given executed counter.
  const idleUsageWith = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that none of the measurement histories holds a value.
  const expectEmptyHistories = workerNode => {
    expect(workerNode.usage.runTime.history.length).toBe(0)
    expect(workerNode.usage.waitTime.history.length).toBe(0)
    expect(workerNode.usage.elu.idle.history.length).toBe(0)
    expect(workerNode.usage.elu.active.history.length).toBe(0)
  }
  const maxMultiplier = 2
  const totalTasks = numberOfWorkers * maxMultiplier
  const promises = new Set()
  for (let i = 0; i < totalTasks; i++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  // Before the strategy change every worker node has executed at least one task.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(idleUsageWith(expect.any(Number)))
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(totalTasks)
    expectEmptyHistories(workerNode)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // After the strategy change the executed counters are back to zero.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(idleUsageWith(0))
    expectEmptyHistories(workerNode)
  }
  await pool.destroy()
})
1094
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Record how many times the event fired and the last payload received.
  let readyCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.ready, info => {
    ++readyCount
    lastInfo = info
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1129
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Record how many times the event fired and the last payload received.
  let busyCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.busy, info => {
    ++busyCount
    lastInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  const promises = new Set()
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  // The `busy` event is triggered once the number of tasks submitted at once reaches the number of fixed pool workers.
  // Submitting numberOfWorkers * 2 tasks in a loop therefore triggers it numberOfWorkers + 1 times in total.
  expect(busyCount).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1169
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicThreadPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // A freshly created pool has no listener registered on its emitter.
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let fullEventCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.full, info => {
    fullEventCount += 1
    lastPoolInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  // Submit twice as many tasks as the maximum number of workers, all at once.
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The test expects exactly one `full` emission for this burst of tasks.
  expect(fullEventCount).toBe(1)
  const expectedPoolInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastPoolInfo).toStrictEqual(expectedPoolInfo)
  await pool.destroy()
})
1208
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const poolOptions = { enableTasksQueue: true }
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    poolOptions
  )
  // Make the pool always report back pressure; the stub is undone by the
  // suite-level afterEach(restore).
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let backPressureEventCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureEventCount += 1
    lastPoolInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  // Submit one more task than there are workers, all at once.
  const submittedTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  expect(backPressureEventCount).toBe(1)
  // With the tasks queue enabled the pool info also carries queue statistics.
  const expectedPoolInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastPoolInfo).toStrictEqual(expectedPoolInfo)
  expect(pool.hasBackPressure.called).toBe(true)
  await pool.destroy()
})
1255
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names both pools are expected to report as available.
  const expectedTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]

  // Thread flavour.
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  for (const name of expectedTaskFunctionNames) {
    expect(dynamicThreadPool.hasTaskFunction(name)).toBe(true)
  }
  expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
  await dynamicThreadPool.destroy()

  // Cluster flavour.
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  for (const name of expectedTaskFunctionNames) {
    expect(fixedClusterPool.hasTaskFunction(name)).toBe(true)
  }
  expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
  await fixedClusterPool.destroy()
})
1285
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)

  // Invalid arguments must be rejected: [name, fn, expected rejection].
  const invalidCalls = [
    [0, () => {}, new TypeError('name argument must be a string')],
    ['', () => {}, new TypeError('name argument must not be an empty string')],
    ['test', 0, new TypeError('fn argument must be a function')],
    ['test', '', new TypeError('fn argument must be a function')]
  ]
  for (const [name, fn, expectedError] of invalidCalls) {
    await expect(dynamicThreadPool.addTaskFunction(name, fn)).rejects.toThrow(
      expectedError
    )
  }
  // The rejected calls must not have altered the known task function names.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])

  // Register a valid task function and check the pool side bookkeeping.
  const echo = data => {
    return data
  }
  await expect(dynamicThreadPool.addTaskFunction('echo', echo)).resolves.toBe(
    true
  )
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(echo)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])

  // The new task function is executable and returns its input.
  const payload = { test: 'test' }
  const echoed = await dynamicThreadPool.execute(payload, 'echo')
  expect(echoed).toStrictEqual(payload)

  // Every worker node exposes a usage record for the new task function.
  for (const workerNode of dynamicThreadPool.workerNodes) {
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
    expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual(
      expectedUsage
    )
  }
  await dynamicThreadPool.destroy()
})
1356
it('Verify that removeTaskFunction() is working', async () => {
  // Names the pool reports before adding and after removing 'echo'.
  const baselineNames = [DEFAULT_TASK_NAME, 'test']
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    baselineNames
  )

  // 'test' was not added through the pool, so removing it must be rejected.
  await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )

  // Add a task function through the pool ...
  const echo = data => {
    return data
  }
  await dynamicThreadPool.addTaskFunction('echo', echo)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(echo)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])

  // ... then remove it and check that every trace of it is gone.
  await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
    true
  )
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    baselineNames
  )
  await dynamicThreadPool.destroy()
})
1395
it('Verify that listTaskFunctionNames() is working', async () => {
  // Both pool flavours are expected to list the same names in the same order.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]

  // Thread flavour.
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    expectedNames
  )
  await dynamicThreadPool.destroy()

  // Cluster flavour.
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual(expectedNames)
  await fixedClusterPool.destroy()
})
1423
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // The worker id embedded in the rejection message is only known at runtime
  // (presumably the worker thread id, which varies with how many workers were
  // spawned earlier in the process), so it is matched with \d+ instead of
  // being hard-coded. The rest of the message is still matched exactly.
  const defaultOperationError = reason =>
    new RegExp(
      `^Task function operation 'default' failed on worker \\d+ with error: '${reason}'$`
    )
  // Invalid names must be rejected by the worker.
  await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
    defaultOperationError('TypeError: name parameter is not a string')
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrow(
    defaultOperationError(
      'Error: Cannot set the default task function reserved name as the default task function'
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrow(
    defaultOperationError(
      'Error: Cannot set the default task function to a non-existing task function'
    )
  )
  // The rejected calls must not have reordered the task function names.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // Setting a new default moves that name right after DEFAULT_TASK_NAME.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  await dynamicThreadPool.destroy()
})
1476
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const input = { n: 10 }
  // Run the default task function, then each named one, one after the other.
  expect(await pool.execute(input)).toStrictEqual({ ok: 1 })
  expect(await pool.execute(input, 'jsonIntegerSerialization')).toStrictEqual({
    ok: 1
  })
  expect(await pool.execute(input, 'factorial')).toBe(3628800)
  expect(await pool.execute(input, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    // Each listed task function must have a settled usage record with at
    // least one executed task.
    for (const name of pool.listTaskFunctionNames()) {
      const usage = workerNode.getTaskFunctionWorkerUsage(name)
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          stolen: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      expect(usage.tasks.executed).toBeGreaterThan(0)
    }
    // The usage reported for DEFAULT_TASK_NAME must equal the usage of the
    // second listed task function name.
    const secondListedName = workerNode.info.taskFunctionNames[1]
    expect(
      workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(workerNode.getTaskFunctionWorkerUsage(secondListedName))
  }
  await pool.destroy()
})
1540
it('Verify sendKillMessageToWorker()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node; the returned promise must resolve to
  // undefined.
  const firstWorkerNodeKey = 0
  const killRequest = pool.sendKillMessageToWorker(firstWorkerNodeKey)
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
1553
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Ask the first worker node to add an empty task function, passed as its
  // source text.
  const targetWorkerNodeKey = 0
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(
      targetWorkerNodeKey,
      addEmptyTaskFunction
    )
  ).resolves.toBe(true)
  // The targeted worker node now lists the new task function name last.
  const { taskFunctionNames } = pool.workerNodes[targetWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1573
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Ask the pool's workers to add an empty task function, passed as its
  // source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addEmptyTaskFunction)
  ).resolves.toBe(true)
  // Every worker node must now list the new task function name last.
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const { info } of pool.workerNodes) {
    expect(info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
1596 })