refactor: move worker setup into worker node constructor
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
8 import {
9 DynamicClusterPool,
10 DynamicThreadPool,
11 FixedClusterPool,
12 FixedThreadPool,
13 PoolEvents,
14 PoolTypes,
15 WorkerChoiceStrategies,
16 WorkerTypes
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
23
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
26 readFileSync(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
28 'utf8'
29 )
30 ).version
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
33 isMain () {
34 return false
35 }
36 }
37
38 afterEach(() => {
39 restore()
40 })
41
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
44 numberOfWorkers,
45 './tests/worker-files/thread/testWorker.mjs'
46 )
47 expect(pool).toBeInstanceOf(FixedThreadPool)
48 await pool.destroy()
49 })
50
51 it('Verify that pool cannot be created from a non main thread/process', () => {
52 expect(
53 () =>
54 new StubPoolWithIsMain(
55 numberOfWorkers,
56 './tests/worker-files/thread/testWorker.mjs',
57 {
58 errorHandler: e => console.error(e)
59 }
60 )
61 ).toThrow(
62 new Error(
63 'Cannot start a pool from a worker with the same type as the pool'
64 )
65 )
66 })
67
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
70 numberOfWorkers,
71 './tests/worker-files/thread/testWorker.mjs'
72 )
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
76 await pool.destroy()
77 })
78
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
82 )
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
85 )
86 expect(
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
89 })
90
91 it('Verify that numberOfWorkers is checked', () => {
92 expect(
93 () =>
94 new FixedThreadPool(
95 undefined,
96 './tests/worker-files/thread/testWorker.mjs'
97 )
98 ).toThrow(
99 new Error(
100 'Cannot instantiate a pool without specifying the number of workers'
101 )
102 )
103 })
104
105 it('Verify that a negative number of workers is checked', () => {
106 expect(
107 () =>
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
109 ).toThrow(
110 new RangeError(
111 'Cannot instantiate a pool with a negative number of workers'
112 )
113 )
114 })
115
116 it('Verify that a non integer number of workers is checked', () => {
117 expect(
118 () =>
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
120 ).toThrow(
121 new TypeError(
122 'Cannot instantiate a pool with a non safe integer number of workers'
123 )
124 )
125 })
126
127 it('Verify that dynamic pool sizing is checked', () => {
128 expect(
129 () =>
130 new DynamicClusterPool(
131 1,
132 undefined,
133 './tests/worker-files/cluster/testWorker.js'
134 )
135 ).toThrow(
136 new TypeError(
137 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
138 )
139 )
140 expect(
141 () =>
142 new DynamicThreadPool(
143 0.5,
144 1,
145 './tests/worker-files/thread/testWorker.mjs'
146 )
147 ).toThrow(
148 new TypeError(
149 'Cannot instantiate a pool with a non safe integer number of workers'
150 )
151 )
152 expect(
153 () =>
154 new DynamicClusterPool(
155 0,
156 0.5,
157 './tests/worker-files/cluster/testWorker.js'
158 )
159 ).toThrow(
160 new TypeError(
161 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
162 )
163 )
164 expect(
165 () =>
166 new DynamicThreadPool(
167 2,
168 1,
169 './tests/worker-files/thread/testWorker.mjs'
170 )
171 ).toThrow(
172 new RangeError(
173 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
174 )
175 )
176 expect(
177 () =>
178 new DynamicThreadPool(
179 0,
180 0,
181 './tests/worker-files/thread/testWorker.mjs'
182 )
183 ).toThrow(
184 new RangeError(
185 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
186 )
187 )
188 expect(
189 () =>
190 new DynamicClusterPool(
191 1,
192 1,
193 './tests/worker-files/cluster/testWorker.js'
194 )
195 ).toThrow(
196 new RangeError(
197 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
198 )
199 )
200 })
201
202 it('Verify that pool options are checked', async () => {
203 let pool = new FixedThreadPool(
204 numberOfWorkers,
205 './tests/worker-files/thread/testWorker.mjs'
206 )
207 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
208 expect(pool.opts).toStrictEqual({
209 startWorkers: true,
210 enableEvents: true,
211 restartWorkerOnError: true,
212 enableTasksQueue: false,
213 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
214 workerChoiceStrategyOptions: {
215 retries: 6,
216 runTime: { median: false },
217 waitTime: { median: false },
218 elu: { median: false }
219 }
220 })
221 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
222 retries: 6,
223 runTime: { median: false },
224 waitTime: { median: false },
225 elu: { median: false }
226 })
227 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
228 .workerChoiceStrategies) {
229 expect(workerChoiceStrategy.opts).toStrictEqual({
230 retries: 6,
231 runTime: { median: false },
232 waitTime: { median: false },
233 elu: { median: false }
234 })
235 }
236 await pool.destroy()
237 const testHandler = () => console.info('test handler executed')
238 pool = new FixedThreadPool(
239 numberOfWorkers,
240 './tests/worker-files/thread/testWorker.mjs',
241 {
242 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
243 workerChoiceStrategyOptions: {
244 runTime: { median: true },
245 weights: { 0: 300, 1: 200 }
246 },
247 enableEvents: false,
248 restartWorkerOnError: false,
249 enableTasksQueue: true,
250 tasksQueueOptions: { concurrency: 2 },
251 messageHandler: testHandler,
252 errorHandler: testHandler,
253 onlineHandler: testHandler,
254 exitHandler: testHandler
255 }
256 )
257 expect(pool.emitter).toBeUndefined()
258 expect(pool.opts).toStrictEqual({
259 startWorkers: true,
260 enableEvents: false,
261 restartWorkerOnError: false,
262 enableTasksQueue: true,
263 tasksQueueOptions: {
264 concurrency: 2,
265 size: Math.pow(numberOfWorkers, 2),
266 taskStealing: true,
267 tasksStealingOnBackPressure: true
268 },
269 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
270 workerChoiceStrategyOptions: {
271 retries: 6,
272 runTime: { median: true },
273 waitTime: { median: false },
274 elu: { median: false },
275 weights: { 0: 300, 1: 200 }
276 },
277 onlineHandler: testHandler,
278 messageHandler: testHandler,
279 errorHandler: testHandler,
280 exitHandler: testHandler
281 })
282 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
283 retries: 6,
284 runTime: { median: true },
285 waitTime: { median: false },
286 elu: { median: false },
287 weights: { 0: 300, 1: 200 }
288 })
289 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
290 .workerChoiceStrategies) {
291 expect(workerChoiceStrategy.opts).toStrictEqual({
292 retries: 6,
293 runTime: { median: true },
294 waitTime: { median: false },
295 elu: { median: false },
296 weights: { 0: 300, 1: 200 }
297 })
298 }
299 await pool.destroy()
300 })
301
302 it('Verify that pool options are validated', () => {
303 expect(
304 () =>
305 new FixedThreadPool(
306 numberOfWorkers,
307 './tests/worker-files/thread/testWorker.mjs',
308 {
309 workerChoiceStrategy: 'invalidStrategy'
310 }
311 )
312 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
313 expect(
314 () =>
315 new FixedThreadPool(
316 numberOfWorkers,
317 './tests/worker-files/thread/testWorker.mjs',
318 {
319 workerChoiceStrategyOptions: {
320 retries: 'invalidChoiceRetries'
321 }
322 }
323 )
324 ).toThrow(
325 new TypeError(
326 'Invalid worker choice strategy options: retries must be an integer'
327 )
328 )
329 expect(
330 () =>
331 new FixedThreadPool(
332 numberOfWorkers,
333 './tests/worker-files/thread/testWorker.mjs',
334 {
335 workerChoiceStrategyOptions: {
336 retries: -1
337 }
338 }
339 )
340 ).toThrow(
341 new RangeError(
342 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
343 )
344 )
345 expect(
346 () =>
347 new FixedThreadPool(
348 numberOfWorkers,
349 './tests/worker-files/thread/testWorker.mjs',
350 {
351 workerChoiceStrategyOptions: { weights: {} }
352 }
353 )
354 ).toThrow(
355 new Error(
356 'Invalid worker choice strategy options: must have a weight for each worker node'
357 )
358 )
359 expect(
360 () =>
361 new FixedThreadPool(
362 numberOfWorkers,
363 './tests/worker-files/thread/testWorker.mjs',
364 {
365 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
366 }
367 )
368 ).toThrow(
369 new Error(
370 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
371 )
372 )
373 expect(
374 () =>
375 new FixedThreadPool(
376 numberOfWorkers,
377 './tests/worker-files/thread/testWorker.mjs',
378 {
379 enableTasksQueue: true,
380 tasksQueueOptions: 'invalidTasksQueueOptions'
381 }
382 )
383 ).toThrow(
384 new TypeError('Invalid tasks queue options: must be a plain object')
385 )
386 expect(
387 () =>
388 new FixedThreadPool(
389 numberOfWorkers,
390 './tests/worker-files/thread/testWorker.mjs',
391 {
392 enableTasksQueue: true,
393 tasksQueueOptions: { concurrency: 0 }
394 }
395 )
396 ).toThrow(
397 new RangeError(
398 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
399 )
400 )
401 expect(
402 () =>
403 new FixedThreadPool(
404 numberOfWorkers,
405 './tests/worker-files/thread/testWorker.mjs',
406 {
407 enableTasksQueue: true,
408 tasksQueueOptions: { concurrency: -1 }
409 }
410 )
411 ).toThrow(
412 new RangeError(
413 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
414 )
415 )
416 expect(
417 () =>
418 new FixedThreadPool(
419 numberOfWorkers,
420 './tests/worker-files/thread/testWorker.mjs',
421 {
422 enableTasksQueue: true,
423 tasksQueueOptions: { concurrency: 0.2 }
424 }
425 )
426 ).toThrow(
427 new TypeError('Invalid worker node tasks concurrency: must be an integer')
428 )
429 expect(
430 () =>
431 new FixedThreadPool(
432 numberOfWorkers,
433 './tests/worker-files/thread/testWorker.mjs',
434 {
435 enableTasksQueue: true,
436 tasksQueueOptions: { size: 0 }
437 }
438 )
439 ).toThrow(
440 new RangeError(
441 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
442 )
443 )
444 expect(
445 () =>
446 new FixedThreadPool(
447 numberOfWorkers,
448 './tests/worker-files/thread/testWorker.mjs',
449 {
450 enableTasksQueue: true,
451 tasksQueueOptions: { size: -1 }
452 }
453 )
454 ).toThrow(
455 new RangeError(
456 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
457 )
458 )
459 expect(
460 () =>
461 new FixedThreadPool(
462 numberOfWorkers,
463 './tests/worker-files/thread/testWorker.mjs',
464 {
465 enableTasksQueue: true,
466 tasksQueueOptions: { size: 0.2 }
467 }
468 )
469 ).toThrow(
470 new TypeError('Invalid worker node tasks queue size: must be an integer')
471 )
472 })
473
474 it('Verify that pool worker choice strategy options can be set', async () => {
475 const pool = new FixedThreadPool(
476 numberOfWorkers,
477 './tests/worker-files/thread/testWorker.mjs',
478 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
479 )
480 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
481 retries: 6,
482 runTime: { median: false },
483 waitTime: { median: false },
484 elu: { median: false }
485 })
486 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
487 retries: 6,
488 runTime: { median: false },
489 waitTime: { median: false },
490 elu: { median: false }
491 })
492 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
493 .workerChoiceStrategies) {
494 expect(workerChoiceStrategy.opts).toStrictEqual({
495 retries: 6,
496 runTime: { median: false },
497 waitTime: { median: false },
498 elu: { median: false }
499 })
500 }
501 expect(
502 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
503 ).toStrictEqual({
504 runTime: {
505 aggregate: true,
506 average: true,
507 median: false
508 },
509 waitTime: {
510 aggregate: false,
511 average: false,
512 median: false
513 },
514 elu: {
515 aggregate: true,
516 average: true,
517 median: false
518 }
519 })
520 pool.setWorkerChoiceStrategyOptions({
521 runTime: { median: true },
522 elu: { median: true }
523 })
524 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
525 retries: 6,
526 runTime: { median: true },
527 waitTime: { median: false },
528 elu: { median: true }
529 })
530 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
531 retries: 6,
532 runTime: { median: true },
533 waitTime: { median: false },
534 elu: { median: true }
535 })
536 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
537 .workerChoiceStrategies) {
538 expect(workerChoiceStrategy.opts).toStrictEqual({
539 retries: 6,
540 runTime: { median: true },
541 waitTime: { median: false },
542 elu: { median: true }
543 })
544 }
545 expect(
546 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
547 ).toStrictEqual({
548 runTime: {
549 aggregate: true,
550 average: false,
551 median: true
552 },
553 waitTime: {
554 aggregate: false,
555 average: false,
556 median: false
557 },
558 elu: {
559 aggregate: true,
560 average: false,
561 median: true
562 }
563 })
564 pool.setWorkerChoiceStrategyOptions({
565 runTime: { median: false },
566 elu: { median: false }
567 })
568 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
569 retries: 6,
570 runTime: { median: false },
571 waitTime: { median: false },
572 elu: { median: false }
573 })
574 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
575 retries: 6,
576 runTime: { median: false },
577 waitTime: { median: false },
578 elu: { median: false }
579 })
580 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
581 .workerChoiceStrategies) {
582 expect(workerChoiceStrategy.opts).toStrictEqual({
583 retries: 6,
584 runTime: { median: false },
585 waitTime: { median: false },
586 elu: { median: false }
587 })
588 }
589 expect(
590 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
591 ).toStrictEqual({
592 runTime: {
593 aggregate: true,
594 average: true,
595 median: false
596 },
597 waitTime: {
598 aggregate: false,
599 average: false,
600 median: false
601 },
602 elu: {
603 aggregate: true,
604 average: true,
605 median: false
606 }
607 })
608 expect(() =>
609 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
610 ).toThrow(
611 new TypeError(
612 'Invalid worker choice strategy options: must be a plain object'
613 )
614 )
615 expect(() =>
616 pool.setWorkerChoiceStrategyOptions({
617 retries: 'invalidChoiceRetries'
618 })
619 ).toThrow(
620 new TypeError(
621 'Invalid worker choice strategy options: retries must be an integer'
622 )
623 )
624 expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
625 new RangeError(
626 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
627 )
628 )
629 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
630 new Error(
631 'Invalid worker choice strategy options: must have a weight for each worker node'
632 )
633 )
634 expect(() =>
635 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
636 ).toThrow(
637 new Error(
638 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
639 )
640 )
641 await pool.destroy()
642 })
643
644 it('Verify that pool tasks queue can be enabled/disabled', async () => {
645 const pool = new FixedThreadPool(
646 numberOfWorkers,
647 './tests/worker-files/thread/testWorker.mjs'
648 )
649 expect(pool.opts.enableTasksQueue).toBe(false)
650 expect(pool.opts.tasksQueueOptions).toBeUndefined()
651 pool.enableTasksQueue(true)
652 expect(pool.opts.enableTasksQueue).toBe(true)
653 expect(pool.opts.tasksQueueOptions).toStrictEqual({
654 concurrency: 1,
655 size: Math.pow(numberOfWorkers, 2),
656 taskStealing: true,
657 tasksStealingOnBackPressure: true
658 })
659 pool.enableTasksQueue(true, { concurrency: 2 })
660 expect(pool.opts.enableTasksQueue).toBe(true)
661 expect(pool.opts.tasksQueueOptions).toStrictEqual({
662 concurrency: 2,
663 size: Math.pow(numberOfWorkers, 2),
664 taskStealing: true,
665 tasksStealingOnBackPressure: true
666 })
667 pool.enableTasksQueue(false)
668 expect(pool.opts.enableTasksQueue).toBe(false)
669 expect(pool.opts.tasksQueueOptions).toBeUndefined()
670 await pool.destroy()
671 })
672
673 it('Verify that pool tasks queue options can be set', async () => {
674 const pool = new FixedThreadPool(
675 numberOfWorkers,
676 './tests/worker-files/thread/testWorker.mjs',
677 { enableTasksQueue: true }
678 )
679 expect(pool.opts.tasksQueueOptions).toStrictEqual({
680 concurrency: 1,
681 size: Math.pow(numberOfWorkers, 2),
682 taskStealing: true,
683 tasksStealingOnBackPressure: true
684 })
685 for (const workerNode of pool.workerNodes) {
686 expect(workerNode.tasksQueueBackPressureSize).toBe(
687 pool.opts.tasksQueueOptions.size
688 )
689 }
690 pool.setTasksQueueOptions({
691 concurrency: 2,
692 size: 2,
693 taskStealing: false,
694 tasksStealingOnBackPressure: false
695 })
696 expect(pool.opts.tasksQueueOptions).toStrictEqual({
697 concurrency: 2,
698 size: 2,
699 taskStealing: false,
700 tasksStealingOnBackPressure: false
701 })
702 for (const workerNode of pool.workerNodes) {
703 expect(workerNode.tasksQueueBackPressureSize).toBe(
704 pool.opts.tasksQueueOptions.size
705 )
706 }
707 pool.setTasksQueueOptions({
708 concurrency: 1,
709 taskStealing: true,
710 tasksStealingOnBackPressure: true
711 })
712 expect(pool.opts.tasksQueueOptions).toStrictEqual({
713 concurrency: 1,
714 size: Math.pow(numberOfWorkers, 2),
715 taskStealing: true,
716 tasksStealingOnBackPressure: true
717 })
718 for (const workerNode of pool.workerNodes) {
719 expect(workerNode.tasksQueueBackPressureSize).toBe(
720 pool.opts.tasksQueueOptions.size
721 )
722 }
723 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
724 new TypeError('Invalid tasks queue options: must be a plain object')
725 )
726 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
727 new RangeError(
728 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
729 )
730 )
731 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
732 new RangeError(
733 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
734 )
735 )
736 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
737 new TypeError('Invalid worker node tasks concurrency: must be an integer')
738 )
739 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
740 new RangeError(
741 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
742 )
743 )
744 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
745 new RangeError(
746 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
747 )
748 )
749 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
750 new TypeError('Invalid worker node tasks queue size: must be an integer')
751 )
752 await pool.destroy()
753 })
754
755 it('Verify that pool info is set', async () => {
756 let pool = new FixedThreadPool(
757 numberOfWorkers,
758 './tests/worker-files/thread/testWorker.mjs'
759 )
760 expect(pool.info).toStrictEqual({
761 version,
762 type: PoolTypes.fixed,
763 worker: WorkerTypes.thread,
764 started: true,
765 ready: true,
766 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
767 minSize: numberOfWorkers,
768 maxSize: numberOfWorkers,
769 workerNodes: numberOfWorkers,
770 idleWorkerNodes: numberOfWorkers,
771 busyWorkerNodes: 0,
772 executedTasks: 0,
773 executingTasks: 0,
774 failedTasks: 0
775 })
776 await pool.destroy()
777 pool = new DynamicClusterPool(
778 Math.floor(numberOfWorkers / 2),
779 numberOfWorkers,
780 './tests/worker-files/cluster/testWorker.js'
781 )
782 expect(pool.info).toStrictEqual({
783 version,
784 type: PoolTypes.dynamic,
785 worker: WorkerTypes.cluster,
786 started: true,
787 ready: true,
788 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
789 minSize: Math.floor(numberOfWorkers / 2),
790 maxSize: numberOfWorkers,
791 workerNodes: Math.floor(numberOfWorkers / 2),
792 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
793 busyWorkerNodes: 0,
794 executedTasks: 0,
795 executingTasks: 0,
796 failedTasks: 0
797 })
798 await pool.destroy()
799 })
800
801 it('Verify that pool worker tasks usage are initialized', async () => {
802 const pool = new FixedClusterPool(
803 numberOfWorkers,
804 './tests/worker-files/cluster/testWorker.js'
805 )
806 for (const workerNode of pool.workerNodes) {
807 expect(workerNode).toBeInstanceOf(WorkerNode)
808 expect(workerNode.usage).toStrictEqual({
809 tasks: {
810 executed: 0,
811 executing: 0,
812 queued: 0,
813 maxQueued: 0,
814 sequentiallyStolen: 0,
815 stolen: 0,
816 failed: 0
817 },
818 runTime: {
819 history: new CircularArray()
820 },
821 waitTime: {
822 history: new CircularArray()
823 },
824 elu: {
825 idle: {
826 history: new CircularArray()
827 },
828 active: {
829 history: new CircularArray()
830 }
831 }
832 })
833 }
834 await pool.destroy()
835 })
836
837 it('Verify that pool worker tasks queue are initialized', async () => {
838 let pool = new FixedClusterPool(
839 numberOfWorkers,
840 './tests/worker-files/cluster/testWorker.js'
841 )
842 for (const workerNode of pool.workerNodes) {
843 expect(workerNode).toBeInstanceOf(WorkerNode)
844 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
845 expect(workerNode.tasksQueue.size).toBe(0)
846 expect(workerNode.tasksQueue.maxSize).toBe(0)
847 }
848 await pool.destroy()
849 pool = new DynamicThreadPool(
850 Math.floor(numberOfWorkers / 2),
851 numberOfWorkers,
852 './tests/worker-files/thread/testWorker.mjs'
853 )
854 for (const workerNode of pool.workerNodes) {
855 expect(workerNode).toBeInstanceOf(WorkerNode)
856 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
857 expect(workerNode.tasksQueue.size).toBe(0)
858 expect(workerNode.tasksQueue.maxSize).toBe(0)
859 }
860 await pool.destroy()
861 })
862
863 it('Verify that pool worker info are initialized', async () => {
864 let pool = new FixedClusterPool(
865 numberOfWorkers,
866 './tests/worker-files/cluster/testWorker.js'
867 )
868 for (const workerNode of pool.workerNodes) {
869 expect(workerNode).toBeInstanceOf(WorkerNode)
870 expect(workerNode.info).toStrictEqual({
871 id: expect.any(Number),
872 type: WorkerTypes.cluster,
873 dynamic: false,
874 ready: true
875 })
876 }
877 await pool.destroy()
878 pool = new DynamicThreadPool(
879 Math.floor(numberOfWorkers / 2),
880 numberOfWorkers,
881 './tests/worker-files/thread/testWorker.mjs'
882 )
883 for (const workerNode of pool.workerNodes) {
884 expect(workerNode).toBeInstanceOf(WorkerNode)
885 expect(workerNode.info).toStrictEqual({
886 id: expect.any(Number),
887 type: WorkerTypes.thread,
888 dynamic: false,
889 ready: true
890 })
891 }
892 await pool.destroy()
893 })
894
895 it('Verify that pool statuses are checked at start or destroy', async () => {
896 const pool = new FixedThreadPool(
897 numberOfWorkers,
898 './tests/worker-files/thread/testWorker.mjs'
899 )
900 expect(pool.info.started).toBe(true)
901 expect(pool.info.ready).toBe(true)
902 expect(() => pool.start()).toThrow(
903 new Error('Cannot start an already started pool')
904 )
905 await pool.destroy()
906 expect(pool.info.started).toBe(false)
907 expect(pool.info.ready).toBe(false)
908 await expect(pool.destroy()).rejects.toThrow(
909 new Error('Cannot destroy an already destroyed pool')
910 )
911 })
912
913 it('Verify that pool can be started after initialization', async () => {
914 const pool = new FixedClusterPool(
915 numberOfWorkers,
916 './tests/worker-files/cluster/testWorker.js',
917 {
918 startWorkers: false
919 }
920 )
921 expect(pool.info.started).toBe(false)
922 expect(pool.info.ready).toBe(false)
923 expect(pool.readyEventEmitted).toBe(false)
924 expect(pool.workerNodes).toStrictEqual([])
925 await expect(pool.execute()).rejects.toThrow(
926 new Error('Cannot execute a task on not started pool')
927 )
928 pool.start()
929 expect(pool.info.started).toBe(true)
930 expect(pool.info.ready).toBe(true)
931 await waitPoolEvents(pool, PoolEvents.ready, 1)
932 expect(pool.readyEventEmitted).toBe(true)
933 expect(pool.workerNodes.length).toBe(numberOfWorkers)
934 for (const workerNode of pool.workerNodes) {
935 expect(workerNode).toBeInstanceOf(WorkerNode)
936 }
937 await pool.destroy()
938 })
939
940 it('Verify that pool execute() arguments are checked', async () => {
941 const pool = new FixedClusterPool(
942 numberOfWorkers,
943 './tests/worker-files/cluster/testWorker.js'
944 )
945 await expect(pool.execute(undefined, 0)).rejects.toThrow(
946 new TypeError('name argument must be a string')
947 )
948 await expect(pool.execute(undefined, '')).rejects.toThrow(
949 new TypeError('name argument must not be an empty string')
950 )
951 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
952 new TypeError('transferList argument must be an array')
953 )
954 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
955 "Task function 'unknown' not found"
956 )
957 await pool.destroy()
958 await expect(pool.execute()).rejects.toThrow(
959 new Error('Cannot execute a task on not started pool')
960 )
961 })
962
963 it('Verify that pool worker tasks usage are computed', async () => {
964 const pool = new FixedClusterPool(
965 numberOfWorkers,
966 './tests/worker-files/cluster/testWorker.js'
967 )
968 const promises = new Set()
969 const maxMultiplier = 2
970 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
971 promises.add(pool.execute())
972 }
973 for (const workerNode of pool.workerNodes) {
974 expect(workerNode.usage).toStrictEqual({
975 tasks: {
976 executed: 0,
977 executing: maxMultiplier,
978 queued: 0,
979 maxQueued: 0,
980 sequentiallyStolen: 0,
981 stolen: 0,
982 failed: 0
983 },
984 runTime: {
985 history: expect.any(CircularArray)
986 },
987 waitTime: {
988 history: expect.any(CircularArray)
989 },
990 elu: {
991 idle: {
992 history: expect.any(CircularArray)
993 },
994 active: {
995 history: expect.any(CircularArray)
996 }
997 }
998 })
999 }
1000 await Promise.all(promises)
1001 for (const workerNode of pool.workerNodes) {
1002 expect(workerNode.usage).toStrictEqual({
1003 tasks: {
1004 executed: maxMultiplier,
1005 executing: 0,
1006 queued: 0,
1007 maxQueued: 0,
1008 sequentiallyStolen: 0,
1009 stolen: 0,
1010 failed: 0
1011 },
1012 runTime: {
1013 history: expect.any(CircularArray)
1014 },
1015 waitTime: {
1016 history: expect.any(CircularArray)
1017 },
1018 elu: {
1019 idle: {
1020 history: expect.any(CircularArray)
1021 },
1022 active: {
1023 history: expect.any(CircularArray)
1024 }
1025 }
1026 })
1027 }
1028 await pool.destroy()
1029 })
1030
1031 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1032 const pool = new DynamicThreadPool(
1033 Math.floor(numberOfWorkers / 2),
1034 numberOfWorkers,
1035 './tests/worker-files/thread/testWorker.mjs'
1036 )
1037 const promises = new Set()
1038 const maxMultiplier = 2
1039 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1040 promises.add(pool.execute())
1041 }
1042 await Promise.all(promises)
1043 for (const workerNode of pool.workerNodes) {
1044 expect(workerNode.usage).toStrictEqual({
1045 tasks: {
1046 executed: expect.any(Number),
1047 executing: 0,
1048 queued: 0,
1049 maxQueued: 0,
1050 sequentiallyStolen: 0,
1051 stolen: 0,
1052 failed: 0
1053 },
1054 runTime: {
1055 history: expect.any(CircularArray)
1056 },
1057 waitTime: {
1058 history: expect.any(CircularArray)
1059 },
1060 elu: {
1061 idle: {
1062 history: expect.any(CircularArray)
1063 },
1064 active: {
1065 history: expect.any(CircularArray)
1066 }
1067 }
1068 })
1069 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1070 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1071 numberOfWorkers * maxMultiplier
1072 )
1073 expect(workerNode.usage.runTime.history.length).toBe(0)
1074 expect(workerNode.usage.waitTime.history.length).toBe(0)
1075 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1076 expect(workerNode.usage.elu.active.history.length).toBe(0)
1077 }
1078 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1079 for (const workerNode of pool.workerNodes) {
1080 expect(workerNode.usage).toStrictEqual({
1081 tasks: {
1082 executed: 0,
1083 executing: 0,
1084 queued: 0,
1085 maxQueued: 0,
1086 sequentiallyStolen: 0,
1087 stolen: 0,
1088 failed: 0
1089 },
1090 runTime: {
1091 history: expect.any(CircularArray)
1092 },
1093 waitTime: {
1094 history: expect.any(CircularArray)
1095 },
1096 elu: {
1097 idle: {
1098 history: expect.any(CircularArray)
1099 },
1100 active: {
1101 history: expect.any(CircularArray)
1102 }
1103 }
1104 })
1105 expect(workerNode.usage.runTime.history.length).toBe(0)
1106 expect(workerNode.usage.waitTime.history.length).toBe(0)
1107 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1108 expect(workerNode.usage.elu.active.history.length).toBe(0)
1109 }
1110 await pool.destroy()
1111 })
1112
1113 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1114 const pool = new DynamicClusterPool(
1115 Math.floor(numberOfWorkers / 2),
1116 numberOfWorkers,
1117 './tests/worker-files/cluster/testWorker.js'
1118 )
1119 expect(pool.emitter.eventNames()).toStrictEqual([])
1120 let poolInfo
1121 let poolReady = 0
1122 pool.emitter.on(PoolEvents.ready, info => {
1123 ++poolReady
1124 poolInfo = info
1125 })
1126 await waitPoolEvents(pool, PoolEvents.ready, 1)
1127 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1128 expect(poolReady).toBe(1)
1129 expect(poolInfo).toStrictEqual({
1130 version,
1131 type: PoolTypes.dynamic,
1132 worker: WorkerTypes.cluster,
1133 started: true,
1134 ready: true,
1135 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1136 minSize: expect.any(Number),
1137 maxSize: expect.any(Number),
1138 workerNodes: expect.any(Number),
1139 idleWorkerNodes: expect.any(Number),
1140 busyWorkerNodes: expect.any(Number),
1141 executedTasks: expect.any(Number),
1142 executingTasks: expect.any(Number),
1143 failedTasks: expect.any(Number)
1144 })
1145 await pool.destroy()
1146 })
1147
1148 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1149 const pool = new FixedThreadPool(
1150 numberOfWorkers,
1151 './tests/worker-files/thread/testWorker.mjs'
1152 )
1153 expect(pool.emitter.eventNames()).toStrictEqual([])
1154 const promises = new Set()
1155 let poolBusy = 0
1156 let poolInfo
1157 pool.emitter.on(PoolEvents.busy, info => {
1158 ++poolBusy
1159 poolInfo = info
1160 })
1161 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1162 for (let i = 0; i < numberOfWorkers * 2; i++) {
1163 promises.add(pool.execute())
1164 }
1165 await Promise.all(promises)
1166 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1167 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1168 expect(poolBusy).toBe(numberOfWorkers + 1)
1169 expect(poolInfo).toStrictEqual({
1170 version,
1171 type: PoolTypes.fixed,
1172 worker: WorkerTypes.thread,
1173 started: true,
1174 ready: true,
1175 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1176 minSize: expect.any(Number),
1177 maxSize: expect.any(Number),
1178 workerNodes: expect.any(Number),
1179 idleWorkerNodes: expect.any(Number),
1180 busyWorkerNodes: expect.any(Number),
1181 executedTasks: expect.any(Number),
1182 executingTasks: expect.any(Number),
1183 failedTasks: expect.any(Number)
1184 })
1185 await pool.destroy()
1186 })
1187
1188 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1189 const pool = new DynamicThreadPool(
1190 Math.floor(numberOfWorkers / 2),
1191 numberOfWorkers,
1192 './tests/worker-files/thread/testWorker.mjs'
1193 )
1194 expect(pool.emitter.eventNames()).toStrictEqual([])
1195 const promises = new Set()
1196 let poolFull = 0
1197 let poolInfo
1198 pool.emitter.on(PoolEvents.full, info => {
1199 ++poolFull
1200 poolInfo = info
1201 })
1202 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1203 for (let i = 0; i < numberOfWorkers * 2; i++) {
1204 promises.add(pool.execute())
1205 }
1206 await Promise.all(promises)
1207 expect(poolFull).toBe(1)
1208 expect(poolInfo).toStrictEqual({
1209 version,
1210 type: PoolTypes.dynamic,
1211 worker: WorkerTypes.thread,
1212 started: true,
1213 ready: true,
1214 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1215 minSize: expect.any(Number),
1216 maxSize: expect.any(Number),
1217 workerNodes: expect.any(Number),
1218 idleWorkerNodes: expect.any(Number),
1219 busyWorkerNodes: expect.any(Number),
1220 executedTasks: expect.any(Number),
1221 executingTasks: expect.any(Number),
1222 failedTasks: expect.any(Number)
1223 })
1224 await pool.destroy()
1225 })
1226
1227 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1228 const pool = new FixedThreadPool(
1229 numberOfWorkers,
1230 './tests/worker-files/thread/testWorker.mjs',
1231 {
1232 enableTasksQueue: true
1233 }
1234 )
1235 stub(pool, 'hasBackPressure').returns(true)
1236 expect(pool.emitter.eventNames()).toStrictEqual([])
1237 const promises = new Set()
1238 let poolBackPressure = 0
1239 let poolInfo
1240 pool.emitter.on(PoolEvents.backPressure, info => {
1241 ++poolBackPressure
1242 poolInfo = info
1243 })
1244 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1245 for (let i = 0; i < numberOfWorkers + 1; i++) {
1246 promises.add(pool.execute())
1247 }
1248 await Promise.all(promises)
1249 expect(poolBackPressure).toBe(1)
1250 expect(poolInfo).toStrictEqual({
1251 version,
1252 type: PoolTypes.fixed,
1253 worker: WorkerTypes.thread,
1254 started: true,
1255 ready: true,
1256 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1257 minSize: expect.any(Number),
1258 maxSize: expect.any(Number),
1259 workerNodes: expect.any(Number),
1260 idleWorkerNodes: expect.any(Number),
1261 busyWorkerNodes: expect.any(Number),
1262 executedTasks: expect.any(Number),
1263 executingTasks: expect.any(Number),
1264 maxQueuedTasks: expect.any(Number),
1265 queuedTasks: expect.any(Number),
1266 backPressure: true,
1267 stolenTasks: expect.any(Number),
1268 failedTasks: expect.any(Number)
1269 })
1270 expect(pool.hasBackPressure.called).toBe(true)
1271 await pool.destroy()
1272 })
1273
1274 it('Verify that pool asynchronous resource track tasks execution', async () => {
1275 let taskAsyncId
1276 let initCalls = 0
1277 let beforeCalls = 0
1278 let afterCalls = 0
1279 let resolveCalls = 0
1280 const hook = createHook({
1281 init (asyncId, type) {
1282 if (type === 'poolifier:task') {
1283 initCalls++
1284 taskAsyncId = asyncId
1285 }
1286 },
1287 before (asyncId) {
1288 if (asyncId === taskAsyncId) beforeCalls++
1289 },
1290 after (asyncId) {
1291 if (asyncId === taskAsyncId) afterCalls++
1292 },
1293 promiseResolve () {
1294 if (executionAsyncId() === taskAsyncId) resolveCalls++
1295 }
1296 })
1297 const pool = new FixedThreadPool(
1298 numberOfWorkers,
1299 './tests/worker-files/thread/testWorker.mjs'
1300 )
1301 hook.enable()
1302 await pool.execute()
1303 hook.disable()
1304 expect(initCalls).toBe(1)
1305 expect(beforeCalls).toBe(1)
1306 expect(afterCalls).toBe(1)
1307 expect(resolveCalls).toBe(1)
1308 await pool.destroy()
1309 })
1310
1311 it('Verify that hasTaskFunction() is working', async () => {
1312 const dynamicThreadPool = new DynamicThreadPool(
1313 Math.floor(numberOfWorkers / 2),
1314 numberOfWorkers,
1315 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1316 )
1317 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1318 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1319 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1320 true
1321 )
1322 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1323 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1324 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1325 await dynamicThreadPool.destroy()
1326 const fixedClusterPool = new FixedClusterPool(
1327 numberOfWorkers,
1328 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1329 )
1330 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1331 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1332 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1333 true
1334 )
1335 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1336 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1337 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1338 await fixedClusterPool.destroy()
1339 })
1340
1341 it('Verify that addTaskFunction() is working', async () => {
1342 const dynamicThreadPool = new DynamicThreadPool(
1343 Math.floor(numberOfWorkers / 2),
1344 numberOfWorkers,
1345 './tests/worker-files/thread/testWorker.mjs'
1346 )
1347 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1348 await expect(
1349 dynamicThreadPool.addTaskFunction(0, () => {})
1350 ).rejects.toThrow(new TypeError('name argument must be a string'))
1351 await expect(
1352 dynamicThreadPool.addTaskFunction('', () => {})
1353 ).rejects.toThrow(
1354 new TypeError('name argument must not be an empty string')
1355 )
1356 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1357 new TypeError('fn argument must be a function')
1358 )
1359 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1360 new TypeError('fn argument must be a function')
1361 )
1362 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1363 DEFAULT_TASK_NAME,
1364 'test'
1365 ])
1366 const echoTaskFunction = data => {
1367 return data
1368 }
1369 await expect(
1370 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1371 ).resolves.toBe(true)
1372 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1373 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1374 echoTaskFunction
1375 )
1376 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1377 DEFAULT_TASK_NAME,
1378 'test',
1379 'echo'
1380 ])
1381 const taskFunctionData = { test: 'test' }
1382 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1383 expect(echoResult).toStrictEqual(taskFunctionData)
1384 for (const workerNode of dynamicThreadPool.workerNodes) {
1385 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1386 tasks: {
1387 executed: expect.any(Number),
1388 executing: 0,
1389 queued: 0,
1390 sequentiallyStolen: 0,
1391 stolen: 0,
1392 failed: 0
1393 },
1394 runTime: {
1395 history: new CircularArray()
1396 },
1397 waitTime: {
1398 history: new CircularArray()
1399 },
1400 elu: {
1401 idle: {
1402 history: new CircularArray()
1403 },
1404 active: {
1405 history: new CircularArray()
1406 }
1407 }
1408 })
1409 }
1410 await dynamicThreadPool.destroy()
1411 })
1412
1413 it('Verify that removeTaskFunction() is working', async () => {
1414 const dynamicThreadPool = new DynamicThreadPool(
1415 Math.floor(numberOfWorkers / 2),
1416 numberOfWorkers,
1417 './tests/worker-files/thread/testWorker.mjs'
1418 )
1419 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1420 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1421 DEFAULT_TASK_NAME,
1422 'test'
1423 ])
1424 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1425 new Error('Cannot remove a task function not handled on the pool side')
1426 )
1427 const echoTaskFunction = data => {
1428 return data
1429 }
1430 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1431 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1432 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1433 echoTaskFunction
1434 )
1435 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1436 DEFAULT_TASK_NAME,
1437 'test',
1438 'echo'
1439 ])
1440 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1441 true
1442 )
1443 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1444 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1445 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1446 DEFAULT_TASK_NAME,
1447 'test'
1448 ])
1449 await dynamicThreadPool.destroy()
1450 })
1451
1452 it('Verify that listTaskFunctionNames() is working', async () => {
1453 const dynamicThreadPool = new DynamicThreadPool(
1454 Math.floor(numberOfWorkers / 2),
1455 numberOfWorkers,
1456 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1457 )
1458 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1459 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1460 DEFAULT_TASK_NAME,
1461 'jsonIntegerSerialization',
1462 'factorial',
1463 'fibonacci'
1464 ])
1465 await dynamicThreadPool.destroy()
1466 const fixedClusterPool = new FixedClusterPool(
1467 numberOfWorkers,
1468 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1469 )
1470 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1471 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1472 DEFAULT_TASK_NAME,
1473 'jsonIntegerSerialization',
1474 'factorial',
1475 'fibonacci'
1476 ])
1477 await fixedClusterPool.destroy()
1478 })
1479
1480 it('Verify that setDefaultTaskFunction() is working', async () => {
1481 const dynamicThreadPool = new DynamicThreadPool(
1482 Math.floor(numberOfWorkers / 2),
1483 numberOfWorkers,
1484 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1485 )
1486 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1487 const workerId = dynamicThreadPool.workerNodes[0].info.id
1488 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1489 new Error(
1490 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1491 )
1492 )
1493 await expect(
1494 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1495 ).rejects.toThrow(
1496 new Error(
1497 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1498 )
1499 )
1500 await expect(
1501 dynamicThreadPool.setDefaultTaskFunction('unknown')
1502 ).rejects.toThrow(
1503 new Error(
1504 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1505 )
1506 )
1507 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1508 DEFAULT_TASK_NAME,
1509 'jsonIntegerSerialization',
1510 'factorial',
1511 'fibonacci'
1512 ])
1513 await expect(
1514 dynamicThreadPool.setDefaultTaskFunction('factorial')
1515 ).resolves.toBe(true)
1516 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1517 DEFAULT_TASK_NAME,
1518 'factorial',
1519 'jsonIntegerSerialization',
1520 'fibonacci'
1521 ])
1522 await expect(
1523 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1524 ).resolves.toBe(true)
1525 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1526 DEFAULT_TASK_NAME,
1527 'fibonacci',
1528 'jsonIntegerSerialization',
1529 'factorial'
1530 ])
1531 await dynamicThreadPool.destroy()
1532 })
1533
1534 it('Verify that multiple task functions worker is working', async () => {
1535 const pool = new DynamicClusterPool(
1536 Math.floor(numberOfWorkers / 2),
1537 numberOfWorkers,
1538 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1539 )
1540 const data = { n: 10 }
1541 const result0 = await pool.execute(data)
1542 expect(result0).toStrictEqual({ ok: 1 })
1543 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1544 expect(result1).toStrictEqual({ ok: 1 })
1545 const result2 = await pool.execute(data, 'factorial')
1546 expect(result2).toBe(3628800)
1547 const result3 = await pool.execute(data, 'fibonacci')
1548 expect(result3).toBe(55)
1549 expect(pool.info.executingTasks).toBe(0)
1550 expect(pool.info.executedTasks).toBe(4)
1551 for (const workerNode of pool.workerNodes) {
1552 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1553 DEFAULT_TASK_NAME,
1554 'jsonIntegerSerialization',
1555 'factorial',
1556 'fibonacci'
1557 ])
1558 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1559 for (const name of pool.listTaskFunctionNames()) {
1560 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1561 tasks: {
1562 executed: expect.any(Number),
1563 executing: 0,
1564 failed: 0,
1565 queued: 0,
1566 sequentiallyStolen: 0,
1567 stolen: 0
1568 },
1569 runTime: {
1570 history: expect.any(CircularArray)
1571 },
1572 waitTime: {
1573 history: expect.any(CircularArray)
1574 },
1575 elu: {
1576 idle: {
1577 history: expect.any(CircularArray)
1578 },
1579 active: {
1580 history: expect.any(CircularArray)
1581 }
1582 }
1583 })
1584 expect(
1585 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1586 ).toBeGreaterThan(0)
1587 }
1588 expect(
1589 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1590 ).toStrictEqual(
1591 workerNode.getTaskFunctionWorkerUsage(
1592 workerNode.info.taskFunctionNames[1]
1593 )
1594 )
1595 }
1596 await pool.destroy()
1597 })
1598
1599 it('Verify sendKillMessageToWorker()', async () => {
1600 const pool = new DynamicClusterPool(
1601 Math.floor(numberOfWorkers / 2),
1602 numberOfWorkers,
1603 './tests/worker-files/cluster/testWorker.js'
1604 )
1605 const workerNodeKey = 0
1606 await expect(
1607 pool.sendKillMessageToWorker(workerNodeKey)
1608 ).resolves.toBeUndefined()
1609 await pool.destroy()
1610 })
1611
1612 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1613 const pool = new DynamicClusterPool(
1614 Math.floor(numberOfWorkers / 2),
1615 numberOfWorkers,
1616 './tests/worker-files/cluster/testWorker.js'
1617 )
1618 const workerNodeKey = 0
1619 await expect(
1620 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1621 taskFunctionOperation: 'add',
1622 taskFunctionName: 'empty',
1623 taskFunction: (() => {}).toString()
1624 })
1625 ).resolves.toBe(true)
1626 expect(
1627 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1628 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1629 await pool.destroy()
1630 })
1631
1632 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1633 const pool = new DynamicClusterPool(
1634 Math.floor(numberOfWorkers / 2),
1635 numberOfWorkers,
1636 './tests/worker-files/cluster/testWorker.js'
1637 )
1638 await expect(
1639 pool.sendTaskFunctionOperationToWorkers({
1640 taskFunctionOperation: 'add',
1641 taskFunctionName: 'empty',
1642 taskFunction: (() => {}).toString()
1643 })
1644 ).resolves.toBe(true)
1645 for (const workerNode of pool.workerNodes) {
1646 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1647 DEFAULT_TASK_NAME,
1648 'test',
1649 'empty'
1650 ])
1651 }
1652 await pool.destroy()
1653 })
1654 })