1 // eslint-disable-next-line n/no-unsupported-features/node-builtins
2 import { createHook, executionAsyncId } from 'node:async_hooks'
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { readFileSync } from 'node:fs'
5 import { dirname, join } from 'node:path'
6 import { fileURLToPath } from 'node:url'
8 import { expect } from 'expect'
9 import { restore, stub } from 'sinon'
11 import { CircularArray } from '../../lib/circular-array.cjs'
12 import { Deque } from '../../lib/deque.cjs'
20 WorkerChoiceStrategies,
22 } from '../../lib/index.cjs'
23 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
24 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
25 import { waitPoolEvents } from '../test-utils.cjs'
27 describe('Abstract pool test suite', () => {
28 const version = JSON.parse(
30 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
34 const numberOfWorkers = 2
35 class StubPoolWithIsMain extends FixedThreadPool {
45 it('Verify that pool can be created and destroyed', async () => {
46 const pool = new FixedThreadPool(
48 './tests/worker-files/thread/testWorker.mjs'
50 expect(pool).toBeInstanceOf(FixedThreadPool)
54 it('Verify that pool cannot be created from a non main thread/process', () => {
57 new StubPoolWithIsMain(
59 './tests/worker-files/thread/testWorker.mjs',
61 errorHandler: e => console.error(e)
66 'Cannot start a pool from a worker with the same type as the pool'
71 it('Verify that pool statuses properties are set', async () => {
72 const pool = new FixedThreadPool(
74 './tests/worker-files/thread/testWorker.mjs'
76 expect(pool.started).toBe(true)
77 expect(pool.starting).toBe(false)
78 expect(pool.destroying).toBe(false)
82 it('Verify that filePath is checked', () => {
83 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
84 new TypeError('The worker file path must be specified')
86 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
87 new TypeError('The worker file path must be a string')
90 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
91 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
94 it('Verify that numberOfWorkers is checked', () => {
99 './tests/worker-files/thread/testWorker.mjs'
103 'Cannot instantiate a pool without specifying the number of workers'
108 it('Verify that a negative number of workers is checked', () => {
111 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
114 'Cannot instantiate a pool with a negative number of workers'
119 it('Verify that a non integer number of workers is checked', () => {
122 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
125 'Cannot instantiate a pool with a non safe integer number of workers'
130 it('Verify that pool arguments number and pool type are checked', () => {
135 './tests/worker-files/thread/testWorker.mjs',
141 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
146 it('Verify that dynamic pool sizing is checked', () => {
149 new DynamicClusterPool(
152 './tests/worker-files/cluster/testWorker.cjs'
156 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
161 new DynamicThreadPool(
164 './tests/worker-files/thread/testWorker.mjs'
168 'Cannot instantiate a pool with a non safe integer number of workers'
173 new DynamicClusterPool(
176 './tests/worker-files/cluster/testWorker.cjs'
180 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
185 new DynamicThreadPool(
188 './tests/worker-files/thread/testWorker.mjs'
192 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
197 new DynamicThreadPool(
200 './tests/worker-files/thread/testWorker.mjs'
204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
209 new DynamicClusterPool(
212 './tests/worker-files/cluster/testWorker.cjs'
216 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
221 it('Verify that pool options are checked', async () => {
222 let pool = new FixedThreadPool(
224 './tests/worker-files/thread/testWorker.mjs'
226 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
227 expect(pool.emitter.eventNames()).toStrictEqual([])
228 expect(pool.opts).toStrictEqual({
231 restartWorkerOnError: true,
232 enableTasksQueue: false,
233 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
235 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
236 .workerChoiceStrategies) {
237 expect(workerChoiceStrategy.opts).toStrictEqual({
238 runTime: { median: false },
239 waitTime: { median: false },
240 elu: { median: false },
241 weights: expect.objectContaining({
242 0: expect.any(Number),
243 [pool.info.maxSize - 1]: expect.any(Number)
248 const testHandler = () => console.info('test handler executed')
249 pool = new FixedThreadPool(
251 './tests/worker-files/thread/testWorker.mjs',
253 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
254 workerChoiceStrategyOptions: {
255 runTime: { median: true },
256 weights: { 0: 300, 1: 200 }
259 restartWorkerOnError: false,
260 enableTasksQueue: true,
261 tasksQueueOptions: { concurrency: 2 },
262 messageHandler: testHandler,
263 errorHandler: testHandler,
264 onlineHandler: testHandler,
265 exitHandler: testHandler
268 expect(pool.emitter).toBeUndefined()
269 expect(pool.opts).toStrictEqual({
272 restartWorkerOnError: false,
273 enableTasksQueue: true,
276 size: Math.pow(numberOfWorkers, 2),
278 tasksStealingOnBackPressure: true,
279 tasksFinishedTimeout: 2000
281 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
282 workerChoiceStrategyOptions: {
283 runTime: { median: true },
284 weights: { 0: 300, 1: 200 }
286 onlineHandler: testHandler,
287 messageHandler: testHandler,
288 errorHandler: testHandler,
289 exitHandler: testHandler
291 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
292 .workerChoiceStrategies) {
293 expect(workerChoiceStrategy.opts).toStrictEqual({
294 runTime: { median: true },
295 waitTime: { median: false },
296 elu: { median: false },
297 weights: { 0: 300, 1: 200 }
303 it('Verify that pool options are validated', () => {
308 './tests/worker-files/thread/testWorker.mjs',
310 workerChoiceStrategy: 'invalidStrategy'
313 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
318 './tests/worker-files/thread/testWorker.mjs',
320 workerChoiceStrategyOptions: { weights: {} }
325 'Invalid worker choice strategy options: must have a weight for each worker node'
332 './tests/worker-files/thread/testWorker.mjs',
334 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
339 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
346 './tests/worker-files/thread/testWorker.mjs',
348 enableTasksQueue: true,
349 tasksQueueOptions: 'invalidTasksQueueOptions'
353 new TypeError('Invalid tasks queue options: must be a plain object')
359 './tests/worker-files/thread/testWorker.mjs',
361 enableTasksQueue: true,
362 tasksQueueOptions: { concurrency: 0 }
367 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
374 './tests/worker-files/thread/testWorker.mjs',
376 enableTasksQueue: true,
377 tasksQueueOptions: { concurrency: -1 }
382 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
389 './tests/worker-files/thread/testWorker.mjs',
391 enableTasksQueue: true,
392 tasksQueueOptions: { concurrency: 0.2 }
396 new TypeError('Invalid worker node tasks concurrency: must be an integer')
402 './tests/worker-files/thread/testWorker.mjs',
404 enableTasksQueue: true,
405 tasksQueueOptions: { size: 0 }
410 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
417 './tests/worker-files/thread/testWorker.mjs',
419 enableTasksQueue: true,
420 tasksQueueOptions: { size: -1 }
425 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
432 './tests/worker-files/thread/testWorker.mjs',
434 enableTasksQueue: true,
435 tasksQueueOptions: { size: 0.2 }
439 new TypeError('Invalid worker node tasks queue size: must be an integer')
443 it('Verify that pool worker choice strategy options can be set', async () => {
444 const pool = new FixedThreadPool(
446 './tests/worker-files/thread/testWorker.mjs',
447 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
449 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
450 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
451 .workerChoiceStrategies) {
452 expect(workerChoiceStrategy.opts).toStrictEqual({
453 runTime: { median: false },
454 waitTime: { median: false },
455 elu: { median: false },
456 weights: expect.objectContaining({
457 0: expect.any(Number),
458 [pool.info.maxSize - 1]: expect.any(Number)
463 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
481 pool.setWorkerChoiceStrategyOptions({
482 runTime: { median: true },
483 elu: { median: true }
485 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
486 runTime: { median: true },
487 elu: { median: true }
489 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
490 .workerChoiceStrategies) {
491 expect(workerChoiceStrategy.opts).toStrictEqual({
492 runTime: { median: true },
493 waitTime: { median: false },
494 elu: { median: true },
495 weights: expect.objectContaining({
496 0: expect.any(Number),
497 [pool.info.maxSize - 1]: expect.any(Number)
502 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
520 pool.setWorkerChoiceStrategyOptions({
521 runTime: { median: false },
522 elu: { median: false }
524 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
525 runTime: { median: false },
526 elu: { median: false }
528 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
529 .workerChoiceStrategies) {
530 expect(workerChoiceStrategy.opts).toStrictEqual({
531 runTime: { median: false },
532 waitTime: { median: false },
533 elu: { median: false },
534 weights: expect.objectContaining({
535 0: expect.any(Number),
536 [pool.info.maxSize - 1]: expect.any(Number)
541 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
560 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
563 'Invalid worker choice strategy options: must be a plain object'
566 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
568 'Invalid worker choice strategy options: must have a weight for each worker node'
572 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
575 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
581 it('Verify that pool tasks queue can be enabled/disabled', async () => {
582 const pool = new FixedThreadPool(
584 './tests/worker-files/thread/testWorker.mjs'
586 expect(pool.opts.enableTasksQueue).toBe(false)
587 expect(pool.opts.tasksQueueOptions).toBeUndefined()
588 pool.enableTasksQueue(true)
589 expect(pool.opts.enableTasksQueue).toBe(true)
590 expect(pool.opts.tasksQueueOptions).toStrictEqual({
592 size: Math.pow(numberOfWorkers, 2),
594 tasksStealingOnBackPressure: true,
595 tasksFinishedTimeout: 2000
597 pool.enableTasksQueue(true, { concurrency: 2 })
598 expect(pool.opts.enableTasksQueue).toBe(true)
599 expect(pool.opts.tasksQueueOptions).toStrictEqual({
601 size: Math.pow(numberOfWorkers, 2),
603 tasksStealingOnBackPressure: true,
604 tasksFinishedTimeout: 2000
606 pool.enableTasksQueue(false)
607 expect(pool.opts.enableTasksQueue).toBe(false)
608 expect(pool.opts.tasksQueueOptions).toBeUndefined()
612 it('Verify that pool tasks queue options can be set', async () => {
613 const pool = new FixedThreadPool(
615 './tests/worker-files/thread/testWorker.mjs',
616 { enableTasksQueue: true }
618 expect(pool.opts.tasksQueueOptions).toStrictEqual({
620 size: Math.pow(numberOfWorkers, 2),
622 tasksStealingOnBackPressure: true,
623 tasksFinishedTimeout: 2000
625 for (const workerNode of pool.workerNodes) {
626 expect(workerNode.tasksQueueBackPressureSize).toBe(
627 pool.opts.tasksQueueOptions.size
630 pool.setTasksQueueOptions({
634 tasksStealingOnBackPressure: false,
635 tasksFinishedTimeout: 3000
637 expect(pool.opts.tasksQueueOptions).toStrictEqual({
641 tasksStealingOnBackPressure: false,
642 tasksFinishedTimeout: 3000
644 for (const workerNode of pool.workerNodes) {
645 expect(workerNode.tasksQueueBackPressureSize).toBe(
646 pool.opts.tasksQueueOptions.size
649 pool.setTasksQueueOptions({
652 tasksStealingOnBackPressure: true
654 expect(pool.opts.tasksQueueOptions).toStrictEqual({
656 size: Math.pow(numberOfWorkers, 2),
658 tasksStealingOnBackPressure: true,
659 tasksFinishedTimeout: 2000
661 for (const workerNode of pool.workerNodes) {
662 expect(workerNode.tasksQueueBackPressureSize).toBe(
663 pool.opts.tasksQueueOptions.size
666 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
667 new TypeError('Invalid tasks queue options: must be a plain object')
669 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
671 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
674 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
676 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
679 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
680 new TypeError('Invalid worker node tasks concurrency: must be an integer')
682 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
684 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
687 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
689 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
692 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
693 new TypeError('Invalid worker node tasks queue size: must be an integer')
698 it('Verify that pool info is set', async () => {
699 let pool = new FixedThreadPool(
701 './tests/worker-files/thread/testWorker.mjs'
703 expect(pool.info).toStrictEqual({
705 type: PoolTypes.fixed,
706 worker: WorkerTypes.thread,
709 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
711 minSize: numberOfWorkers,
712 maxSize: numberOfWorkers,
713 workerNodes: numberOfWorkers,
714 idleWorkerNodes: numberOfWorkers,
721 pool = new DynamicClusterPool(
722 Math.floor(numberOfWorkers / 2),
724 './tests/worker-files/cluster/testWorker.cjs'
726 expect(pool.info).toStrictEqual({
728 type: PoolTypes.dynamic,
729 worker: WorkerTypes.cluster,
732 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
734 minSize: Math.floor(numberOfWorkers / 2),
735 maxSize: numberOfWorkers,
736 workerNodes: Math.floor(numberOfWorkers / 2),
737 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
746 it('Verify that pool worker tasks usage are initialized', async () => {
747 const pool = new FixedClusterPool(
749 './tests/worker-files/cluster/testWorker.cjs'
751 for (const workerNode of pool.workerNodes) {
752 expect(workerNode).toBeInstanceOf(WorkerNode)
753 expect(workerNode.usage).toStrictEqual({
759 sequentiallyStolen: 0,
764 history: new CircularArray()
767 history: new CircularArray()
771 history: new CircularArray()
774 history: new CircularArray()
782 it('Verify that pool worker tasks queue are initialized', async () => {
783 let pool = new FixedClusterPool(
785 './tests/worker-files/cluster/testWorker.cjs'
787 for (const workerNode of pool.workerNodes) {
788 expect(workerNode).toBeInstanceOf(WorkerNode)
789 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
790 expect(workerNode.tasksQueue.size).toBe(0)
791 expect(workerNode.tasksQueue.maxSize).toBe(0)
794 pool = new DynamicThreadPool(
795 Math.floor(numberOfWorkers / 2),
797 './tests/worker-files/thread/testWorker.mjs'
799 for (const workerNode of pool.workerNodes) {
800 expect(workerNode).toBeInstanceOf(WorkerNode)
801 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
802 expect(workerNode.tasksQueue.size).toBe(0)
803 expect(workerNode.tasksQueue.maxSize).toBe(0)
808 it('Verify that pool worker info are initialized', async () => {
809 let pool = new FixedClusterPool(
811 './tests/worker-files/cluster/testWorker.cjs'
813 for (const workerNode of pool.workerNodes) {
814 expect(workerNode).toBeInstanceOf(WorkerNode)
815 expect(workerNode.info).toStrictEqual({
816 id: expect.any(Number),
817 type: WorkerTypes.cluster,
824 pool = new DynamicThreadPool(
825 Math.floor(numberOfWorkers / 2),
827 './tests/worker-files/thread/testWorker.mjs'
829 for (const workerNode of pool.workerNodes) {
830 expect(workerNode).toBeInstanceOf(WorkerNode)
831 expect(workerNode.info).toStrictEqual({
832 id: expect.any(Number),
833 type: WorkerTypes.thread,
842 it('Verify that pool statuses are checked at start or destroy', async () => {
843 const pool = new FixedThreadPool(
845 './tests/worker-files/thread/testWorker.mjs'
847 expect(pool.info.started).toBe(true)
848 expect(pool.info.ready).toBe(true)
849 expect(() => pool.start()).toThrow(
850 new Error('Cannot start an already started pool')
853 expect(pool.info.started).toBe(false)
854 expect(pool.info.ready).toBe(false)
855 await expect(pool.destroy()).rejects.toThrow(
856 new Error('Cannot destroy an already destroyed pool')
860 it('Verify that pool can be started after initialization', async () => {
861 const pool = new FixedClusterPool(
863 './tests/worker-files/cluster/testWorker.cjs',
868 expect(pool.info.started).toBe(false)
869 expect(pool.info.ready).toBe(false)
870 expect(pool.workerNodes).toStrictEqual([])
871 expect(pool.readyEventEmitted).toBe(false)
872 await expect(pool.execute()).rejects.toThrow(
873 new Error('Cannot execute a task on not started pool')
876 expect(pool.info.started).toBe(true)
877 expect(pool.info.ready).toBe(true)
878 await waitPoolEvents(pool, PoolEvents.ready, 1)
879 expect(pool.readyEventEmitted).toBe(true)
880 expect(pool.workerNodes.length).toBe(numberOfWorkers)
881 for (const workerNode of pool.workerNodes) {
882 expect(workerNode).toBeInstanceOf(WorkerNode)
887 it('Verify that pool execute() arguments are checked', async () => {
888 const pool = new FixedClusterPool(
890 './tests/worker-files/cluster/testWorker.cjs'
892 await expect(pool.execute(undefined, 0)).rejects.toThrow(
893 new TypeError('name argument must be a string')
895 await expect(pool.execute(undefined, '')).rejects.toThrow(
896 new TypeError('name argument must not be an empty string')
898 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
899 new TypeError('transferList argument must be an array')
901 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
902 "Task function 'unknown' not found"
905 await expect(pool.execute()).rejects.toThrow(
906 new Error('Cannot execute a task on not started pool')
910 it('Verify that pool worker tasks usage are computed', async () => {
911 const pool = new FixedClusterPool(
913 './tests/worker-files/cluster/testWorker.cjs'
915 const promises = new Set()
916 const maxMultiplier = 2
917 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
918 promises.add(pool.execute())
920 for (const workerNode of pool.workerNodes) {
921 expect(workerNode.usage).toStrictEqual({
924 executing: maxMultiplier,
927 sequentiallyStolen: 0,
932 history: expect.any(CircularArray)
935 history: expect.any(CircularArray)
939 history: expect.any(CircularArray)
942 history: expect.any(CircularArray)
947 await Promise.all(promises)
948 for (const workerNode of pool.workerNodes) {
949 expect(workerNode.usage).toStrictEqual({
951 executed: maxMultiplier,
955 sequentiallyStolen: 0,
960 history: expect.any(CircularArray)
963 history: expect.any(CircularArray)
967 history: expect.any(CircularArray)
970 history: expect.any(CircularArray)
978 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
979 const pool = new DynamicThreadPool(
980 Math.floor(numberOfWorkers / 2),
982 './tests/worker-files/thread/testWorker.mjs'
984 const promises = new Set()
985 const maxMultiplier = 2
986 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
987 promises.add(pool.execute())
989 await Promise.all(promises)
990 for (const workerNode of pool.workerNodes) {
991 expect(workerNode.usage).toStrictEqual({
993 executed: expect.any(Number),
997 sequentiallyStolen: 0,
1002 history: expect.any(CircularArray)
1005 history: expect.any(CircularArray)
1009 history: expect.any(CircularArray)
1012 history: expect.any(CircularArray)
1016 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1017 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1018 numberOfWorkers * maxMultiplier
1020 expect(workerNode.usage.runTime.history.length).toBe(0)
1021 expect(workerNode.usage.waitTime.history.length).toBe(0)
1022 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1023 expect(workerNode.usage.elu.active.history.length).toBe(0)
1025 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1026 for (const workerNode of pool.workerNodes) {
1027 expect(workerNode.usage).toStrictEqual({
1033 sequentiallyStolen: 0,
1038 history: expect.any(CircularArray)
1041 history: expect.any(CircularArray)
1045 history: expect.any(CircularArray)
1048 history: expect.any(CircularArray)
1052 expect(workerNode.usage.runTime.history.length).toBe(0)
1053 expect(workerNode.usage.waitTime.history.length).toBe(0)
1054 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1055 expect(workerNode.usage.elu.active.history.length).toBe(0)
1057 await pool.destroy()
1060 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1061 const pool = new DynamicClusterPool(
1062 Math.floor(numberOfWorkers / 2),
1064 './tests/worker-files/cluster/testWorker.cjs'
1066 expect(pool.emitter.eventNames()).toStrictEqual([])
1069 pool.emitter.on(PoolEvents.ready, info => {
1073 await waitPoolEvents(pool, PoolEvents.ready, 1)
1074 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1075 expect(poolReady).toBe(1)
1076 expect(poolInfo).toStrictEqual({
1078 type: PoolTypes.dynamic,
1079 worker: WorkerTypes.cluster,
1082 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1083 strategyRetries: expect.any(Number),
1084 minSize: expect.any(Number),
1085 maxSize: expect.any(Number),
1086 workerNodes: expect.any(Number),
1087 idleWorkerNodes: expect.any(Number),
1088 busyWorkerNodes: expect.any(Number),
1089 executedTasks: expect.any(Number),
1090 executingTasks: expect.any(Number),
1091 failedTasks: expect.any(Number)
1093 await pool.destroy()
1096 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1097 const pool = new FixedThreadPool(
1099 './tests/worker-files/thread/testWorker.mjs'
1101 expect(pool.emitter.eventNames()).toStrictEqual([])
1102 const promises = new Set()
1105 pool.emitter.on(PoolEvents.busy, info => {
1109 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1110 for (let i = 0; i < numberOfWorkers * 2; i++) {
1111 promises.add(pool.execute())
1113 await Promise.all(promises)
1114 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1115 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1116 expect(poolBusy).toBe(numberOfWorkers + 1)
1117 expect(poolInfo).toStrictEqual({
1119 type: PoolTypes.fixed,
1120 worker: WorkerTypes.thread,
1123 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1124 strategyRetries: expect.any(Number),
1125 minSize: expect.any(Number),
1126 maxSize: expect.any(Number),
1127 workerNodes: expect.any(Number),
1128 idleWorkerNodes: expect.any(Number),
1129 busyWorkerNodes: expect.any(Number),
1130 executedTasks: expect.any(Number),
1131 executingTasks: expect.any(Number),
1132 failedTasks: expect.any(Number)
1134 await pool.destroy()
1137 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1138 const pool = new DynamicThreadPool(
1139 Math.floor(numberOfWorkers / 2),
1141 './tests/worker-files/thread/testWorker.mjs'
1143 expect(pool.emitter.eventNames()).toStrictEqual([])
1144 const promises = new Set()
1147 pool.emitter.on(PoolEvents.full, info => {
1151 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1152 for (let i = 0; i < numberOfWorkers * 2; i++) {
1153 promises.add(pool.execute())
1155 await Promise.all(promises)
1156 expect(poolFull).toBe(1)
1157 expect(poolInfo).toStrictEqual({
1159 type: PoolTypes.dynamic,
1160 worker: WorkerTypes.thread,
1163 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1164 strategyRetries: expect.any(Number),
1165 minSize: expect.any(Number),
1166 maxSize: expect.any(Number),
1167 workerNodes: expect.any(Number),
1168 idleWorkerNodes: expect.any(Number),
1169 busyWorkerNodes: expect.any(Number),
1170 executedTasks: expect.any(Number),
1171 executingTasks: expect.any(Number),
1172 failedTasks: expect.any(Number)
1174 await pool.destroy()
1177 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1178 const pool = new FixedThreadPool(
1180 './tests/worker-files/thread/testWorker.mjs',
1182 enableTasksQueue: true
1185 stub(pool, 'hasBackPressure').returns(true)
1186 expect(pool.emitter.eventNames()).toStrictEqual([])
1187 const promises = new Set()
1188 let poolBackPressure = 0
1190 pool.emitter.on(PoolEvents.backPressure, info => {
1194 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1195 for (let i = 0; i < numberOfWorkers + 1; i++) {
1196 promises.add(pool.execute())
1198 await Promise.all(promises)
1199 expect(poolBackPressure).toBe(1)
1200 expect(poolInfo).toStrictEqual({
1202 type: PoolTypes.fixed,
1203 worker: WorkerTypes.thread,
1206 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1207 strategyRetries: expect.any(Number),
1208 minSize: expect.any(Number),
1209 maxSize: expect.any(Number),
1210 workerNodes: expect.any(Number),
1211 idleWorkerNodes: expect.any(Number),
1212 stealingWorkerNodes: expect.any(Number),
1213 busyWorkerNodes: expect.any(Number),
1214 executedTasks: expect.any(Number),
1215 executingTasks: expect.any(Number),
1216 maxQueuedTasks: expect.any(Number),
1217 queuedTasks: expect.any(Number),
1219 stolenTasks: expect.any(Number),
1220 failedTasks: expect.any(Number)
1222 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1223 await pool.destroy()
1226 it('Verify that destroy() waits for queued tasks to finish', async () => {
1227 const tasksFinishedTimeout = 2500
1228 const pool = new FixedThreadPool(
1230 './tests/worker-files/thread/asyncWorker.mjs',
1232 enableTasksQueue: true,
1233 tasksQueueOptions: { tasksFinishedTimeout }
1236 const maxMultiplier = 4
1237 let tasksFinished = 0
1238 for (const workerNode of pool.workerNodes) {
1239 workerNode.on('taskFinished', () => {
1243 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1246 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1247 const startTime = performance.now()
1248 await pool.destroy()
1249 const elapsedTime = performance.now() - startTime
1250 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1251 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1252 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1255 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1256 const tasksFinishedTimeout = 1000
1257 const pool = new FixedThreadPool(
1259 './tests/worker-files/thread/asyncWorker.mjs',
1261 enableTasksQueue: true,
1262 tasksQueueOptions: { tasksFinishedTimeout }
1265 const maxMultiplier = 4
1266 let tasksFinished = 0
1267 for (const workerNode of pool.workerNodes) {
1268 workerNode.on('taskFinished', () => {
1272 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1275 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1276 const startTime = performance.now()
1277 await pool.destroy()
1278 const elapsedTime = performance.now() - startTime
1279 expect(tasksFinished).toBe(0)
1280 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1283 it('Verify that pool asynchronous resource track tasks execution', async () => {
1288 let resolveCalls = 0
1289 const hook = createHook({
1290 init (asyncId, type) {
1291 if (type === 'poolifier:task') {
1293 taskAsyncId = asyncId
1297 if (asyncId === taskAsyncId) beforeCalls++
1300 if (asyncId === taskAsyncId) afterCalls++
1303 if (executionAsyncId() === taskAsyncId) resolveCalls++
1306 const pool = new FixedThreadPool(
1308 './tests/worker-files/thread/testWorker.mjs'
1311 await pool.execute()
1313 expect(initCalls).toBe(1)
1314 expect(beforeCalls).toBe(1)
1315 expect(afterCalls).toBe(1)
1316 expect(resolveCalls).toBe(1)
1317 await pool.destroy()
1320 it('Verify that hasTaskFunction() is working', async () => {
1321 const dynamicThreadPool = new DynamicThreadPool(
1322 Math.floor(numberOfWorkers / 2),
1324 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1326 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1327 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1328 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1331 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1332 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1333 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1334 await dynamicThreadPool.destroy()
1335 const fixedClusterPool = new FixedClusterPool(
1337 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1339 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1340 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1341 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1344 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1345 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1346 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1347 await fixedClusterPool.destroy()
1350 it('Verify that addTaskFunction() is working', async () => {
1351 const dynamicThreadPool = new DynamicThreadPool(
1352 Math.floor(numberOfWorkers / 2),
1354 './tests/worker-files/thread/testWorker.mjs'
1356 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1358 dynamicThreadPool.addTaskFunction(0, () => {})
1359 ).rejects.toThrow(new TypeError('name argument must be a string'))
1361 dynamicThreadPool.addTaskFunction('', () => {})
1363 new TypeError('name argument must not be an empty string')
1365 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1366 new TypeError('fn argument must be a function')
1368 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1369 new TypeError('fn argument must be a function')
1371 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1375 const echoTaskFunction = data => {
1379 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1380 ).resolves.toBe(true)
1381 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1382 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1385 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1390 const taskFunctionData = { test: 'test' }
1391 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1392 expect(echoResult).toStrictEqual(taskFunctionData)
1393 for (const workerNode of dynamicThreadPool.workerNodes) {
1394 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1396 executed: expect.any(Number),
1399 sequentiallyStolen: 0,
1404 history: new CircularArray()
1407 history: new CircularArray()
1411 history: new CircularArray()
1414 history: new CircularArray()
1419 await dynamicThreadPool.destroy()
1422 it('Verify that removeTaskFunction() is working', async () => {
1423 const dynamicThreadPool = new DynamicThreadPool(
1424 Math.floor(numberOfWorkers / 2),
1426 './tests/worker-files/thread/testWorker.mjs'
1428 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1429 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1433 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1434 new Error('Cannot remove a task function not handled on the pool side')
1436 const echoTaskFunction = data => {
1439 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1440 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1441 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1444 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1449 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1452 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1453 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1454 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1458 await dynamicThreadPool.destroy()
1461 it('Verify that listTaskFunctionNames() is working', async () => {
1462 const dynamicThreadPool = new DynamicThreadPool(
1463 Math.floor(numberOfWorkers / 2),
1465 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1467 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1468 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1470 'jsonIntegerSerialization',
1474 await dynamicThreadPool.destroy()
1475 const fixedClusterPool = new FixedClusterPool(
1477 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1479 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1480 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1482 'jsonIntegerSerialization',
1486 await fixedClusterPool.destroy()
1489 it('Verify that setDefaultTaskFunction() is working', async () => {
1490 const dynamicThreadPool = new DynamicThreadPool(
1491 Math.floor(numberOfWorkers / 2),
1493 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1495 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1496 const workerId = dynamicThreadPool.workerNodes[0].info.id
1497 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1499 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1503 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1506 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1510 dynamicThreadPool.setDefaultTaskFunction('unknown')
1513 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1516 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1518 'jsonIntegerSerialization',
1523 dynamicThreadPool.setDefaultTaskFunction('factorial')
1524 ).resolves.toBe(true)
1525 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1528 'jsonIntegerSerialization',
1532 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1533 ).resolves.toBe(true)
1534 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1537 'jsonIntegerSerialization',
1540 await dynamicThreadPool.destroy()
1543 it('Verify that multiple task functions worker is working', async () => {
1544 const pool = new DynamicClusterPool(
1545 Math.floor(numberOfWorkers / 2),
1547 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1549 const data = { n: 10 }
1550 const result0 = await pool.execute(data)
1551 expect(result0).toStrictEqual({ ok: 1 })
1552 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1553 expect(result1).toStrictEqual({ ok: 1 })
1554 const result2 = await pool.execute(data, 'factorial')
1555 expect(result2).toBe(3628800)
1556 const result3 = await pool.execute(data, 'fibonacci')
1557 expect(result3).toBe(55)
1558 expect(pool.info.executingTasks).toBe(0)
1559 expect(pool.info.executedTasks).toBe(4)
1560 for (const workerNode of pool.workerNodes) {
1561 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1563 'jsonIntegerSerialization',
1567 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1568 for (const name of pool.listTaskFunctionNames()) {
1569 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1571 executed: expect.any(Number),
1575 sequentiallyStolen: 0,
1579 history: expect.any(CircularArray)
1582 history: expect.any(CircularArray)
1586 history: expect.any(CircularArray)
1589 history: expect.any(CircularArray)
1594 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1595 ).toBeGreaterThan(0)
1598 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1600 workerNode.getTaskFunctionWorkerUsage(
1601 workerNode.info.taskFunctionNames[1]
1605 await pool.destroy()
1608 it('Verify sendKillMessageToWorker()', async () => {
1609 const pool = new DynamicClusterPool(
1610 Math.floor(numberOfWorkers / 2),
1612 './tests/worker-files/cluster/testWorker.cjs'
1614 const workerNodeKey = 0
1616 pool.sendKillMessageToWorker(workerNodeKey)
1617 ).resolves.toBeUndefined()
1618 await pool.destroy()
1621 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1622 const pool = new DynamicClusterPool(
1623 Math.floor(numberOfWorkers / 2),
1625 './tests/worker-files/cluster/testWorker.cjs'
1627 const workerNodeKey = 0
1629 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1630 taskFunctionOperation: 'add',
1631 taskFunctionName: 'empty',
1632 taskFunction: (() => {}).toString()
1634 ).resolves.toBe(true)
1636 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1637 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1638 await pool.destroy()
1641 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1642 const pool = new DynamicClusterPool(
1643 Math.floor(numberOfWorkers / 2),
1645 './tests/worker-files/cluster/testWorker.cjs'
1648 pool.sendTaskFunctionOperationToWorkers({
1649 taskFunctionOperation: 'add',
1650 taskFunctionName: 'empty',
1651 taskFunction: (() => {}).toString()
1653 ).resolves.toBe(true)
1654 for (const workerNode of pool.workerNodes) {
1655 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1661 await pool.destroy()