fix: ensure the number of worker choice retries is enough for WRR
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
8 import {
9 DynamicClusterPool,
10 DynamicThreadPool,
11 FixedClusterPool,
12 FixedThreadPool,
13 PoolEvents,
14 PoolTypes,
15 WorkerChoiceStrategies,
16 WorkerTypes
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
23
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
26 readFileSync(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
28 'utf8'
29 )
30 ).version
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
33 isMain () {
34 return false
35 }
36 }
37
38 afterEach(() => {
39 restore()
40 })
41
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
44 numberOfWorkers,
45 './tests/worker-files/thread/testWorker.mjs'
46 )
47 expect(pool).toBeInstanceOf(FixedThreadPool)
48 await pool.destroy()
49 })
50
51 it('Verify that pool cannot be created from a non main thread/process', () => {
52 expect(
53 () =>
54 new StubPoolWithIsMain(
55 numberOfWorkers,
56 './tests/worker-files/thread/testWorker.mjs',
57 {
58 errorHandler: e => console.error(e)
59 }
60 )
61 ).toThrow(
62 new Error(
63 'Cannot start a pool from a worker with the same type as the pool'
64 )
65 )
66 })
67
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
70 numberOfWorkers,
71 './tests/worker-files/thread/testWorker.mjs'
72 )
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
76 await pool.destroy()
77 })
78
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
82 )
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
85 )
86 expect(
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
89 })
90
91 it('Verify that numberOfWorkers is checked', () => {
92 expect(
93 () =>
94 new FixedThreadPool(
95 undefined,
96 './tests/worker-files/thread/testWorker.mjs'
97 )
98 ).toThrow(
99 new Error(
100 'Cannot instantiate a pool without specifying the number of workers'
101 )
102 )
103 })
104
105 it('Verify that a negative number of workers is checked', () => {
106 expect(
107 () =>
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
109 ).toThrow(
110 new RangeError(
111 'Cannot instantiate a pool with a negative number of workers'
112 )
113 )
114 })
115
116 it('Verify that a non integer number of workers is checked', () => {
117 expect(
118 () =>
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
120 ).toThrow(
121 new TypeError(
122 'Cannot instantiate a pool with a non safe integer number of workers'
123 )
124 )
125 })
126
127 it('Verify that pool arguments number and pool type are checked', () => {
128 expect(
129 () =>
130 new FixedThreadPool(
131 numberOfWorkers,
132 './tests/worker-files/thread/testWorker.mjs',
133 undefined,
134 numberOfWorkers * 2
135 )
136 ).toThrow(
137 new Error(
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
139 )
140 )
141 })
142
143 it('Verify that dynamic pool sizing is checked', () => {
144 expect(
145 () =>
146 new DynamicClusterPool(
147 1,
148 undefined,
149 './tests/worker-files/cluster/testWorker.js'
150 )
151 ).toThrow(
152 new TypeError(
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
154 )
155 )
156 expect(
157 () =>
158 new DynamicThreadPool(
159 0.5,
160 1,
161 './tests/worker-files/thread/testWorker.mjs'
162 )
163 ).toThrow(
164 new TypeError(
165 'Cannot instantiate a pool with a non safe integer number of workers'
166 )
167 )
168 expect(
169 () =>
170 new DynamicClusterPool(
171 0,
172 0.5,
173 './tests/worker-files/cluster/testWorker.js'
174 )
175 ).toThrow(
176 new TypeError(
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
178 )
179 )
180 expect(
181 () =>
182 new DynamicThreadPool(
183 2,
184 1,
185 './tests/worker-files/thread/testWorker.mjs'
186 )
187 ).toThrow(
188 new RangeError(
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
190 )
191 )
192 expect(
193 () =>
194 new DynamicThreadPool(
195 0,
196 0,
197 './tests/worker-files/thread/testWorker.mjs'
198 )
199 ).toThrow(
200 new RangeError(
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
202 )
203 )
204 expect(
205 () =>
206 new DynamicClusterPool(
207 1,
208 1,
209 './tests/worker-files/cluster/testWorker.js'
210 )
211 ).toThrow(
212 new RangeError(
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
214 )
215 )
216 })
217
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
220 numberOfWorkers,
221 './tests/worker-files/thread/testWorker.mjs'
222 )
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
225 startWorkers: true,
226 enableEvents: true,
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
230 })
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
232 retries: pool.info.maxSize,
233 runTime: { median: false },
234 waitTime: { median: false },
235 elu: { median: false }
236 })
237 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
238 .workerChoiceStrategies) {
239 expect(workerChoiceStrategy.opts).toStrictEqual({
240 retries: pool.info.maxSize,
241 runTime: { median: false },
242 waitTime: { median: false },
243 elu: { median: false }
244 })
245 }
246 await pool.destroy()
247 const testHandler = () => console.info('test handler executed')
248 pool = new FixedThreadPool(
249 numberOfWorkers,
250 './tests/worker-files/thread/testWorker.mjs',
251 {
252 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
253 workerChoiceStrategyOptions: {
254 runTime: { median: true },
255 weights: { 0: 300, 1: 200 }
256 },
257 enableEvents: false,
258 restartWorkerOnError: false,
259 enableTasksQueue: true,
260 tasksQueueOptions: { concurrency: 2 },
261 messageHandler: testHandler,
262 errorHandler: testHandler,
263 onlineHandler: testHandler,
264 exitHandler: testHandler
265 }
266 )
267 expect(pool.emitter).toBeUndefined()
268 expect(pool.opts).toStrictEqual({
269 startWorkers: true,
270 enableEvents: false,
271 restartWorkerOnError: false,
272 enableTasksQueue: true,
273 tasksQueueOptions: {
274 concurrency: 2,
275 size: Math.pow(numberOfWorkers, 2),
276 taskStealing: true,
277 tasksStealingOnBackPressure: true,
278 tasksFinishedTimeout: 2000
279 },
280 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
281 workerChoiceStrategyOptions: {
282 runTime: { median: true },
283 weights: { 0: 300, 1: 200 }
284 },
285 onlineHandler: testHandler,
286 messageHandler: testHandler,
287 errorHandler: testHandler,
288 exitHandler: testHandler
289 })
290 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
291 retries:
292 pool.info.maxSize +
293 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
294 runTime: { median: true },
295 waitTime: { median: false },
296 elu: { median: false },
297 weights: { 0: 300, 1: 200 }
298 })
299 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
300 .workerChoiceStrategies) {
301 expect(workerChoiceStrategy.opts).toStrictEqual({
302 retries:
303 pool.info.maxSize +
304 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
305 runTime: { median: true },
306 waitTime: { median: false },
307 elu: { median: false },
308 weights: { 0: 300, 1: 200 }
309 })
310 }
311 await pool.destroy()
312 })
313
314 it('Verify that pool options are validated', () => {
315 expect(
316 () =>
317 new FixedThreadPool(
318 numberOfWorkers,
319 './tests/worker-files/thread/testWorker.mjs',
320 {
321 workerChoiceStrategy: 'invalidStrategy'
322 }
323 )
324 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
325 expect(
326 () =>
327 new FixedThreadPool(
328 numberOfWorkers,
329 './tests/worker-files/thread/testWorker.mjs',
330 {
331 workerChoiceStrategyOptions: { weights: {} }
332 }
333 )
334 ).toThrow(
335 new Error(
336 'Invalid worker choice strategy options: must have a weight for each worker node'
337 )
338 )
339 expect(
340 () =>
341 new FixedThreadPool(
342 numberOfWorkers,
343 './tests/worker-files/thread/testWorker.mjs',
344 {
345 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
346 }
347 )
348 ).toThrow(
349 new Error(
350 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
351 )
352 )
353 expect(
354 () =>
355 new FixedThreadPool(
356 numberOfWorkers,
357 './tests/worker-files/thread/testWorker.mjs',
358 {
359 enableTasksQueue: true,
360 tasksQueueOptions: 'invalidTasksQueueOptions'
361 }
362 )
363 ).toThrow(
364 new TypeError('Invalid tasks queue options: must be a plain object')
365 )
366 expect(
367 () =>
368 new FixedThreadPool(
369 numberOfWorkers,
370 './tests/worker-files/thread/testWorker.mjs',
371 {
372 enableTasksQueue: true,
373 tasksQueueOptions: { concurrency: 0 }
374 }
375 )
376 ).toThrow(
377 new RangeError(
378 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
379 )
380 )
381 expect(
382 () =>
383 new FixedThreadPool(
384 numberOfWorkers,
385 './tests/worker-files/thread/testWorker.mjs',
386 {
387 enableTasksQueue: true,
388 tasksQueueOptions: { concurrency: -1 }
389 }
390 )
391 ).toThrow(
392 new RangeError(
393 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
394 )
395 )
396 expect(
397 () =>
398 new FixedThreadPool(
399 numberOfWorkers,
400 './tests/worker-files/thread/testWorker.mjs',
401 {
402 enableTasksQueue: true,
403 tasksQueueOptions: { concurrency: 0.2 }
404 }
405 )
406 ).toThrow(
407 new TypeError('Invalid worker node tasks concurrency: must be an integer')
408 )
409 expect(
410 () =>
411 new FixedThreadPool(
412 numberOfWorkers,
413 './tests/worker-files/thread/testWorker.mjs',
414 {
415 enableTasksQueue: true,
416 tasksQueueOptions: { size: 0 }
417 }
418 )
419 ).toThrow(
420 new RangeError(
421 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
422 )
423 )
424 expect(
425 () =>
426 new FixedThreadPool(
427 numberOfWorkers,
428 './tests/worker-files/thread/testWorker.mjs',
429 {
430 enableTasksQueue: true,
431 tasksQueueOptions: { size: -1 }
432 }
433 )
434 ).toThrow(
435 new RangeError(
436 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
437 )
438 )
439 expect(
440 () =>
441 new FixedThreadPool(
442 numberOfWorkers,
443 './tests/worker-files/thread/testWorker.mjs',
444 {
445 enableTasksQueue: true,
446 tasksQueueOptions: { size: 0.2 }
447 }
448 )
449 ).toThrow(
450 new TypeError('Invalid worker node tasks queue size: must be an integer')
451 )
452 })
453
454 it('Verify that pool worker choice strategy options can be set', async () => {
455 const pool = new FixedThreadPool(
456 numberOfWorkers,
457 './tests/worker-files/thread/testWorker.mjs',
458 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
459 )
460 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
461 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
462 retries: pool.info.maxSize,
463 runTime: { median: false },
464 waitTime: { median: false },
465 elu: { median: false }
466 })
467 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
468 .workerChoiceStrategies) {
469 expect(workerChoiceStrategy.opts).toStrictEqual({
470 retries: pool.info.maxSize,
471 runTime: { median: false },
472 waitTime: { median: false },
473 elu: { median: false }
474 })
475 }
476 expect(
477 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
478 ).toStrictEqual({
479 runTime: {
480 aggregate: true,
481 average: true,
482 median: false
483 },
484 waitTime: {
485 aggregate: false,
486 average: false,
487 median: false
488 },
489 elu: {
490 aggregate: true,
491 average: true,
492 median: false
493 }
494 })
495 pool.setWorkerChoiceStrategyOptions({
496 runTime: { median: true },
497 elu: { median: true }
498 })
499 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
500 runTime: { median: true },
501 elu: { median: true }
502 })
503 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
504 retries: pool.info.maxSize,
505 runTime: { median: true },
506 waitTime: { median: false },
507 elu: { median: true }
508 })
509 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
510 .workerChoiceStrategies) {
511 expect(workerChoiceStrategy.opts).toStrictEqual({
512 retries: pool.info.maxSize,
513 runTime: { median: true },
514 waitTime: { median: false },
515 elu: { median: true }
516 })
517 }
518 expect(
519 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
520 ).toStrictEqual({
521 runTime: {
522 aggregate: true,
523 average: false,
524 median: true
525 },
526 waitTime: {
527 aggregate: false,
528 average: false,
529 median: false
530 },
531 elu: {
532 aggregate: true,
533 average: false,
534 median: true
535 }
536 })
537 pool.setWorkerChoiceStrategyOptions({
538 runTime: { median: false },
539 elu: { median: false }
540 })
541 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
542 runTime: { median: false },
543 elu: { median: false }
544 })
545 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
546 retries: pool.info.maxSize,
547 runTime: { median: false },
548 waitTime: { median: false },
549 elu: { median: false }
550 })
551 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
552 .workerChoiceStrategies) {
553 expect(workerChoiceStrategy.opts).toStrictEqual({
554 retries: pool.info.maxSize,
555 runTime: { median: false },
556 waitTime: { median: false },
557 elu: { median: false }
558 })
559 }
560 expect(
561 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
562 ).toStrictEqual({
563 runTime: {
564 aggregate: true,
565 average: true,
566 median: false
567 },
568 waitTime: {
569 aggregate: false,
570 average: false,
571 median: false
572 },
573 elu: {
574 aggregate: true,
575 average: true,
576 median: false
577 }
578 })
579 expect(() =>
580 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
581 ).toThrow(
582 new TypeError(
583 'Invalid worker choice strategy options: must be a plain object'
584 )
585 )
586 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
587 new Error(
588 'Invalid worker choice strategy options: must have a weight for each worker node'
589 )
590 )
591 expect(() =>
592 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
593 ).toThrow(
594 new Error(
595 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
596 )
597 )
598 await pool.destroy()
599 })
600
601 it('Verify that pool tasks queue can be enabled/disabled', async () => {
602 const pool = new FixedThreadPool(
603 numberOfWorkers,
604 './tests/worker-files/thread/testWorker.mjs'
605 )
606 expect(pool.opts.enableTasksQueue).toBe(false)
607 expect(pool.opts.tasksQueueOptions).toBeUndefined()
608 pool.enableTasksQueue(true)
609 expect(pool.opts.enableTasksQueue).toBe(true)
610 expect(pool.opts.tasksQueueOptions).toStrictEqual({
611 concurrency: 1,
612 size: Math.pow(numberOfWorkers, 2),
613 taskStealing: true,
614 tasksStealingOnBackPressure: true,
615 tasksFinishedTimeout: 2000
616 })
617 pool.enableTasksQueue(true, { concurrency: 2 })
618 expect(pool.opts.enableTasksQueue).toBe(true)
619 expect(pool.opts.tasksQueueOptions).toStrictEqual({
620 concurrency: 2,
621 size: Math.pow(numberOfWorkers, 2),
622 taskStealing: true,
623 tasksStealingOnBackPressure: true,
624 tasksFinishedTimeout: 2000
625 })
626 pool.enableTasksQueue(false)
627 expect(pool.opts.enableTasksQueue).toBe(false)
628 expect(pool.opts.tasksQueueOptions).toBeUndefined()
629 await pool.destroy()
630 })
631
632 it('Verify that pool tasks queue options can be set', async () => {
633 const pool = new FixedThreadPool(
634 numberOfWorkers,
635 './tests/worker-files/thread/testWorker.mjs',
636 { enableTasksQueue: true }
637 )
638 expect(pool.opts.tasksQueueOptions).toStrictEqual({
639 concurrency: 1,
640 size: Math.pow(numberOfWorkers, 2),
641 taskStealing: true,
642 tasksStealingOnBackPressure: true,
643 tasksFinishedTimeout: 2000
644 })
645 for (const workerNode of pool.workerNodes) {
646 expect(workerNode.tasksQueueBackPressureSize).toBe(
647 pool.opts.tasksQueueOptions.size
648 )
649 }
650 pool.setTasksQueueOptions({
651 concurrency: 2,
652 size: 2,
653 taskStealing: false,
654 tasksStealingOnBackPressure: false,
655 tasksFinishedTimeout: 3000
656 })
657 expect(pool.opts.tasksQueueOptions).toStrictEqual({
658 concurrency: 2,
659 size: 2,
660 taskStealing: false,
661 tasksStealingOnBackPressure: false,
662 tasksFinishedTimeout: 3000
663 })
664 for (const workerNode of pool.workerNodes) {
665 expect(workerNode.tasksQueueBackPressureSize).toBe(
666 pool.opts.tasksQueueOptions.size
667 )
668 }
669 pool.setTasksQueueOptions({
670 concurrency: 1,
671 taskStealing: true,
672 tasksStealingOnBackPressure: true
673 })
674 expect(pool.opts.tasksQueueOptions).toStrictEqual({
675 concurrency: 1,
676 size: Math.pow(numberOfWorkers, 2),
677 taskStealing: true,
678 tasksStealingOnBackPressure: true,
679 tasksFinishedTimeout: 2000
680 })
681 for (const workerNode of pool.workerNodes) {
682 expect(workerNode.tasksQueueBackPressureSize).toBe(
683 pool.opts.tasksQueueOptions.size
684 )
685 }
686 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
687 new TypeError('Invalid tasks queue options: must be a plain object')
688 )
689 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
690 new RangeError(
691 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
692 )
693 )
694 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
695 new RangeError(
696 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
697 )
698 )
699 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
700 new TypeError('Invalid worker node tasks concurrency: must be an integer')
701 )
702 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
703 new RangeError(
704 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
705 )
706 )
707 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
708 new RangeError(
709 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
710 )
711 )
712 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
713 new TypeError('Invalid worker node tasks queue size: must be an integer')
714 )
715 await pool.destroy()
716 })
717
718 it('Verify that pool info is set', async () => {
719 let pool = new FixedThreadPool(
720 numberOfWorkers,
721 './tests/worker-files/thread/testWorker.mjs'
722 )
723 expect(pool.info).toStrictEqual({
724 version,
725 type: PoolTypes.fixed,
726 worker: WorkerTypes.thread,
727 started: true,
728 ready: true,
729 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
730 minSize: numberOfWorkers,
731 maxSize: numberOfWorkers,
732 workerNodes: numberOfWorkers,
733 idleWorkerNodes: numberOfWorkers,
734 busyWorkerNodes: 0,
735 executedTasks: 0,
736 executingTasks: 0,
737 failedTasks: 0
738 })
739 await pool.destroy()
740 pool = new DynamicClusterPool(
741 Math.floor(numberOfWorkers / 2),
742 numberOfWorkers,
743 './tests/worker-files/cluster/testWorker.js'
744 )
745 expect(pool.info).toStrictEqual({
746 version,
747 type: PoolTypes.dynamic,
748 worker: WorkerTypes.cluster,
749 started: true,
750 ready: true,
751 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
752 minSize: Math.floor(numberOfWorkers / 2),
753 maxSize: numberOfWorkers,
754 workerNodes: Math.floor(numberOfWorkers / 2),
755 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
756 busyWorkerNodes: 0,
757 executedTasks: 0,
758 executingTasks: 0,
759 failedTasks: 0
760 })
761 await pool.destroy()
762 })
763
764 it('Verify that pool worker tasks usage are initialized', async () => {
765 const pool = new FixedClusterPool(
766 numberOfWorkers,
767 './tests/worker-files/cluster/testWorker.js'
768 )
769 for (const workerNode of pool.workerNodes) {
770 expect(workerNode).toBeInstanceOf(WorkerNode)
771 expect(workerNode.usage).toStrictEqual({
772 tasks: {
773 executed: 0,
774 executing: 0,
775 queued: 0,
776 maxQueued: 0,
777 sequentiallyStolen: 0,
778 stolen: 0,
779 failed: 0
780 },
781 runTime: {
782 history: new CircularArray()
783 },
784 waitTime: {
785 history: new CircularArray()
786 },
787 elu: {
788 idle: {
789 history: new CircularArray()
790 },
791 active: {
792 history: new CircularArray()
793 }
794 }
795 })
796 }
797 await pool.destroy()
798 })
799
800 it('Verify that pool worker tasks queue are initialized', async () => {
801 let pool = new FixedClusterPool(
802 numberOfWorkers,
803 './tests/worker-files/cluster/testWorker.js'
804 )
805 for (const workerNode of pool.workerNodes) {
806 expect(workerNode).toBeInstanceOf(WorkerNode)
807 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
808 expect(workerNode.tasksQueue.size).toBe(0)
809 expect(workerNode.tasksQueue.maxSize).toBe(0)
810 }
811 await pool.destroy()
812 pool = new DynamicThreadPool(
813 Math.floor(numberOfWorkers / 2),
814 numberOfWorkers,
815 './tests/worker-files/thread/testWorker.mjs'
816 )
817 for (const workerNode of pool.workerNodes) {
818 expect(workerNode).toBeInstanceOf(WorkerNode)
819 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
820 expect(workerNode.tasksQueue.size).toBe(0)
821 expect(workerNode.tasksQueue.maxSize).toBe(0)
822 }
823 await pool.destroy()
824 })
825
826 it('Verify that pool worker info are initialized', async () => {
827 let pool = new FixedClusterPool(
828 numberOfWorkers,
829 './tests/worker-files/cluster/testWorker.js'
830 )
831 for (const workerNode of pool.workerNodes) {
832 expect(workerNode).toBeInstanceOf(WorkerNode)
833 expect(workerNode.info).toStrictEqual({
834 id: expect.any(Number),
835 type: WorkerTypes.cluster,
836 dynamic: false,
837 ready: true
838 })
839 }
840 await pool.destroy()
841 pool = new DynamicThreadPool(
842 Math.floor(numberOfWorkers / 2),
843 numberOfWorkers,
844 './tests/worker-files/thread/testWorker.mjs'
845 )
846 for (const workerNode of pool.workerNodes) {
847 expect(workerNode).toBeInstanceOf(WorkerNode)
848 expect(workerNode.info).toStrictEqual({
849 id: expect.any(Number),
850 type: WorkerTypes.thread,
851 dynamic: false,
852 ready: true
853 })
854 }
855 await pool.destroy()
856 })
857
858 it('Verify that pool statuses are checked at start or destroy', async () => {
859 const pool = new FixedThreadPool(
860 numberOfWorkers,
861 './tests/worker-files/thread/testWorker.mjs'
862 )
863 expect(pool.info.started).toBe(true)
864 expect(pool.info.ready).toBe(true)
865 expect(() => pool.start()).toThrow(
866 new Error('Cannot start an already started pool')
867 )
868 await pool.destroy()
869 expect(pool.info.started).toBe(false)
870 expect(pool.info.ready).toBe(false)
871 await expect(pool.destroy()).rejects.toThrow(
872 new Error('Cannot destroy an already destroyed pool')
873 )
874 })
875
876 it('Verify that pool can be started after initialization', async () => {
877 const pool = new FixedClusterPool(
878 numberOfWorkers,
879 './tests/worker-files/cluster/testWorker.js',
880 {
881 startWorkers: false
882 }
883 )
884 expect(pool.info.started).toBe(false)
885 expect(pool.info.ready).toBe(false)
886 expect(pool.readyEventEmitted).toBe(false)
887 expect(pool.workerNodes).toStrictEqual([])
888 await expect(pool.execute()).rejects.toThrow(
889 new Error('Cannot execute a task on not started pool')
890 )
891 pool.start()
892 expect(pool.info.started).toBe(true)
893 expect(pool.info.ready).toBe(true)
894 await waitPoolEvents(pool, PoolEvents.ready, 1)
895 expect(pool.readyEventEmitted).toBe(true)
896 expect(pool.workerNodes.length).toBe(numberOfWorkers)
897 for (const workerNode of pool.workerNodes) {
898 expect(workerNode).toBeInstanceOf(WorkerNode)
899 }
900 await pool.destroy()
901 })
902
903 it('Verify that pool execute() arguments are checked', async () => {
904 const pool = new FixedClusterPool(
905 numberOfWorkers,
906 './tests/worker-files/cluster/testWorker.js'
907 )
908 await expect(pool.execute(undefined, 0)).rejects.toThrow(
909 new TypeError('name argument must be a string')
910 )
911 await expect(pool.execute(undefined, '')).rejects.toThrow(
912 new TypeError('name argument must not be an empty string')
913 )
914 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
915 new TypeError('transferList argument must be an array')
916 )
917 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
918 "Task function 'unknown' not found"
919 )
920 await pool.destroy()
921 await expect(pool.execute()).rejects.toThrow(
922 new Error('Cannot execute a task on not started pool')
923 )
924 })
925
926 it('Verify that pool worker tasks usage are computed', async () => {
927 const pool = new FixedClusterPool(
928 numberOfWorkers,
929 './tests/worker-files/cluster/testWorker.js'
930 )
931 const promises = new Set()
932 const maxMultiplier = 2
933 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
934 promises.add(pool.execute())
935 }
936 for (const workerNode of pool.workerNodes) {
937 expect(workerNode.usage).toStrictEqual({
938 tasks: {
939 executed: 0,
940 executing: maxMultiplier,
941 queued: 0,
942 maxQueued: 0,
943 sequentiallyStolen: 0,
944 stolen: 0,
945 failed: 0
946 },
947 runTime: {
948 history: expect.any(CircularArray)
949 },
950 waitTime: {
951 history: expect.any(CircularArray)
952 },
953 elu: {
954 idle: {
955 history: expect.any(CircularArray)
956 },
957 active: {
958 history: expect.any(CircularArray)
959 }
960 }
961 })
962 }
963 await Promise.all(promises)
964 for (const workerNode of pool.workerNodes) {
965 expect(workerNode.usage).toStrictEqual({
966 tasks: {
967 executed: maxMultiplier,
968 executing: 0,
969 queued: 0,
970 maxQueued: 0,
971 sequentiallyStolen: 0,
972 stolen: 0,
973 failed: 0
974 },
975 runTime: {
976 history: expect.any(CircularArray)
977 },
978 waitTime: {
979 history: expect.any(CircularArray)
980 },
981 elu: {
982 idle: {
983 history: expect.any(CircularArray)
984 },
985 active: {
986 history: expect.any(CircularArray)
987 }
988 }
989 })
990 }
991 await pool.destroy()
992 })
993
994 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
995 const pool = new DynamicThreadPool(
996 Math.floor(numberOfWorkers / 2),
997 numberOfWorkers,
998 './tests/worker-files/thread/testWorker.mjs'
999 )
1000 const promises = new Set()
1001 const maxMultiplier = 2
1002 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1003 promises.add(pool.execute())
1004 }
1005 await Promise.all(promises)
1006 for (const workerNode of pool.workerNodes) {
1007 expect(workerNode.usage).toStrictEqual({
1008 tasks: {
1009 executed: expect.any(Number),
1010 executing: 0,
1011 queued: 0,
1012 maxQueued: 0,
1013 sequentiallyStolen: 0,
1014 stolen: 0,
1015 failed: 0
1016 },
1017 runTime: {
1018 history: expect.any(CircularArray)
1019 },
1020 waitTime: {
1021 history: expect.any(CircularArray)
1022 },
1023 elu: {
1024 idle: {
1025 history: expect.any(CircularArray)
1026 },
1027 active: {
1028 history: expect.any(CircularArray)
1029 }
1030 }
1031 })
1032 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1033 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1034 numberOfWorkers * maxMultiplier
1035 )
1036 expect(workerNode.usage.runTime.history.length).toBe(0)
1037 expect(workerNode.usage.waitTime.history.length).toBe(0)
1038 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1039 expect(workerNode.usage.elu.active.history.length).toBe(0)
1040 }
1041 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1042 for (const workerNode of pool.workerNodes) {
1043 expect(workerNode.usage).toStrictEqual({
1044 tasks: {
1045 executed: 0,
1046 executing: 0,
1047 queued: 0,
1048 maxQueued: 0,
1049 sequentiallyStolen: 0,
1050 stolen: 0,
1051 failed: 0
1052 },
1053 runTime: {
1054 history: expect.any(CircularArray)
1055 },
1056 waitTime: {
1057 history: expect.any(CircularArray)
1058 },
1059 elu: {
1060 idle: {
1061 history: expect.any(CircularArray)
1062 },
1063 active: {
1064 history: expect.any(CircularArray)
1065 }
1066 }
1067 })
1068 expect(workerNode.usage.runTime.history.length).toBe(0)
1069 expect(workerNode.usage.waitTime.history.length).toBe(0)
1070 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1071 expect(workerNode.usage.elu.active.history.length).toBe(0)
1072 }
1073 await pool.destroy()
1074 })
1075
1076 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1077 const pool = new DynamicClusterPool(
1078 Math.floor(numberOfWorkers / 2),
1079 numberOfWorkers,
1080 './tests/worker-files/cluster/testWorker.js'
1081 )
1082 expect(pool.emitter.eventNames()).toStrictEqual([])
1083 let poolInfo
1084 let poolReady = 0
1085 pool.emitter.on(PoolEvents.ready, info => {
1086 ++poolReady
1087 poolInfo = info
1088 })
1089 await waitPoolEvents(pool, PoolEvents.ready, 1)
1090 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1091 expect(poolReady).toBe(1)
1092 expect(poolInfo).toStrictEqual({
1093 version,
1094 type: PoolTypes.dynamic,
1095 worker: WorkerTypes.cluster,
1096 started: true,
1097 ready: true,
1098 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1099 minSize: expect.any(Number),
1100 maxSize: expect.any(Number),
1101 workerNodes: expect.any(Number),
1102 idleWorkerNodes: expect.any(Number),
1103 busyWorkerNodes: expect.any(Number),
1104 executedTasks: expect.any(Number),
1105 executingTasks: expect.any(Number),
1106 failedTasks: expect.any(Number)
1107 })
1108 await pool.destroy()
1109 })
1110
1111 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1112 const pool = new FixedThreadPool(
1113 numberOfWorkers,
1114 './tests/worker-files/thread/testWorker.mjs'
1115 )
1116 expect(pool.emitter.eventNames()).toStrictEqual([])
1117 const promises = new Set()
1118 let poolBusy = 0
1119 let poolInfo
1120 pool.emitter.on(PoolEvents.busy, info => {
1121 ++poolBusy
1122 poolInfo = info
1123 })
1124 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1125 for (let i = 0; i < numberOfWorkers * 2; i++) {
1126 promises.add(pool.execute())
1127 }
1128 await Promise.all(promises)
1129 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1130 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1131 expect(poolBusy).toBe(numberOfWorkers + 1)
1132 expect(poolInfo).toStrictEqual({
1133 version,
1134 type: PoolTypes.fixed,
1135 worker: WorkerTypes.thread,
1136 started: true,
1137 ready: true,
1138 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1139 minSize: expect.any(Number),
1140 maxSize: expect.any(Number),
1141 workerNodes: expect.any(Number),
1142 idleWorkerNodes: expect.any(Number),
1143 busyWorkerNodes: expect.any(Number),
1144 executedTasks: expect.any(Number),
1145 executingTasks: expect.any(Number),
1146 failedTasks: expect.any(Number)
1147 })
1148 await pool.destroy()
1149 })
1150
1151 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1152 const pool = new DynamicThreadPool(
1153 Math.floor(numberOfWorkers / 2),
1154 numberOfWorkers,
1155 './tests/worker-files/thread/testWorker.mjs'
1156 )
1157 expect(pool.emitter.eventNames()).toStrictEqual([])
1158 const promises = new Set()
1159 let poolFull = 0
1160 let poolInfo
1161 pool.emitter.on(PoolEvents.full, info => {
1162 ++poolFull
1163 poolInfo = info
1164 })
1165 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1166 for (let i = 0; i < numberOfWorkers * 2; i++) {
1167 promises.add(pool.execute())
1168 }
1169 await Promise.all(promises)
1170 expect(poolFull).toBe(1)
1171 expect(poolInfo).toStrictEqual({
1172 version,
1173 type: PoolTypes.dynamic,
1174 worker: WorkerTypes.thread,
1175 started: true,
1176 ready: true,
1177 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1178 minSize: expect.any(Number),
1179 maxSize: expect.any(Number),
1180 workerNodes: expect.any(Number),
1181 idleWorkerNodes: expect.any(Number),
1182 busyWorkerNodes: expect.any(Number),
1183 executedTasks: expect.any(Number),
1184 executingTasks: expect.any(Number),
1185 failedTasks: expect.any(Number)
1186 })
1187 await pool.destroy()
1188 })
1189
1190 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1191 const pool = new FixedThreadPool(
1192 numberOfWorkers,
1193 './tests/worker-files/thread/testWorker.mjs',
1194 {
1195 enableTasksQueue: true
1196 }
1197 )
1198 stub(pool, 'hasBackPressure').returns(true)
1199 expect(pool.emitter.eventNames()).toStrictEqual([])
1200 const promises = new Set()
1201 let poolBackPressure = 0
1202 let poolInfo
1203 pool.emitter.on(PoolEvents.backPressure, info => {
1204 ++poolBackPressure
1205 poolInfo = info
1206 })
1207 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1208 for (let i = 0; i < numberOfWorkers + 1; i++) {
1209 promises.add(pool.execute())
1210 }
1211 await Promise.all(promises)
1212 expect(poolBackPressure).toBe(1)
1213 expect(poolInfo).toStrictEqual({
1214 version,
1215 type: PoolTypes.fixed,
1216 worker: WorkerTypes.thread,
1217 started: true,
1218 ready: true,
1219 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1220 minSize: expect.any(Number),
1221 maxSize: expect.any(Number),
1222 workerNodes: expect.any(Number),
1223 idleWorkerNodes: expect.any(Number),
1224 busyWorkerNodes: expect.any(Number),
1225 executedTasks: expect.any(Number),
1226 executingTasks: expect.any(Number),
1227 maxQueuedTasks: expect.any(Number),
1228 queuedTasks: expect.any(Number),
1229 backPressure: true,
1230 stolenTasks: expect.any(Number),
1231 failedTasks: expect.any(Number)
1232 })
1233 expect(pool.hasBackPressure.callCount).toBe(5)
1234 await pool.destroy()
1235 })
1236
1237 it('Verify that destroy() waits for queued tasks to finish', async () => {
1238 const tasksFinishedTimeout = 2500
1239 const pool = new FixedThreadPool(
1240 numberOfWorkers,
1241 './tests/worker-files/thread/asyncWorker.mjs',
1242 {
1243 enableTasksQueue: true,
1244 tasksQueueOptions: { tasksFinishedTimeout }
1245 }
1246 )
1247 const maxMultiplier = 4
1248 let tasksFinished = 0
1249 for (const workerNode of pool.workerNodes) {
1250 workerNode.on('taskFinished', () => {
1251 ++tasksFinished
1252 })
1253 }
1254 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1255 pool.execute()
1256 }
1257 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1258 const startTime = performance.now()
1259 await pool.destroy()
1260 const elapsedTime = performance.now() - startTime
1261 expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
1262 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1263 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
1264 })
1265
1266 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1267 const tasksFinishedTimeout = 1000
1268 const pool = new FixedThreadPool(
1269 numberOfWorkers,
1270 './tests/worker-files/thread/asyncWorker.mjs',
1271 {
1272 enableTasksQueue: true,
1273 tasksQueueOptions: { tasksFinishedTimeout }
1274 }
1275 )
1276 const maxMultiplier = 4
1277 let tasksFinished = 0
1278 for (const workerNode of pool.workerNodes) {
1279 workerNode.on('taskFinished', () => {
1280 ++tasksFinished
1281 })
1282 }
1283 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1284 pool.execute()
1285 }
1286 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1287 const startTime = performance.now()
1288 await pool.destroy()
1289 const elapsedTime = performance.now() - startTime
1290 expect(tasksFinished).toBe(0)
1291 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
1292 })
1293
1294 it('Verify that pool asynchronous resource track tasks execution', async () => {
1295 let taskAsyncId
1296 let initCalls = 0
1297 let beforeCalls = 0
1298 let afterCalls = 0
1299 let resolveCalls = 0
1300 const hook = createHook({
1301 init (asyncId, type) {
1302 if (type === 'poolifier:task') {
1303 initCalls++
1304 taskAsyncId = asyncId
1305 }
1306 },
1307 before (asyncId) {
1308 if (asyncId === taskAsyncId) beforeCalls++
1309 },
1310 after (asyncId) {
1311 if (asyncId === taskAsyncId) afterCalls++
1312 },
1313 promiseResolve () {
1314 if (executionAsyncId() === taskAsyncId) resolveCalls++
1315 }
1316 })
1317 const pool = new FixedThreadPool(
1318 numberOfWorkers,
1319 './tests/worker-files/thread/testWorker.mjs'
1320 )
1321 hook.enable()
1322 await pool.execute()
1323 hook.disable()
1324 expect(initCalls).toBe(1)
1325 expect(beforeCalls).toBe(1)
1326 expect(afterCalls).toBe(1)
1327 expect(resolveCalls).toBe(1)
1328 await pool.destroy()
1329 })
1330
1331 it('Verify that hasTaskFunction() is working', async () => {
1332 const dynamicThreadPool = new DynamicThreadPool(
1333 Math.floor(numberOfWorkers / 2),
1334 numberOfWorkers,
1335 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1336 )
1337 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1338 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1339 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1340 true
1341 )
1342 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1343 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1344 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1345 await dynamicThreadPool.destroy()
1346 const fixedClusterPool = new FixedClusterPool(
1347 numberOfWorkers,
1348 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1349 )
1350 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1351 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1352 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1353 true
1354 )
1355 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1356 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1357 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1358 await fixedClusterPool.destroy()
1359 })
1360
1361 it('Verify that addTaskFunction() is working', async () => {
1362 const dynamicThreadPool = new DynamicThreadPool(
1363 Math.floor(numberOfWorkers / 2),
1364 numberOfWorkers,
1365 './tests/worker-files/thread/testWorker.mjs'
1366 )
1367 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1368 await expect(
1369 dynamicThreadPool.addTaskFunction(0, () => {})
1370 ).rejects.toThrow(new TypeError('name argument must be a string'))
1371 await expect(
1372 dynamicThreadPool.addTaskFunction('', () => {})
1373 ).rejects.toThrow(
1374 new TypeError('name argument must not be an empty string')
1375 )
1376 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1377 new TypeError('fn argument must be a function')
1378 )
1379 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1380 new TypeError('fn argument must be a function')
1381 )
1382 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1383 DEFAULT_TASK_NAME,
1384 'test'
1385 ])
1386 const echoTaskFunction = data => {
1387 return data
1388 }
1389 await expect(
1390 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1391 ).resolves.toBe(true)
1392 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1393 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1394 echoTaskFunction
1395 )
1396 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1397 DEFAULT_TASK_NAME,
1398 'test',
1399 'echo'
1400 ])
1401 const taskFunctionData = { test: 'test' }
1402 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1403 expect(echoResult).toStrictEqual(taskFunctionData)
1404 for (const workerNode of dynamicThreadPool.workerNodes) {
1405 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1406 tasks: {
1407 executed: expect.any(Number),
1408 executing: 0,
1409 queued: 0,
1410 sequentiallyStolen: 0,
1411 stolen: 0,
1412 failed: 0
1413 },
1414 runTime: {
1415 history: new CircularArray()
1416 },
1417 waitTime: {
1418 history: new CircularArray()
1419 },
1420 elu: {
1421 idle: {
1422 history: new CircularArray()
1423 },
1424 active: {
1425 history: new CircularArray()
1426 }
1427 }
1428 })
1429 }
1430 await dynamicThreadPool.destroy()
1431 })
1432
1433 it('Verify that removeTaskFunction() is working', async () => {
1434 const dynamicThreadPool = new DynamicThreadPool(
1435 Math.floor(numberOfWorkers / 2),
1436 numberOfWorkers,
1437 './tests/worker-files/thread/testWorker.mjs'
1438 )
1439 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1440 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1441 DEFAULT_TASK_NAME,
1442 'test'
1443 ])
1444 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1445 new Error('Cannot remove a task function not handled on the pool side')
1446 )
1447 const echoTaskFunction = data => {
1448 return data
1449 }
1450 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1451 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1452 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1453 echoTaskFunction
1454 )
1455 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1456 DEFAULT_TASK_NAME,
1457 'test',
1458 'echo'
1459 ])
1460 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1461 true
1462 )
1463 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1464 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1465 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1466 DEFAULT_TASK_NAME,
1467 'test'
1468 ])
1469 await dynamicThreadPool.destroy()
1470 })
1471
1472 it('Verify that listTaskFunctionNames() is working', async () => {
1473 const dynamicThreadPool = new DynamicThreadPool(
1474 Math.floor(numberOfWorkers / 2),
1475 numberOfWorkers,
1476 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1477 )
1478 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1479 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1480 DEFAULT_TASK_NAME,
1481 'jsonIntegerSerialization',
1482 'factorial',
1483 'fibonacci'
1484 ])
1485 await dynamicThreadPool.destroy()
1486 const fixedClusterPool = new FixedClusterPool(
1487 numberOfWorkers,
1488 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1489 )
1490 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1491 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1492 DEFAULT_TASK_NAME,
1493 'jsonIntegerSerialization',
1494 'factorial',
1495 'fibonacci'
1496 ])
1497 await fixedClusterPool.destroy()
1498 })
1499
1500 it('Verify that setDefaultTaskFunction() is working', async () => {
1501 const dynamicThreadPool = new DynamicThreadPool(
1502 Math.floor(numberOfWorkers / 2),
1503 numberOfWorkers,
1504 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1505 )
1506 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1507 const workerId = dynamicThreadPool.workerNodes[0].info.id
1508 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1509 new Error(
1510 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1511 )
1512 )
1513 await expect(
1514 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1515 ).rejects.toThrow(
1516 new Error(
1517 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1518 )
1519 )
1520 await expect(
1521 dynamicThreadPool.setDefaultTaskFunction('unknown')
1522 ).rejects.toThrow(
1523 new Error(
1524 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1525 )
1526 )
1527 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1528 DEFAULT_TASK_NAME,
1529 'jsonIntegerSerialization',
1530 'factorial',
1531 'fibonacci'
1532 ])
1533 await expect(
1534 dynamicThreadPool.setDefaultTaskFunction('factorial')
1535 ).resolves.toBe(true)
1536 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1537 DEFAULT_TASK_NAME,
1538 'factorial',
1539 'jsonIntegerSerialization',
1540 'fibonacci'
1541 ])
1542 await expect(
1543 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1544 ).resolves.toBe(true)
1545 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1546 DEFAULT_TASK_NAME,
1547 'fibonacci',
1548 'jsonIntegerSerialization',
1549 'factorial'
1550 ])
1551 await dynamicThreadPool.destroy()
1552 })
1553
1554 it('Verify that multiple task functions worker is working', async () => {
1555 const pool = new DynamicClusterPool(
1556 Math.floor(numberOfWorkers / 2),
1557 numberOfWorkers,
1558 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1559 )
1560 const data = { n: 10 }
1561 const result0 = await pool.execute(data)
1562 expect(result0).toStrictEqual({ ok: 1 })
1563 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1564 expect(result1).toStrictEqual({ ok: 1 })
1565 const result2 = await pool.execute(data, 'factorial')
1566 expect(result2).toBe(3628800)
1567 const result3 = await pool.execute(data, 'fibonacci')
1568 expect(result3).toBe(55)
1569 expect(pool.info.executingTasks).toBe(0)
1570 expect(pool.info.executedTasks).toBe(4)
1571 for (const workerNode of pool.workerNodes) {
1572 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1573 DEFAULT_TASK_NAME,
1574 'jsonIntegerSerialization',
1575 'factorial',
1576 'fibonacci'
1577 ])
1578 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1579 for (const name of pool.listTaskFunctionNames()) {
1580 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1581 tasks: {
1582 executed: expect.any(Number),
1583 executing: 0,
1584 failed: 0,
1585 queued: 0,
1586 sequentiallyStolen: 0,
1587 stolen: 0
1588 },
1589 runTime: {
1590 history: expect.any(CircularArray)
1591 },
1592 waitTime: {
1593 history: expect.any(CircularArray)
1594 },
1595 elu: {
1596 idle: {
1597 history: expect.any(CircularArray)
1598 },
1599 active: {
1600 history: expect.any(CircularArray)
1601 }
1602 }
1603 })
1604 expect(
1605 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1606 ).toBeGreaterThan(0)
1607 }
1608 expect(
1609 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1610 ).toStrictEqual(
1611 workerNode.getTaskFunctionWorkerUsage(
1612 workerNode.info.taskFunctionNames[1]
1613 )
1614 )
1615 }
1616 await pool.destroy()
1617 })
1618
1619 it('Verify sendKillMessageToWorker()', async () => {
1620 const pool = new DynamicClusterPool(
1621 Math.floor(numberOfWorkers / 2),
1622 numberOfWorkers,
1623 './tests/worker-files/cluster/testWorker.js'
1624 )
1625 const workerNodeKey = 0
1626 await expect(
1627 pool.sendKillMessageToWorker(workerNodeKey)
1628 ).resolves.toBeUndefined()
1629 await expect(
1630 pool.sendKillMessageToWorker(numberOfWorkers)
1631 ).rejects.toStrictEqual(
1632 new Error(`Invalid worker node key '${numberOfWorkers}'`)
1633 )
1634 await pool.destroy()
1635 })
1636
1637 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1638 const pool = new DynamicClusterPool(
1639 Math.floor(numberOfWorkers / 2),
1640 numberOfWorkers,
1641 './tests/worker-files/cluster/testWorker.js'
1642 )
1643 const workerNodeKey = 0
1644 await expect(
1645 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1646 taskFunctionOperation: 'add',
1647 taskFunctionName: 'empty',
1648 taskFunction: (() => {}).toString()
1649 })
1650 ).resolves.toBe(true)
1651 expect(
1652 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1653 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1654 await pool.destroy()
1655 })
1656
1657 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1658 const pool = new DynamicClusterPool(
1659 Math.floor(numberOfWorkers / 2),
1660 numberOfWorkers,
1661 './tests/worker-files/cluster/testWorker.js'
1662 )
1663 await expect(
1664 pool.sendTaskFunctionOperationToWorkers({
1665 taskFunctionOperation: 'add',
1666 taskFunctionName: 'empty',
1667 taskFunction: (() => {}).toString()
1668 })
1669 ).resolves.toBe(true)
1670 for (const workerNode of pool.workerNodes) {
1671 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1672 DEFAULT_TASK_NAME,
1673 'test',
1674 'empty'
1675 ])
1676 }
1677 await pool.destroy()
1678 })
1679 })