test: switch thread worker to ESM
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { readFileSync } from 'node:fs'
3 import { expect } from 'expect'
4 import { restore, stub } from 'sinon'
5 import {
6 DynamicClusterPool,
7 DynamicThreadPool,
8 FixedClusterPool,
9 FixedThreadPool,
10 PoolEvents,
11 PoolTypes,
12 WorkerChoiceStrategies,
13 WorkerTypes
14 } from '../../lib/index.js'
15 import { CircularArray } from '../../lib/circular-array.js'
16 import { Deque } from '../../lib/deque.js'
17 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
18 import { waitPoolEvents } from '../test-utils.js'
19 import { WorkerNode } from '../../lib/pools/worker-node.js'
20
21 describe('Abstract pool test suite', () => {
// Version string read from the package.json resolved against the current working directory.
const { version } = JSON.parse(readFileSync('./package.json', 'utf8'))
// Pool size used by the test cases of this suite.
const numberOfWorkers = 2

/**
 * Fixed thread pool whose `isMain()` override always answers `false`.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}

// Call sinon's `restore()` after every test case.
afterEach(function restoreSinon () {
  restore()
})
33
// Instantiating the stub whose isMain() returns false is expected to throw.
it('Simulate pool creation from a non main thread/process', () => {
  const createPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      {
        errorHandler: e => console.error(e)
      }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPool).toThrowError(expectedError)
})
50
// Checks the `starting` / `started` flags right after construction.
it('Verify that pool statuses properties are set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(pool.starting).toBe(false)
    expect(pool.started).toBe(true)
  } finally {
    // Destroy the pool even when an assertion above fails, so no worker is left running.
    await pool.destroy()
  }
})
60
// Missing, empty, non-string and non-existing worker file paths must all be rejected.
it('Verify that filePath is checked', () => {
  const expectedError = new Error(
    'Please specify a file with a worker implementation'
  )
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
    expectedError
  )
  for (const invalidFilePath of ['', 0, true]) {
    const createPool = () =>
      new FixedThreadPool(numberOfWorkers, invalidFilePath)
    expect(createPool).toThrowError(expectedError)
  }
  const createPoolWithUnknownFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createPoolWithUnknownFile).toThrowError(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
81
// An undefined number of workers must be rejected by the constructor.
it('Verify that numberOfWorkers is checked', () => {
  const createPool = () =>
    new FixedThreadPool(
      undefined,
      './tests/worker-files/thread/testWorker.mjs'
    )
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPool).toThrowError(expectedError)
})
95
// A negative number of workers must be rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  const createPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createPool).toThrowError(expectedError)
})
106
// A fractional number of workers must be rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  const createPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createPool).toThrowError(expectedError)
})
117
// Each row is an invalid (min, max) sizing and the error the constructor must throw for it.
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  const invalidSizings = [
    {
      createPool: () => new DynamicClusterPool(1, undefined, clusterWorkerFile),
      expectedError: new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    },
    {
      createPool: () => new DynamicThreadPool(0.5, 1, threadWorkerFile),
      expectedError: new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    },
    {
      createPool: () => new DynamicClusterPool(0, 0.5, clusterWorkerFile),
      expectedError: new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    },
    {
      createPool: () => new DynamicThreadPool(2, 1, threadWorkerFile),
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    },
    {
      createPool: () => new DynamicThreadPool(0, 0, threadWorkerFile),
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    },
    {
      createPool: () => new DynamicClusterPool(1, 1, clusterWorkerFile),
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    }
  ]
  for (const { createPool, expectedError } of invalidSizings) {
    expect(createPool).toThrowError(expectedError)
  }
})
192
// Checks the options of a pool built without options, then of a pool built with explicit options.
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'

  // Worker choice strategy options expected when none are supplied.
  const defaultStrategyOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  try {
    expect(defaultPool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
    expect(defaultPool.opts).toStrictEqual({
      startWorkers: true,
      enableEvents: true,
      restartWorkerOnError: true,
      enableTasksQueue: false,
      workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
      workerChoiceStrategyOptions: defaultStrategyOptions
    })
    expect(defaultPool.workerChoiceStrategyContext.opts).toStrictEqual(
      defaultStrategyOptions
    )
    for (const [, workerChoiceStrategy] of defaultPool
      .workerChoiceStrategyContext.workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(defaultStrategyOptions)
    }
  } finally {
    // Destroy the pool even when an assertion above fails, so no worker is left running.
    await defaultPool.destroy()
  }

  const testHandler = () => console.info('test handler executed')
  // Worker choice strategy options expected once the custom ones below are applied.
  const customStrategyOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  try {
    expect(customPool.emitter).toBeUndefined()
    expect(customPool.opts).toStrictEqual({
      startWorkers: true,
      enableEvents: false,
      restartWorkerOnError: false,
      enableTasksQueue: true,
      tasksQueueOptions: {
        concurrency: 2,
        size: Math.pow(numberOfWorkers, 2),
        taskStealing: true,
        tasksStealingOnBackPressure: true
      },
      workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
      workerChoiceStrategyOptions: customStrategyOptions,
      onlineHandler: testHandler,
      messageHandler: testHandler,
      errorHandler: testHandler,
      exitHandler: testHandler
    })
    expect(customPool.workerChoiceStrategyContext.opts).toStrictEqual(
      customStrategyOptions
    )
    for (const [, workerChoiceStrategy] of customPool
      .workerChoiceStrategyContext.workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(customStrategyOptions)
    }
  } finally {
    await customPool.destroy()
  }
})
292
// Each row pairs invalid pool options with the error the constructor must throw for them.
it('Verify that pool options are validated', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    const createPool = () =>
      new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    expect(createPool).toThrowError(expectedError)
  }
})
466
// Toggles the runTime/elu median option back and forth, checking after each step that the
// options are reflected on the pool, its strategy context and every strategy, then checks
// that invalid options are rejected.
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Expected strategy options with the runTime and elu median flags set to `median`.
  const strategyOptions = median => ({
    retries: 6,
    runTime: { median },
    waitTime: { median: false },
    elu: { median }
  })
  // Expected statistics requirements: average and median are mutually exclusive for runTime and elu.
  const statisticsRequirements = median => ({
    runTime: { aggregate: true, average: !median, median },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: true, average: !median, median }
  })
  // Asserts the pool, the strategy context and every strategy agree on the given median flag.
  const expectPoolState = median => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      strategyOptions(median)
    )
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      strategyOptions(median)
    )
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(strategyOptions(median))
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(statisticsRequirements(median))
  }

  try {
    expectPoolState(false)
    pool.setWorkerChoiceStrategyOptions({
      runTime: { median: true },
      elu: { median: true }
    })
    expectPoolState(true)
    pool.setWorkerChoiceStrategyOptions({
      runTime: { median: false },
      elu: { median: false }
    })
    expectPoolState(false)

    const invalidOptions = [
      [
        'invalidWorkerChoiceStrategyOptions',
        new TypeError(
          'Invalid worker choice strategy options: must be a plain object'
        )
      ],
      [
        { retries: 'invalidChoiceRetries' },
        new TypeError(
          'Invalid worker choice strategy options: retries must be an integer'
        )
      ],
      [
        { retries: -1 },
        new RangeError(
          "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
        )
      ],
      [
        { weights: {} },
        new Error(
          'Invalid worker choice strategy options: must have a weight for each worker node'
        )
      ],
      [
        { measurement: 'invalidMeasurement' },
        new Error(
          "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
        )
      ]
    ]
    for (const [options, expectedError] of invalidOptions) {
      expect(() =>
        pool.setWorkerChoiceStrategyOptions(options)
      ).toThrowError(expectedError)
    }
  } finally {
    // Destroy the pool even when an assertion above fails, so no worker is left running.
    await pool.destroy()
  }
})
640
// Enables, reconfigures and disables the tasks queue, checking the pool options and the
// worker node queue callbacks after each step.
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )

  // Asserts the queue is disabled and no worker node carries a queue callback.
  const expectQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.onEmptyQueue).toBeUndefined()
      expect(workerNode.onBackPressure).toBeUndefined()
    }
  }
  // Asserts the queue is enabled with the given concurrency and every worker node carries both callbacks.
  const expectQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
    }
  }

  try {
    expectQueueDisabled()
    pool.enableTasksQueue(true)
    expectQueueEnabled(1)
    pool.enableTasksQueue(true, { concurrency: 2 })
    expectQueueEnabled(2)
    pool.enableTasksQueue(false)
    expectQueueDisabled()
  } finally {
    // Destroy the pool even when an assertion above fails, so no worker is left running.
    await pool.destroy()
  }
})
685
// Changes the tasks queue options at runtime, checking the pool options and the worker
// nodes after each step, then checks that invalid options are rejected.
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )

  // Asserts the pool options and the per worker node back pressure size and callbacks.
  const expectQueueState = (expectedOptions, callbacksInstalled) => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
      if (callbacksInstalled) {
        expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
        expect(workerNode.onBackPressure).toBeInstanceOf(Function)
      } else {
        expect(workerNode.onEmptyQueue).toBeUndefined()
        expect(workerNode.onBackPressure).toBeUndefined()
      }
    }
  }

  try {
    expectQueueState(
      {
        concurrency: 1,
        size: Math.pow(numberOfWorkers, 2),
        taskStealing: true,
        tasksStealingOnBackPressure: true
      },
      true
    )
    pool.setTasksQueueOptions({
      concurrency: 2,
      size: 2,
      taskStealing: false,
      tasksStealingOnBackPressure: false
    })
    expectQueueState(
      {
        concurrency: 2,
        size: 2,
        taskStealing: false,
        tasksStealingOnBackPressure: false
      },
      false
    )
    pool.setTasksQueueOptions({
      concurrency: 1,
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    expectQueueState(
      {
        concurrency: 1,
        size: Math.pow(numberOfWorkers, 2),
        taskStealing: true,
        tasksStealingOnBackPressure: true
      },
      true
    )

    const invalidOptions = [
      [
        'invalidTasksQueueOptions',
        new TypeError('Invalid tasks queue options: must be a plain object')
      ],
      [
        { concurrency: 0 },
        new RangeError(
          'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
        )
      ],
      [
        { concurrency: -1 },
        new RangeError(
          'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
        )
      ],
      [
        { concurrency: 0.2 },
        new TypeError(
          'Invalid worker node tasks concurrency: must be an integer'
        )
      ],
      [
        { size: 0 },
        new RangeError(
          'Invalid worker node tasks queue size: 0 is a negative integer or zero'
        )
      ],
      [
        { size: -1 },
        new RangeError(
          'Invalid worker node tasks queue size: -1 is a negative integer or zero'
        )
      ],
      [
        { size: 0.2 },
        new TypeError('Invalid worker node tasks queue size: must be an integer')
      ]
    ]
    for (const [options, expectedError] of invalidOptions) {
      expect(() => pool.setTasksQueueOptions(options)).toThrowError(
        expectedError
      )
    }
  } finally {
    // Destroy the pool even when an assertion above fails, so no worker is left running.
    await pool.destroy()
  }
})
775
// Checks `pool.info` for a fixed thread pool and for a dynamic cluster pool.
it('Verify that pool info is set', async () => {
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(fixedPool.info).toStrictEqual({
      version,
      type: PoolTypes.fixed,
      worker: WorkerTypes.thread,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: numberOfWorkers,
      maxSize: numberOfWorkers,
      workerNodes: numberOfWorkers,
      idleWorkerNodes: numberOfWorkers,
      busyWorkerNodes: 0,
      executedTasks: 0,
      executingTasks: 0,
      failedTasks: 0
    })
  } finally {
    // Destroy the pool even when the assertion above fails, so no worker is left running.
    await fixedPool.destroy()
  }

  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expect(dynamicPool.info).toStrictEqual({
      version,
      type: PoolTypes.dynamic,
      worker: WorkerTypes.cluster,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: minimumSize,
      maxSize: numberOfWorkers,
      workerNodes: minimumSize,
      idleWorkerNodes: minimumSize,
      busyWorkerNodes: 0,
      executedTasks: 0,
      executingTasks: 0,
      failedTasks: 0
    })
  } finally {
    await dynamicPool.destroy()
  }
})
821
// Checks the usage counters and histories of every worker node right after pool creation.
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: 0,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
    }
  } finally {
    // Destroy the pool even when an assertion above fails, so no worker is left running.
    await pool.destroy()
  }
})
856
// Checks that every worker node of a fixed cluster pool, then of a dynamic thread pool,
// starts with an empty Deque as tasks queue.
it('Verify that pool worker tasks queue are initialized', async () => {
  const poolFactories = [
    () =>
      new FixedClusterPool(
        numberOfWorkers,
        './tests/worker-files/cluster/testWorker.js'
      ),
    () =>
      new DynamicThreadPool(
        Math.floor(numberOfWorkers / 2),
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ]
  // Pools are created one at a time: the next one only after the previous is destroyed.
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      for (const workerNode of pool.workerNodes) {
        expect(workerNode).toBeInstanceOf(WorkerNode)
        expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
        expect(workerNode.tasksQueue.size).toBe(0)
        expect(workerNode.tasksQueue.maxSize).toBe(0)
      }
    } finally {
      // Destroy the pool even when an assertion above fails, so no worker is left running.
      await pool.destroy()
    }
  }
})
882
// Checks the `info` of every worker node of a fixed cluster pool, then of a dynamic thread pool.
it('Verify that pool worker info are initialized', async () => {
  const poolCases = [
    {
      createPool: () =>
        new FixedClusterPool(
          numberOfWorkers,
          './tests/worker-files/cluster/testWorker.js'
        ),
      workerType: WorkerTypes.cluster
    },
    {
      createPool: () =>
        new DynamicThreadPool(
          Math.floor(numberOfWorkers / 2),
          numberOfWorkers,
          './tests/worker-files/thread/testWorker.mjs'
        ),
      workerType: WorkerTypes.thread
    }
  ]
  // Pools are created one at a time: the next one only after the previous is destroyed.
  for (const { createPool, workerType } of poolCases) {
    const pool = createPool()
    try {
      for (const workerNode of pool.workerNodes) {
        expect(workerNode).toBeInstanceOf(WorkerNode)
        expect(workerNode.info).toStrictEqual({
          id: expect.any(Number),
          type: workerType,
          dynamic: false,
          ready: true
        })
      }
    } finally {
      // Destroy the pool even when an assertion above fails, so no worker is left running.
      await pool.destroy()
    }
  }
})
914
// Builds a pool with `startWorkers: false`, checks it is not started, then starts it explicitly.
it('Verify that pool can be started after initialization', async () => {
  const poolOptions = { startWorkers: false }
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    poolOptions
  )

  // Before start(): not started, not ready, no worker node, tasks rejected.
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrowError(notStartedError)

  pool.start()

  // After start(): started, ready, one WorkerNode per requested worker.
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  pool.workerNodes.forEach(workerNode => {
    expect(workerNode).toBeInstanceOf(WorkerNode)
  })
  await pool.destroy()
})
938
// Checks that execute() rejects invalid `name` / `transferList` arguments and unknown task
// names, and that it rejects once the pool has been destroyed.
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    await expect(pool.execute(undefined, 0)).rejects.toThrowError(
      new TypeError('name argument must be a string')
    )
    await expect(pool.execute(undefined, '')).rejects.toThrowError(
      new TypeError('name argument must not be an empty string')
    )
    await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
      new TypeError('transferList argument must be an array')
    )
    await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
      "Task function 'unknown' not found"
    )
  } finally {
    // Destroy the pool even when an assertion above fails, so no worker is left running.
    await pool.destroy()
  }
  await expect(pool.execute()).rejects.toThrowError(
    new Error('Cannot execute a task on not started pool')
  )
})
961
// Submits numberOfWorkers * maxMultiplier tasks and checks the per worker node task counters
// while the tasks are executing and again once they have all completed.
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Expected usage of one worker node for the given executed / executing counters.
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  })

  const pendingTasks = new Set()
  let submitted = 0
  while (submitted < numberOfWorkers * maxMultiplier) {
    pendingTasks.add(pool.execute())
    submitted++
  }
  pool.workerNodes.forEach(workerNode => {
    expect(workerNode.usage).toStrictEqual(expectedUsage(0, maxMultiplier))
  })
  await Promise.all(pendingTasks)
  pool.workerNodes.forEach(workerNode => {
    expect(workerNode.usage).toStrictEqual(expectedUsage(maxMultiplier, 0))
  })
  await pool.destroy()
})
1027
// Runs tasks, checks the per worker node usage, then switches the worker choice strategy
// and checks that the executed counter is back to zero.
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Expected usage of one worker node for the given `executed` matcher or value.
  const expectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  })
  // Asserts that every measurement history of the worker node is empty.
  const expectEmptyHistories = workerNode => {
    expect(workerNode.usage.runTime.history.length).toBe(0)
    expect(workerNode.usage.waitTime.history.length).toBe(0)
    expect(workerNode.usage.elu.idle.history.length).toBe(0)
    expect(workerNode.usage.elu.active.history.length).toBe(0)
  }

  try {
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage(expect.any(Number)))
      expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        numberOfWorkers * maxMultiplier
      )
      expectEmptyHistories(workerNode)
    }
    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage(0))
      expectEmptyHistories(workerNode)
    }
  } finally {
    // Destroy the pool even when an assertion above fails, so no worker is left running.
    await pool.destroy()
  }
})
1107
// Registers a `ready` listener and checks it is called once with the pool information.
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expect(pool.emitter.eventNames()).toStrictEqual([])
    let poolInfo
    let poolReady = 0
    pool.emitter.on(PoolEvents.ready, info => {
      ++poolReady
      poolInfo = info
    })
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
    expect(poolReady).toBe(1)
    expect(poolInfo).toStrictEqual({
      version,
      type: PoolTypes.dynamic,
      worker: WorkerTypes.cluster,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: expect.any(Number),
      maxSize: expect.any(Number),
      workerNodes: expect.any(Number),
      idleWorkerNodes: expect.any(Number),
      busyWorkerNodes: expect.any(Number),
      executedTasks: expect.any(Number),
      executingTasks: expect.any(Number),
      failedTasks: expect.any(Number)
    })
  } finally {
    // Destroy the pool even when an assertion above fails, so no worker is left running.
    await pool.destroy()
  }
})
1142
// Registers a `busy` listener, submits numberOfWorkers * 2 tasks and checks how many times
// the listener was called and the pool information it last received.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(pool.emitter.eventNames()).toStrictEqual([])
    const promises = new Set()
    let poolBusy = 0
    let poolInfo
    pool.emitter.on(PoolEvents.busy, info => {
      ++poolBusy
      poolInfo = info
    })
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
    // So it is emitted numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
    expect(poolBusy).toBe(numberOfWorkers + 1)
    expect(poolInfo).toStrictEqual({
      version,
      type: PoolTypes.fixed,
      worker: WorkerTypes.thread,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: expect.any(Number),
      maxSize: expect.any(Number),
      workerNodes: expect.any(Number),
      idleWorkerNodes: expect.any(Number),
      busyWorkerNodes: expect.any(Number),
      executedTasks: expect.any(Number),
      executingTasks: expect.any(Number),
      failedTasks: expect.any(Number)
    })
  } finally {
    // Destroy the pool even when an assertion above fails, so no worker is left running.
    await pool.destroy()
  }
})
1182
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicThreadPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let fullEventCount = 0
  let lastFullInfo
  pool.emitter.on(PoolEvents.full, info => {
    fullEventCount += 1
    lastFullInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  // Submit numberOfWorkers * 2 tasks back to back, then wait for all of them.
  const pendingTasks = []
  const taskCount = numberOfWorkers * 2
  while (pendingTasks.length < taskCount) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  expect(fullEventCount).toBe(1)
  const anyNumber = expect.any(Number)
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  }
  expect(lastFullInfo).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1221
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const poolOptions = { enableTasksQueue: true }
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    poolOptions
  )
  // Force the pool to always report back pressure.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let backPressureEventCount = 0
  let lastBackPressureInfo
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureEventCount += 1
    lastBackPressureInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  // One more task than there are workers.
  const submittedTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  expect(backPressureEventCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastBackPressureInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  // The stubbed method must have been consulted by the pool.
  expect(pool.hasBackPressure.called).toBe(true)
  await pool.destroy()
})
1268
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names paired with the value hasTaskFunction() must return for them.
  const expectedAvailability = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  // Waits for the pool to be ready, runs the checks in order, then destroys the pool.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [name, available] of expectedAvailability) {
      expect(pool.hasTaskFunction(name)).toBe(available)
    }
    await pool.destroy()
  }
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  // The cluster pool is only created once the thread pool has been destroyed.
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1298
it('Verify that addTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Invalid (name, fn) argument pairs and the TypeError message each must be rejected with.
  const invalidCalls = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidCalls) {
    await expect(pool.addTaskFunction(name, fn)).rejects.toThrowError(
      new TypeError(message)
    )
  }
  // The rejected calls must not have changed the list of task function names.
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => {
    return data
  }
  await expect(
    pool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  // The freshly added task function must be executable and echo its input back.
  const payload = { test: 'test' }
  const echoed = await pool.execute(payload, 'echo')
  expect(echoed).toStrictEqual(payload)
  // Expected per-worker usage for 'echo': only the executed counter may vary.
  const buildExpectedUsage = () => ({
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual(
      buildExpectedUsage()
    )
  }
  await pool.destroy()
})
1369
it('Verify that removeTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Names reported before anything is added from the pool side.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  // 'test' was not added through the pool, so the pool refuses to remove it.
  const removeTestTaskFunction = pool.removeTaskFunction('test')
  await expect(removeTestTaskFunction).rejects.toThrowError(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => {
    return data
  }
  await pool.addTaskFunction('echo', echoTaskFunction)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    ...initialNames,
    'echo'
  ])
  // Removing the pool-side task function restores the initial state.
  const removeEchoTaskFunction = pool.removeTaskFunction('echo')
  await expect(removeEchoTaskFunction).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(0)
  expect(pool.taskFunctions.get('echo')).toBeUndefined()
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await pool.destroy()
})
1410
it('Verify that listTaskFunctionNames() is working', async () => {
  // Both worker files are expected to report the same names in the same order.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(threadPool, PoolEvents.ready, 1)
  expect(threadPool.listTaskFunctionNames()).toStrictEqual(expectedNames)
  await threadPool.destroy()
  // The cluster pool is created only after the thread pool has been destroyed.
  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(clusterPool, PoolEvents.ready, 1)
  expect(clusterPool.listTaskFunctionNames()).toStrictEqual(expectedNames)
  await clusterPool.destroy()
})
1438
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Builds a matcher for the failure message of a 'default' task function operation.
  // The worker id embedded in the message is matched with `\d+` instead of a
  // hard-coded value: the id depends on how many workers earlier tests have
  // spawned, so pinning it breaks whenever tests are added, removed or reordered.
  // `reason` is inserted verbatim and must not contain regular expression
  // metacharacters.
  const defaultOperationFailure = reason =>
    new RegExp(
      `^Task function operation 'default' failed on worker \\d+ with error: '${reason}'$`
    )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(0)
  ).rejects.toThrowError(
    defaultOperationFailure('TypeError: name parameter is not a string')
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrowError(
    defaultOperationFailure(
      'Error: Cannot set the default task function reserved name as the default task function'
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrowError(
    defaultOperationFailure(
      'Error: Cannot set the default task function to a non-existing task function'
    )
  )
  // The failed attempts must leave the task function names order untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  // The new default is listed right after the reserved default name.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  // Destroy the pool like the sibling tests do, so its workers do not outlive the test.
  await dynamicThreadPool.destroy()
})
1492
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // Run the default task function, then each named one, strictly one after another.
  const defaultResult = await pool.execute(data)
  expect(defaultResult).toStrictEqual({ ok: 1 })
  const serializationResult = await pool.execute(
    data,
    'jsonIntegerSerialization'
  )
  expect(serializationResult).toStrictEqual({ ok: 1 })
  const factorialResult = await pool.execute(data, 'factorial')
  expect(factorialResult).toBe(3628800)
  const fibonacciResult = await pool.execute(data, 'fibonacci')
  expect(fibonacciResult).toBe(55)
  const { executingTasks } = pool.info
  expect(executingTasks).toBe(0)
  const { executedTasks } = pool.info
  expect(executedTasks).toBe(4)
  const anyHistory = expect.any(CircularArray)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    // Every listed task function must expose a usage record with at least one executed task.
    for (const name of pool.listTaskFunctionNames()) {
      expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          stolen: 0
        },
        runTime: { history: anyHistory },
        waitTime: { history: anyHistory },
        elu: {
          idle: { history: anyHistory },
          active: { history: anyHistory }
        }
      })
      expect(
        workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
      ).toBeGreaterThan(0)
    }
    // Usage under the default name must equal the usage of the second listed task function.
    const secondTaskFunctionName = workerNode.info.taskFunctionNames[1]
    expect(
      workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(
      workerNode.getTaskFunctionWorkerUsage(secondTaskFunctionName)
    )
  }
  await pool.destroy()
})
1556
it('Verify sendKillMessageToWorker()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node; the call must resolve without a value.
  const firstWorkerNodeKey = 0
  const killRequest = pool.sendKillMessageToWorker(firstWorkerNodeKey)
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
1569
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const firstWorkerNodeKey = 0
  // 'add' operation carrying the source text of an empty function named 'empty'.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(firstWorkerNodeKey, addOperation)
  ).resolves.toBe(true)
  // The targeted worker node must now list the added name last.
  const { taskFunctionNames } = pool.workerNodes[firstWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1589
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // 'add' operation carrying the source text of an empty function named 'empty'.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addOperation)
  ).resolves.toBe(true)
  // Every worker node must now list the added name last.
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const { info } of pool.workerNodes) {
    expect(info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
1612 })