fix: fix task execution tracking with pool async resource
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
8 import {
9 DynamicClusterPool,
10 DynamicThreadPool,
11 FixedClusterPool,
12 FixedThreadPool,
13 PoolEvents,
14 PoolTypes,
15 WorkerChoiceStrategies,
16 WorkerTypes
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
23
24 describe('Abstract pool test suite', () => {
// Package version, read from the package.json located two directories above
// this test file; compared against the `version` field of the pool info below.
const packageJsonPath = join(
  dirname(fileURLToPath(import.meta.url)),
  '../..',
  'package.json'
)
const { version } = JSON.parse(readFileSync(packageJsonPath, 'utf8'))
// Pool size shared by the test cases of this suite.
const numberOfWorkers = 2
/**
 * Fixed thread pool whose `isMain()` always answers `false`.
 * Used by the test checking that a pool refuses to be created when it does
 * not consider itself to be running in the main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
37
// Call sinon's restore() once each test case has finished.
afterEach(function restoreSinon () {
  restore()
})
41
// Smoke test: construct a fixed thread pool, check its type, destroy it.
it('Verify that pool can be created and destroyed', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool).toBeInstanceOf(FixedThreadPool)
  await pool.destroy()
})
50
// A pool whose isMain() returns false must throw at construction time.
it('Verify that pool cannot be created from a non main thread/process', () => {
  const createPoolOutsideMain = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPoolOutsideMain).toThrow(expectedError)
})
67
// Checks the status flags of a freshly constructed pool.
it('Verify that pool statuses properties are set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // [status property, expected value], asserted in this order
  const expectedStatuses = [
    ['started', true],
    ['starting', false],
    ['destroying', false]
  ]
  for (const [status, expected] of expectedStatuses) {
    expect(pool[status]).toBe(expected)
  }
  await pool.destroy()
})
78
// A missing or unknown worker file must be rejected at construction time.
it('Verify that filePath is checked', () => {
  const createWithoutFilePath = () => new FixedThreadPool(numberOfWorkers)
  expect(createWithoutFilePath).toThrow(
    new Error("Cannot find the worker file 'undefined'")
  )
  const createWithUnknownFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createWithUnknownFile).toThrow(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
87
// An undefined number of workers must be rejected at construction time.
it('Verify that numberOfWorkers is checked', () => {
  const createWithoutSize = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createWithoutSize).toThrow(expectedError)
})
101
// A negative number of workers must be rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  const createWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createWithNegativeSize).toThrow(expectedError)
})
112
// A fractional number of workers must be rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  const createWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createWithFractionalSize).toThrow(expectedError)
})
123
// Invalid min/max size combinations of dynamic pools must throw.
it('Verify that dynamic pool sizing is checked', () => {
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  // Each case: [pool class, min size, max size, worker file, expected error]
  const invalidSizings = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [Pool, min, max, workerFile, expectedError] of invalidSizings) {
    expect(() => new Pool(min, max, workerFile)).toThrow(expectedError)
  }
})
198
// Checks the options exposed by a pool: first one built with default options,
// then one built with customised options.
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Asserts the worker choice strategy options held by the strategy context
  // and by each of its registered strategies.
  const expectStrategyOptions = (pool, expectedOpts) => {
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(expectedOpts)
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOpts)
    }
  }

  // Pool built with default options
  const defaultStrategyOpts = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(defaultPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    workerChoiceStrategyOptions: defaultStrategyOpts
  })
  expectStrategyOptions(defaultPool, defaultStrategyOpts)
  await defaultPool.destroy()

  // Pool built with customised options
  const testHandler = () => console.info('test handler executed')
  const customStrategyOpts = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: customStrategyOpts,
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  expectStrategyOptions(customPool, customStrategyOpts)
  await customPool.destroy()
})
298
// Each invalid pool options object must make the constructor throw.
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each case: [pool options, expected error]
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrow(expectedError)
  }
})
470
// Toggles the runTime/elu `median` option at runtime and checks the options
// and task statistics requirements seen by the pool, the strategy context
// and each strategy; then checks that invalid options are rejected.
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Expected strategy options for a given runTime/elu `median` flag
  const strategyOptions = median => ({
    retries: 6,
    runTime: { median },
    waitTime: { median: false },
    elu: { median }
  })
  // Expected task statistics requirements for a given `median` flag
  const statisticsRequirements = median => ({
    runTime: { aggregate: true, average: !median, median },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: true, average: !median, median }
  })
  const expectStrategyState = median => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      strategyOptions(median)
    )
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      strategyOptions(median)
    )
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(strategyOptions(median))
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(statisticsRequirements(median))
  }

  expectStrategyState(false)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategyState(true)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategyState(false)

  // Each case: [invalid options, expected error]
  const invalidOptions = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { retries: 'invalidChoiceRetries' },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { retries: -1 },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [options, expectedError] of invalidOptions) {
    expect(() => pool.setWorkerChoiceStrategyOptions(options)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
640
// Enables, reconfigures and disables the tasks queue at runtime and checks
// the resulting pool options after each step.
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Expected tasks queue options for a given concurrency
  const queueOptions = concurrency => ({
    concurrency,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })

  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()

  pool.enableTasksQueue(true)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptions(1))

  pool.enableTasksQueue(true, { concurrency: 2 })
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptions(2))

  pool.enableTasksQueue(false)
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()

  await pool.destroy()
})
669
// Changes the tasks queue options at runtime, checks the pool options and
// each worker node's back pressure size, then checks invalid options throw.
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  const defaultQueueOptions = {
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  // Asserts the pool tasks queue options, then that every worker node back
  // pressure size equals the configured queue size.
  const expectQueueOptions = expectedOptions => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    for (const node of pool.workerNodes) {
      expect(node.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  expectQueueOptions(defaultQueueOptions)
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  expectQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectQueueOptions(defaultQueueOptions)

  // Each case: [invalid tasks queue options, expected error]
  const invalidOptions = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [options, expectedError] of invalidOptions) {
    expect(() => pool.setTasksQueueOptions(options)).toThrow(expectedError)
  }
  await pool.destroy()
})
751
// Checks the `info` object of a fixed thread pool and of a dynamic cluster
// pool right after construction.
it('Verify that pool info is set', async () => {
  // Fields expected to be identical for both pools
  const commonInfo = {
    version,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(fixedPool.info).toStrictEqual({
    ...commonInfo,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers
  })
  await fixedPool.destroy()

  const dynamicPool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    ...commonInfo,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: Math.floor(numberOfWorkers / 2),
    maxSize: numberOfWorkers,
    workerNodes: Math.floor(numberOfWorkers / 2),
    idleWorkerNodes: Math.floor(numberOfWorkers / 2)
  })
  await dynamicPool.destroy()
})
797
// Every worker node of a new pool must start with zeroed task counters and
// empty measurement histories.
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Builds a fresh expected usage object (new CircularArray instances each call)
  const initialUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  })
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
    expect(node.usage).toStrictEqual(initialUsage())
  }
  await pool.destroy()
})
833
// Every worker node of a new pool must own an empty Deque as tasks queue.
it('Verify that pool worker tasks queue are initialized', async () => {
  const expectEmptyTasksQueues = pool => {
    for (const node of pool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.tasksQueue).toBeInstanceOf(Deque)
      expect(node.tasksQueue.size).toBe(0)
      expect(node.tasksQueue.maxSize).toBe(0)
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(fixedPool)
  await fixedPool.destroy()

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(dynamicPool)
  await dynamicPool.destroy()
})
859
// Checks the `info` object of every worker node of a new pool.
it('Verify that pool worker info are initialized', async () => {
  // Asserts each worker node info, expecting the given worker type
  const expectWorkerInfo = (pool, workerType) => {
    for (const node of pool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerInfo(fixedPool, WorkerTypes.cluster)
  await fixedPool.destroy()

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkerInfo(dynamicPool, WorkerTypes.thread)
  await dynamicPool.destroy()
})
891
// Starting a started pool and destroying a destroyed pool must both fail.
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Asserts both the `started` and `ready` flags reported by pool.info
  const expectStartedAndReady = expected => {
    expect(pool.info.started).toBe(expected)
    expect(pool.info.ready).toBe(expected)
  }

  expectStartedAndReady(true)
  const startAgain = () => pool.start()
  expect(startAgain).toThrow(new Error('Cannot start an already started pool'))

  await pool.destroy()
  expectStartedAndReady(false)
  await expect(pool.destroy()).rejects.toThrow(
    new Error('Cannot destroy an already destroyed pool')
  )
})
909
// A pool created with `startWorkers: false` has no worker nodes and rejects
// tasks until start() is called.
it('Verify that pool can be started after initialization', async () => {
  const poolOptions = { startWorkers: false }
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    poolOptions
  )

  // Before start()
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrow(notStartedError)

  // After start()
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
936
// Invalid execute() arguments, an unknown task function name, and execution
// on a destroyed pool must all reject.
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )

  const nonStringName = pool.execute(undefined, 0)
  await expect(nonStringName).rejects.toThrow(
    new TypeError('name argument must be a string')
  )

  const emptyName = pool.execute(undefined, '')
  await expect(emptyName).rejects.toThrow(
    new TypeError('name argument must not be an empty string')
  )

  const nonArrayTransferList = pool.execute(undefined, undefined, {})
  await expect(nonArrayTransferList).rejects.toThrow(
    new TypeError('transferList argument must be an array')
  )

  const unknownTaskFunction = pool.execute(undefined, 'unknown')
  await expect(unknownTaskFunction).rejects.toBe(
    "Task function 'unknown' not found"
  )

  await pool.destroy()
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
959
// Submits `maxMultiplier` tasks per worker and checks the per worker node
// task counters while the tasks are in flight and once they have completed.
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Expected worker node usage for the given executed/executing counters
  const usageWith = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })

  const pendingTasks = []
  for (let task = 0; task < numberOfWorkers * maxMultiplier; task++) {
    pendingTasks.push(pool.execute())
  }
  // Tasks submitted but not yet awaited
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  // All tasks completed
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(maxMultiplier, 0))
  }
  await pool.destroy()
})
1027
// Runs some tasks, then switches the worker choice strategy and checks that
// the executed counter of every worker node is back to zero.
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  // Expected worker node usage for the given `executed` counter or matcher
  const idleUsageWith = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  const expectEmptyHistories = node => {
    expect(node.usage.runTime.history.length).toBe(0)
    expect(node.usage.waitTime.history.length).toBe(0)
    expect(node.usage.elu.idle.history.length).toBe(0)
    expect(node.usage.elu.active.history.length).toBe(0)
  }

  const pendingTasks = []
  for (let task = 0; task < numberOfWorkers * maxMultiplier; task++) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(idleUsageWith(expect.any(Number)))
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
    expectEmptyHistories(node)
  }

  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(idleUsageWith(0))
    expectEmptyHistories(node)
  }
  await pool.destroy()
})
1109
// Registers a `ready` listener on the pool emitter and checks it is called
// once with the pool info.
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Pool info payloads received by the listener, in emission order
  const receivedInfos = []
  pool.emitter.on(PoolEvents.ready, info => {
    receivedInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(receivedInfos.length).toBe(1)
  const numericFields = [
    'minSize',
    'maxSize',
    'workerNodes',
    'idleWorkerNodes',
    'busyWorkerNodes',
    'executedTasks',
    'executingTasks',
    'failedTasks'
  ]
  expect(receivedInfos[receivedInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    ...Object.fromEntries(
      numericFields.map(field => [field, expect.any(Number)])
    )
  })
  await pool.destroy()
})
1144
// Registers a `busy` listener on the pool emitter, submits twice as many
// tasks as workers and checks how often the listener was called.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Pool info payloads received by the listener, in emission order
  const receivedInfos = []
  pool.emitter.on(PoolEvents.busy, info => {
    receivedInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  const pendingTasks = []
  for (let task = 0; task < numberOfWorkers * 2; task++) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(receivedInfos.length).toBe(numberOfWorkers + 1)
  const numericFields = [
    'minSize',
    'maxSize',
    'workerNodes',
    'idleWorkerNodes',
    'busyWorkerNodes',
    'executedTasks',
    'executingTasks',
    'failedTasks'
  ]
  expect(receivedInfos[receivedInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    ...Object.fromEntries(
      numericFields.map(field => [field, expect.any(Number)])
    )
  })
  await pool.destroy()
})
1184
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])

  // Count the emissions and keep the last payload received by the listener.
  let fullCount = 0
  let lastInfo
  const onFull = info => {
    fullCount += 1
    lastInfo = info
  }
  pool.emitter.on(PoolEvents.full, onFull)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])

  // Submit twice as many tasks as the second size argument given to the pool, then wait for all of them.
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)

  expect(fullCount).toBe(1)
  // Exact shape expected for the payload carried by the 'full' event.
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastInfo).toStrictEqual(expectedInfo)

  await pool.destroy()
})
1223
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    {
      enableTasksQueue: true
    }
  )
  // Force the pool to always report back pressure, whatever its real queue state is.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])

  // Count the emissions and keep the last payload received by the listener.
  let backPressureCount = 0
  let lastInfo
  const onBackPressure = info => {
    backPressureCount += 1
    lastInfo = info
  }
  pool.emitter.on(PoolEvents.backPressure, onBackPressure)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])

  // Submit one more task than there are workers, then wait for all of them.
  const submittedTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)

  expect(backPressureCount).toBe(1)
  // Exact shape expected for the payload; it carries the queue related fields since the tasks queue is enabled.
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  // The stubbed method must have been consulted by the pool.
  expect(pool.hasBackPressure.called).toBe(true)

  await pool.destroy()
})
1270
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Async id of the last 'poolifier:task' resource seen by the hook.
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      // Only the resource created for a pool task is of interest.
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Count the promise resolutions happening inside the task execution scope.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  let pool
  hook.enable()
  try {
    pool = new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs'
    )
    await pool.execute()
    // Stop observing before asserting so that only the single task execution is counted.
    hook.disable()
    expect(initCalls).toBe(1)
    expect(beforeCalls).toBe(1)
    expect(afterCalls).toBe(1)
    expect(resolveCalls).toBe(1)
  } finally {
    // Never leave the hook enabled for the next tests, even if the pool creation or the task execution failed.
    hook.disable()
    // Release the worker threads, as every other test of the suite does, so they are not leaked.
    if (pool != null) {
      await pool.destroy()
    }
  }
})
1306
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names paired with the result expected from hasTaskFunction().
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  // Check every expectation, in order, against the given pool.
  const checkTaskFunctions = pool => {
    for (const [taskFunctionName, expected] of expectations) {
      expect(pool.hasTaskFunction(taskFunctionName)).toBe(expected)
    }
  }

  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  checkTaskFunctions(dynamicThreadPool)
  await dynamicThreadPool.destroy()

  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  checkTaskFunctions(fixedClusterPool)
  await fixedClusterPool.destroy()
})
1336
it('Verify that addTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)

  // Invalid names are rejected.
  const nonStringName = pool.addTaskFunction(0, () => {})
  await expect(nonStringName).rejects.toThrow(
    new TypeError('name argument must be a string')
  )
  const emptyName = pool.addTaskFunction('', () => {})
  await expect(emptyName).rejects.toThrow(
    new TypeError('name argument must not be an empty string')
  )

  // Anything but a function is rejected as task function.
  const numberAsFunction = pool.addTaskFunction('test', 0)
  await expect(numberAsFunction).rejects.toThrow(
    new TypeError('fn argument must be a function')
  )
  const stringAsFunction = pool.addTaskFunction('test', '')
  await expect(stringAsFunction).rejects.toThrow(
    new TypeError('fn argument must be a function')
  )

  // The rejected additions left the task function names untouched.
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])

  // A valid addition is registered on the pool side and listed afterwards.
  const echoTaskFunction = data => data
  const added = pool.addTaskFunction('echo', echoTaskFunction)
  await expect(added).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])

  // The added task function can be executed and echoes its input.
  const taskFunctionData = { test: 'test' }
  const echoResult = await pool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)

  // Usage expected on each worker node for the 'echo' task function.
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: {
      history: new CircularArray()
    },
    waitTime: {
      history: new CircularArray()
    },
    elu: {
      idle: {
        history: new CircularArray()
      },
      active: {
        history: new CircularArray()
      }
    }
  }
  for (const workerNode of pool.workerNodes) {
    const echoUsage = workerNode.getTaskFunctionWorkerUsage('echo')
    expect(echoUsage).toStrictEqual(expectedUsage)
  }

  await pool.destroy()
})
1408
it('Verify that removeTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)

  // Task function names listed before and after the 'echo' round trip.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)

  // A task function that was not added through the pool cannot be removed through it.
  const removeUnhandled = pool.removeTaskFunction('test')
  await expect(removeUnhandled).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )

  // Add a task function on the pool side ...
  const echoTaskFunction = data => data
  await pool.addTaskFunction('echo', echoTaskFunction)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])

  // ... then remove it and check that nothing is left of it.
  const removeEcho = pool.removeTaskFunction('echo')
  await expect(removeEcho).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(0)
  expect(pool.taskFunctions.get('echo')).toBeUndefined()
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)

  await pool.destroy()
})
1447
it('Verify that listTaskFunctionNames() is working', async () => {
  // Names expected from both multiple task functions worker files.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]

  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  const threadPoolNames = dynamicThreadPool.listTaskFunctionNames()
  expect(threadPoolNames).toStrictEqual(expectedNames)
  await dynamicThreadPool.destroy()

  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  const clusterPoolNames = fixedClusterPool.listTaskFunctionNames()
  expect(clusterPoolNames).toStrictEqual(expectedNames)
  await fixedClusterPool.destroy()
})
1475
it('Verify that setDefaultTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // The expected error messages embed the id of the first worker node.
  const firstWorkerNode = pool.workerNodes[0]
  const workerId = firstWorkerNode.info.id

  // A name that is not a string is rejected.
  const nonStringName = pool.setDefaultTaskFunction(0)
  await expect(nonStringName).rejects.toThrow(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
    )
  )

  // The reserved default task function name is rejected.
  const reservedName = pool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  await expect(reservedName).rejects.toThrow(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
    )
  )

  // An unknown task function name is rejected.
  const unknownName = pool.setDefaultTaskFunction('unknown')
  await expect(unknownName).rejects.toThrow(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
    )
  )

  // The failed attempts left the names order untouched.
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])

  // The new default task function is listed right after the default task name.
  const factorialAsDefault = pool.setDefaultTaskFunction('factorial')
  await expect(factorialAsDefault).resolves.toBe(true)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])

  const fibonacciAsDefault = pool.setDefaultTaskFunction('fibonacci')
  await expect(fibonacciAsDefault).resolves.toBe(true)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])

  await pool.destroy()
})
1529
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }

  // Run the default task function, then each named task function, one after the other.
  const defaultResult = await pool.execute(data)
  expect(defaultResult).toStrictEqual({ ok: 1 })
  const serializationResult = await pool.execute(
    data,
    'jsonIntegerSerialization'
  )
  expect(serializationResult).toStrictEqual({ ok: 1 })
  const factorialResult = await pool.execute(data, 'factorial')
  expect(factorialResult).toBe(3628800)
  const fibonacciResult = await pool.execute(data, 'fibonacci')
  expect(fibonacciResult).toBe(55)

  // The four tasks are all accounted as executed, none is still executing.
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)

  // Usage shape expected for every task function on every worker node.
  const expectedUsageShape = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const name of pool.listTaskFunctionNames()) {
      expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual(
        expectedUsageShape
      )
      const { executed } = workerNode.getTaskFunctionWorkerUsage(name).tasks
      expect(executed).toBeGreaterThan(0)
    }
    // The default task name usage must equal the usage of the task function listed right after it.
    const defaultUsage =
      workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    const secondListedUsage = workerNode.getTaskFunctionWorkerUsage(
      workerNode.info.taskFunctionNames[1]
    )
    expect(defaultUsage).toStrictEqual(secondListedUsage)
  }

  await pool.destroy()
})
1594
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node of the pool.
  const workerNodeKey = 0
  const killed = pool.sendKillMessageToWorker(workerNodeKey)
  // The returned promise must fulfill without any value.
  await expect(killed).resolves.toBeUndefined()
  await pool.destroy()
})
1607
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node of the pool.
  const workerNodeKey = 0
  // Operation adding a task function named 'empty', sent as its source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  const operationResult = pool.sendTaskFunctionOperationToWorker(
    workerNodeKey,
    addOperation
  )
  await expect(operationResult).resolves.toBe(true)
  // The targeted worker node now lists the added task function name last.
  const { taskFunctionNames } = pool.workerNodes[workerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1627
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Operation adding a task function named 'empty', sent as its source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  const operationResult = pool.sendTaskFunctionOperationToWorkers(addOperation)
  await expect(operationResult).resolves.toBe(true)
  // Every worker node of the pool now lists the added task function name last.
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const { info } of pool.workerNodes) {
    expect(info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
1650 })