refactor: refine prettier configuration
[poolifier.git] / tests / pools / abstract-pool.test.mjs
... / ...
CommitLineData
1import { EventEmitterAsyncResource } from 'node:events'
2import { dirname, join } from 'node:path'
3import { readFileSync } from 'node:fs'
4import { fileURLToPath } from 'node:url'
5import { createHook, executionAsyncId } from 'node:async_hooks'
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
9 DynamicClusterPool,
10 DynamicThreadPool,
11 FixedClusterPool,
12 FixedThreadPool,
13 PoolEvents,
14 PoolTypes,
15 WorkerChoiceStrategies,
16 WorkerTypes
17} from '../../lib/index.cjs'
18import { CircularArray } from '../../lib/circular-array.cjs'
19import { Deque } from '../../lib/deque.cjs'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
21import { waitPoolEvents } from '../test-utils.cjs'
22import { WorkerNode } from '../../lib/pools/worker-node.cjs'
23
24describe('Abstract pool test suite', () => {
// Package version, read from the package.json two directories above this test file.
const version = (() => {
  const testDirectory = dirname(fileURLToPath(import.meta.url))
  const manifest = readFileSync(
    join(testDirectory, '../..', 'package.json'),
    'utf8'
  )
  return JSON.parse(manifest).version
})()
// Pool size shared by the test cases of this suite.
const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose isMain() always answers false.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
37
// Call sinon's restore() after every test case.
afterEach(() => {
  restore()
})
41
it('Verify that pool can be created and destroyed', async () => {
  // Smoke test: instantiate a fixed thread pool, check its type, tear it down.
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const fixedPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(fixedPool).toBeInstanceOf(FixedThreadPool)
  await fixedPool.destroy()
})
50
it('Verify that pool cannot be created from a non main thread/process', () => {
  // The stub pool answers false to isMain(); its construction is expected to throw.
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  const createStubPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  expect(createStubPool).toThrow(expectedError)
})
67
it('Verify that pool statuses properties are set', async () => {
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Status property name -> value expected right after construction.
  const expectedStatuses = [
    ['started', true],
    ['starting', false],
    ['destroying', false]
  ]
  for (const [status, expectedValue] of expectedStatuses) {
    expect(fixedPool[status]).toBe(expectedValue)
  }
  await fixedPool.destroy()
})
78
it('Verify that filePath is checked', () => {
  // One pool factory per invalid worker file path argument.
  const withoutFilePath = () => new FixedThreadPool(numberOfWorkers)
  const withNumericFilePath = () => new FixedThreadPool(numberOfWorkers, 0)
  const withUnknownFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')

  expect(withoutFilePath).toThrow(
    new TypeError('The worker file path must be specified')
  )
  expect(withNumericFilePath).toThrow(
    new TypeError('The worker file path must be a string')
  )
  expect(withUnknownFile).toThrow(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
90
it('Verify that numberOfWorkers is checked', () => {
  // An undefined number of workers must be rejected by the constructor.
  const createWithoutSize = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createWithoutSize).toThrow(expectedError)
})
104
it('Verify that a negative number of workers is checked', () => {
  // A pool size of -1 must be rejected with a RangeError.
  const createWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createWithNegativeSize).toThrow(expectedError)
})
115
it('Verify that a non integer number of workers is checked', () => {
  // A fractional pool size must be rejected with a TypeError.
  const createWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createWithFractionalSize).toThrow(expectedError)
})
126
it('Verify that pool arguments number and pool type are checked', () => {
  // Passing a fourth (maximum size) argument to a fixed pool must throw.
  const createFixedPoolWithMaximum = () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      undefined,
      numberOfWorkers * 2
    )
  const expectedError = new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
  expect(createFixedPoolWithMaximum).toThrow(expectedError)
})
142
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.cjs'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // [pool class, min size, max size, worker file, expected error], checked in order.
  const invalidSizings = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [Pool, min, max, workerFile, expectedError] of invalidSizings) {
    expect(() => new Pool(min, max, workerFile)).toThrow(expectedError)
  }
})
217
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'

  // First pass: pool built without options.
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
  })
  let strategies = pool.workerChoiceStrategyContext.workerChoiceStrategies
  for (const [, strategy] of strategies) {
    expect(strategy.opts).toStrictEqual({
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false },
      weights: expect.objectContaining({
        0: expect.any(Number),
        [pool.info.maxSize - 1]: expect.any(Number)
      })
    })
  }
  await pool.destroy()

  // Second pass: pool built with explicit options and handlers.
  const testHandler = () => console.info('test handler executed')
  const userOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  }
  pool = new FixedThreadPool(numberOfWorkers, workerFile, userOptions)
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  strategies = pool.workerChoiceStrategyContext.workerChoiceStrategies
  for (const [, strategy] of strategies) {
    expect(strategy.opts).toStrictEqual({
      runTime: { median: true },
      waitTime: { median: false },
      elu: { median: false },
      weights: { 0: 300, 1: 200 }
    })
  }
  await pool.destroy()
})
299
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // [invalid pool options, error expected from the constructor], checked in order.
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrow(expectedError)
  }
})
439
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Checks the options of every strategy; runTime and elu share the same median flag here.
  const expectStrategiesOptions = median => {
    const { workerChoiceStrategies } = pool.workerChoiceStrategyContext
    for (const [, strategy] of workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual({
        runTime: { median },
        waitTime: { median: false },
        elu: { median },
        weights: expect.objectContaining({
          0: expect.any(Number),
          [pool.info.maxSize - 1]: expect.any(Number)
        })
      })
    }
  }
  // Checks the task statistics requirements: average and median are mutually exclusive.
  const expectStatisticsRequirements = median => {
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !median, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !median, median }
    })
  }

  // Initial state: no strategy options stored on the pool.
  expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
  expectStrategiesOptions(false)
  expectStatisticsRequirements(false)

  // Switch run time and ELU to median.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategiesOptions(true)
  expectStatisticsRequirements(true)

  // Switch back to average.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategiesOptions(false)
  expectStatisticsRequirements(false)

  // [invalid options, expected error], checked in order.
  const invalidOptions = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [strategyOptions, expectedError] of invalidOptions) {
    expect(() => pool.setWorkerChoiceStrategyOptions(strategyOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
577
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Tasks queue options expected once the queue is enabled with the given concurrency.
  const queueOptionsWith = concurrency => ({
    concurrency,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })

  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()

  pool.enableTasksQueue(true)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptionsWith(1))

  pool.enableTasksQueue(true, { concurrency: 2 })
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptionsWith(2))

  pool.enableTasksQueue(false)
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  await pool.destroy()
})
608
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Every worker node back pressure size must equal the pool tasks queue size.
  const expectBackPressureSizeInSync = () => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  expectBackPressureSizeInSync()

  // Override every option.
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  expectBackPressureSizeInSync()

  // Partial options: size and tasksFinishedTimeout are expected back at their initial values.
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  expectBackPressureSizeInSync()

  // [invalid tasks queue options, expected error], checked in order.
  const invalidOptions = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [queueOptions, expectedError] of invalidOptions) {
    expect(() => pool.setTasksQueueOptions(queueOptions)).toThrow(expectedError)
  }
  await pool.destroy()
})
694
it('Verify that pool info is set', async () => {
  // Fields expected to be identical for both freshly created pools.
  const idlePoolInfo = {
    version,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: 0,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.info).toStrictEqual({
    ...idlePoolInfo,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers
  })
  await pool.destroy()

  const minimumSize = Math.floor(numberOfWorkers / 2)
  pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expect(pool.info).toStrictEqual({
    ...idlePoolInfo,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize
  })
  await pool.destroy()
})
742
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Builds a fresh expected usage: zeroed task counters and empty histories.
  const pristineUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  })
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
    expect(node.usage).toStrictEqual(pristineUsage())
  }
  await pool.destroy()
})
778
it('Verify that pool worker tasks queue are initialized', async () => {
  // Each worker node must own an empty Deque that never held a task.
  const expectEmptyTasksQueues = workerNodes => {
    for (const node of workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.tasksQueue).toBeInstanceOf(Deque)
      expect(node.tasksQueue.size).toBe(0)
      expect(node.tasksQueue.maxSize).toBe(0)
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expectEmptyTasksQueues(pool.workerNodes)
  await pool.destroy()

  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(pool.workerNodes)
  await pool.destroy()
})
804
it('Verify that pool worker info are initialized', async () => {
  // Checks the info of every worker node against the expected worker type.
  const expectWorkerNodesInfo = (workerNodes, workerType) => {
    for (const node of workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true,
        stealing: false
      })
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expectWorkerNodesInfo(pool.workerNodes, WorkerTypes.cluster)
  await pool.destroy()

  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkerNodesInfo(pool.workerNodes, WorkerTypes.thread)
  await pool.destroy()
})
838
it('Verify that pool statuses are checked at start or destroy', async () => {
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Starting a pool that is already started must throw.
  expect(fixedPool.info.started).toBe(true)
  expect(fixedPool.info.ready).toBe(true)
  const startAgain = () => fixedPool.start()
  expect(startAgain).toThrow(new Error('Cannot start an already started pool'))

  // Destroying a pool that is already destroyed must reject.
  await fixedPool.destroy()
  expect(fixedPool.info.started).toBe(false)
  expect(fixedPool.info.ready).toBe(false)
  await expect(fixedPool.destroy()).rejects.toThrow(
    new Error('Cannot destroy an already destroyed pool')
  )
})
856
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs',
    { startWorkers: false }
  )
  // With startWorkers disabled the pool has no worker node and refuses tasks.
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  expect(pool.readyEventEmitted).toBe(false)
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )

  // An explicit start() brings the pool up.
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
883
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // [execute() arguments, TypeError expected from the rejection], checked in order.
  const invalidCalls = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [args, expectedError] of invalidCalls) {
    await expect(pool.execute(...args)).rejects.toThrow(expectedError)
  }
  // An unknown task function name rejects with a plain string.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  // Once destroyed, the pool no longer accepts tasks.
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
906
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  const maxMultiplier = 2
  // Expected usage of a worker node for the given executed/executing counters.
  const usageWith = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })

  const pendingTasks = new Set()
  let submitted = 0
  while (submitted < numberOfWorkers * maxMultiplier) {
    pendingTasks.add(pool.execute())
    submitted++
  }
  // Before the tasks settle, every worker node reports maxMultiplier executing tasks.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  // After the tasks settle, the same amount is reported as executed.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(maxMultiplier, 0))
  }
  await pool.destroy()
})
974
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  // Expected usage of a worker node with the given `executed` counter (value or matcher).
  const usageWith = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // All measurement histories of the worker node are expected to be empty.
  const expectEmptyHistories = usage => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }

  const pendingTasks = new Set()
  let submitted = 0
  while (submitted < numberOfWorkers * maxMultiplier) {
    pendingTasks.add(pool.execute())
    submitted++
  }
  await Promise.all(pendingTasks)
  // After the run: every worker node has executed at least one task.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(expect.any(Number)))
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
    expectEmptyHistories(node.usage)
  }
  // Changing the strategy is expected to zero the executed counters.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(0))
    expectEmptyHistories(node.usage)
  }
  await pool.destroy()
})
1056
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Record how many times the event fires and the last payload received.
  let readyCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.ready, info => {
    readyCount += 1
    lastInfo = info
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: anyNumber,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1092
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Record how many times the event fires and the last payload received.
  let busyCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.busy, info => {
    busyCount += 1
    lastInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  const pendingTasks = new Set()
  let submitted = 0
  while (submitted < numberOfWorkers * 2) {
    pendingTasks.add(pool.execute())
    submitted++
  }
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyCount).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: anyNumber,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1133
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Record how many times the event fires and the last payload received.
  let fullCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.full, info => {
    fullCount += 1
    lastInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  const pendingTasks = new Set()
  let submitted = 0
  while (submitted < numberOfWorkers * 2) {
    pendingTasks.add(pool.execute())
    submitted++
  }
  await Promise.all(pendingTasks)
  expect(fullCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: anyNumber,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1173
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  // Checks that a listener can be attached to the pool emitter for the
  // `backPressure` event of a fixed thread pool with the tasks queue enabled.
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Force the pool to always report back pressure.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let backPressureEventCount = 0
  let lastPoolInfo
  // Count each emission and keep the most recent payload.
  pool.emitter.on(PoolEvents.backPressure, eventInfo => {
    backPressureEventCount += 1
    lastPoolInfo = eventInfo
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  // Submit one more task than there are workers and wait for all of them.
  const submittedTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  expect(backPressureEventCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastPoolInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: anyNumber,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    stealingWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
  await pool.destroy()
})
1222
it('Verify that destroy() waits for queued tasks to finish', async () => {
  // Queues more tasks than there are workers, then measures how long
  // destroy() takes to resolve with a 2500 ms tasks finished timeout.
  const timeoutMs = 2500
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout: timeoutMs }
    }
  )
  const maxMultiplier = 4
  const totalTasks = numberOfWorkers * maxMultiplier
  let finishedTasksCount = 0
  // Count the 'taskFinished' events emitted by every worker node.
  for (const node of pool.workerNodes) {
    node.on('taskFinished', () => {
      finishedTasksCount += 1
    })
  }
  // Tasks are submitted without awaiting them so that some remain queued.
  for (let submitted = 0; submitted < totalTasks; submitted += 1) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStartedAt = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStartedAt
  expect(finishedTasksCount).toBeLessThanOrEqual(totalTasks)
  expect(destroyDuration).toBeGreaterThanOrEqual(2000)
  expect(destroyDuration).toBeLessThanOrEqual(timeoutMs + 800)
})
1251
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  // Queues more tasks than there are workers with a 1000 ms tasks finished
  // timeout, then checks that destroy() resolves within the expected bound
  // and that no 'taskFinished' event was counted.
  const timeoutMs = 1000
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout: timeoutMs }
    }
  )
  const maxMultiplier = 4
  const totalTasks = numberOfWorkers * maxMultiplier
  let finishedTasksCount = 0
  // Count the 'taskFinished' events emitted by every worker node.
  for (const node of pool.workerNodes) {
    node.on('taskFinished', () => {
      finishedTasksCount += 1
    })
  }
  // Tasks are submitted without awaiting them so that some remain queued.
  for (let submitted = 0; submitted < totalTasks; submitted += 1) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStartedAt = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStartedAt
  expect(finishedTasksCount).toBe(0)
  expect(destroyDuration).toBeLessThanOrEqual(timeoutMs + 800)
})
1279
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Installs an async hook that watches for the 'poolifier:task' resource
  // type, executes one task and checks that each hook callback was observed
  // exactly once for that resource.
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      // Remember the async id of the task resource to filter the other callbacks.
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Only count promise resolutions happening inside the task execution context.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  hook.enable()
  try {
    await pool.execute()
  } finally {
    // Always disable the hook, even if the task execution rejects, so that
    // it does not stay enabled while the following tests run.
    hook.disable()
  }
  expect(initCalls).toBe(1)
  expect(beforeCalls).toBe(1)
  expect(afterCalls).toBe(1)
  expect(resolveCalls).toBe(1)
  await pool.destroy()
})
1316
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names paired with the expected hasTaskFunction() result,
  // checked against both a dynamic thread pool and a fixed cluster pool.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  for (const [taskFunctionName, expected] of expectations) {
    expect(dynamicThreadPool.hasTaskFunction(taskFunctionName)).toBe(expected)
  }
  await dynamicThreadPool.destroy()
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  for (const [taskFunctionName, expected] of expectations) {
    expect(fixedClusterPool.hasTaskFunction(taskFunctionName)).toBe(expected)
  }
  await fixedClusterPool.destroy()
})
1346
it('Verify that addTaskFunction() is working', async () => {
  // Covers argument validation, registration of a new 'echo' task function,
  // its execution and the per worker node usage statistics it produces.
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Invalid (name, fn) argument pairs with the error each one must reject with.
  const invalidCalls = [
    [0, () => {}, new TypeError('name argument must be a string')],
    ['', () => {}, new TypeError('name argument must not be an empty string')],
    ['test', 0, new TypeError('fn argument must be a function')],
    ['test', '', new TypeError('fn argument must be a function')]
  ]
  for (const [name, fn, expectedError] of invalidCalls) {
    await expect(pool.addTaskFunction(name, fn)).rejects.toThrow(expectedError)
  }
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => data
  await expect(pool.addTaskFunction('echo', echoTaskFunction)).resolves.toBe(
    true
  )
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  const taskFunctionData = { test: 'test' }
  const echoResult = await pool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)
  // Builds a fresh expected usage object for each worker node.
  const buildExpectedUsage = () => ({
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  })
  for (const node of pool.workerNodes) {
    expect(node.getTaskFunctionWorkerUsage('echo')).toStrictEqual(
      buildExpectedUsage()
    )
  }
  await pool.destroy()
})
1418
it('Verify that removeTaskFunction() is working', async () => {
  // Checks that a task function not registered through the pool cannot be
  // removed, and that one added through the pool can be removed again.
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Names listed before 'echo' is added and after it is removed.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await expect(pool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => data
  await pool.addTaskFunction('echo', echoTaskFunction)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  await expect(pool.removeTaskFunction('echo')).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(0)
  expect(pool.taskFunctions.get('echo')).toBeUndefined()
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await pool.destroy()
})
1457
it('Verify that listTaskFunctionNames() is working', async () => {
  // Names expected from the multiple task functions worker files, for both
  // a dynamic thread pool and a fixed cluster pool.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    expectedNames
  )
  await dynamicThreadPool.destroy()
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual(expectedNames)
  await fixedClusterPool.destroy()
})
1485
it('Verify that setDefaultTaskFunction() is working', async () => {
  // Covers the rejected inputs first, then checks how the listed task
  // function names are reordered after each successful change of default.
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  const workerId = pool.workerNodes[0].info.id
  // Rejected arguments paired with the expected error message.
  const rejectedCalls = [
    [
      0,
      `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
    ],
    [
      DEFAULT_TASK_NAME,
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
    ],
    [
      'unknown',
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
    ]
  ]
  for (const [name, message] of rejectedCalls) {
    await expect(pool.setDefaultTaskFunction(name)).rejects.toThrow(
      new Error(message)
    )
  }
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // Accepted names paired with the names listed once they become the default.
  const acceptedCalls = [
    [
      'factorial',
      [DEFAULT_TASK_NAME, 'factorial', 'jsonIntegerSerialization', 'fibonacci']
    ],
    [
      'fibonacci',
      [DEFAULT_TASK_NAME, 'fibonacci', 'jsonIntegerSerialization', 'factorial']
    ]
  ]
  for (const [name, namesAfterwards] of acceptedCalls) {
    await expect(pool.setDefaultTaskFunction(name)).resolves.toBe(true)
    expect(pool.listTaskFunctionNames()).toStrictEqual(namesAfterwards)
  }
  await pool.destroy()
})
1539
it('Verify that multiple task functions worker is working', async () => {
  // Runs each task function of the multiple task functions cluster worker
  // once, one after the other, then inspects the usage recorded on every
  // worker node.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
  )
  const input = { n: 10 }
  expect(await pool.execute(input)).toStrictEqual({ ok: 1 })
  expect(await pool.execute(input, 'jsonIntegerSerialization')).toStrictEqual({
    ok: 1
  })
  expect(await pool.execute(input, 'factorial')).toBe(3628800)
  expect(await pool.execute(input, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  const anyHistory = expect.any(CircularArray)
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0
    },
    runTime: { history: anyHistory },
    waitTime: { history: anyHistory },
    elu: {
      idle: { history: anyHistory },
      active: { history: anyHistory }
    }
  }
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(node.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      expect(node.getTaskFunctionWorkerUsage(taskFunctionName)).toStrictEqual(
        expectedUsage
      )
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The usage of the default task function must equal the usage of the
    // task function listed right after it.
    const firstNamedTaskFunction = node.info.taskFunctionNames[1]
    expect(node.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)).toStrictEqual(
      node.getTaskFunctionWorkerUsage(firstNamedTaskFunction)
    )
  }
  await pool.destroy()
})
1604
it('Verify sendKillMessageToWorker()', async () => {
  // Sends the kill message to the first worker node of a dynamic cluster
  // pool and expects the returned promise to resolve with undefined.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  const firstWorkerNodeKey = 0
  const killRequest = pool.sendKillMessageToWorker(firstWorkerNodeKey)
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
1617
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  // Sends an 'add' task function operation to the first worker node and
  // checks that the new name is appended to its task function names.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  const firstWorkerNodeKey = 0
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(
      firstWorkerNodeKey,
      addEmptyTaskFunction
    )
  ).resolves.toBe(true)
  const { taskFunctionNames } = pool.workerNodes[firstWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1637
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  // Sends an 'add' task function operation to all the worker nodes and
  // checks that the new name is appended on each of them.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addEmptyTaskFunction)
  ).resolves.toBe(true)
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
1660})