1 // eslint-disable-next-line n/no-unsupported-features/node-builtins
2 import { createHook, executionAsyncId } from 'node:async_hooks'
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { readFileSync } from 'node:fs'
5 import { dirname, join } from 'node:path'
6 import { fileURLToPath } from 'node:url'
8 import { expect } from 'expect'
9 import { restore, stub } from 'sinon'
11 import { CircularArray } from '../../lib/circular-array.cjs'
19 WorkerChoiceStrategies,
21 } from '../../lib/index.cjs'
22 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
23 import { PriorityQueue } from '../../lib/priority-queue.cjs'
24 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
25 import { waitPoolEvents } from '../test-utils.cjs'
27 describe('Abstract pool test suite', () => {
28 const version = JSON.parse(
30 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
34 const numberOfWorkers = 2
35 class StubPoolWithIsMain extends FixedThreadPool {
45 it('Verify that pool can be created and destroyed', async () => {
46 const pool = new FixedThreadPool(
48 './tests/worker-files/thread/testWorker.mjs'
50 expect(pool).toBeInstanceOf(FixedThreadPool)
54 it('Verify that pool cannot be created from a non main thread/process', () => {
57 new StubPoolWithIsMain(
59 './tests/worker-files/thread/testWorker.mjs',
61 errorHandler: e => console.error(e)
66 'Cannot start a pool from a worker with the same type as the pool'
71 it('Verify that pool statuses properties are set', async () => {
72 const pool = new FixedThreadPool(
74 './tests/worker-files/thread/testWorker.mjs'
76 expect(pool.started).toBe(true)
77 expect(pool.starting).toBe(false)
78 expect(pool.destroying).toBe(false)
82 it('Verify that filePath is checked', () => {
83 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
84 new TypeError('The worker file path must be specified')
86 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
87 new TypeError('The worker file path must be a string')
90 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
91 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
94 it('Verify that numberOfWorkers is checked', () => {
99 './tests/worker-files/thread/testWorker.mjs'
103 'Cannot instantiate a pool without specifying the number of workers'
108 it('Verify that a negative number of workers is checked', () => {
111 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
114 'Cannot instantiate a pool with a negative number of workers'
119 it('Verify that a non integer number of workers is checked', () => {
122 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
125 'Cannot instantiate a pool with a non safe integer number of workers'
130 it('Verify that pool arguments number and pool type are checked', () => {
135 './tests/worker-files/thread/testWorker.mjs',
141 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
146 it('Verify that dynamic pool sizing is checked', () => {
149 new DynamicClusterPool(
152 './tests/worker-files/cluster/testWorker.cjs'
156 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
161 new DynamicThreadPool(
164 './tests/worker-files/thread/testWorker.mjs'
168 'Cannot instantiate a pool with a non safe integer number of workers'
173 new DynamicClusterPool(
176 './tests/worker-files/cluster/testWorker.cjs'
180 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
185 new DynamicThreadPool(
188 './tests/worker-files/thread/testWorker.mjs'
192 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
197 new DynamicThreadPool(
200 './tests/worker-files/thread/testWorker.mjs'
204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
209 new DynamicClusterPool(
212 './tests/worker-files/cluster/testWorker.cjs'
216 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
221 it('Verify that pool options are checked', async () => {
222 let pool = new FixedThreadPool(
224 './tests/worker-files/thread/testWorker.mjs'
226 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
227 expect(pool.emitter.eventNames()).toStrictEqual([])
228 expect(pool.opts).toStrictEqual({
231 restartWorkerOnError: true,
232 enableTasksQueue: false,
233 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
235 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
236 .workerChoiceStrategies) {
237 expect(workerChoiceStrategy.opts).toStrictEqual({
238 runTime: { median: false },
239 waitTime: { median: false },
240 elu: { median: false },
241 weights: expect.objectContaining({
242 0: expect.any(Number),
243 [pool.info.maxSize - 1]: expect.any(Number)
248 const testHandler = () => console.info('test handler executed')
249 pool = new FixedThreadPool(
251 './tests/worker-files/thread/testWorker.mjs',
253 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
254 workerChoiceStrategyOptions: {
255 runTime: { median: true },
256 weights: { 0: 300, 1: 200 }
259 restartWorkerOnError: false,
260 enableTasksQueue: true,
261 tasksQueueOptions: { concurrency: 2 },
262 messageHandler: testHandler,
263 errorHandler: testHandler,
264 onlineHandler: testHandler,
265 exitHandler: testHandler
268 expect(pool.emitter).toBeUndefined()
269 expect(pool.opts).toStrictEqual({
272 restartWorkerOnError: false,
273 enableTasksQueue: true,
276 size: Math.pow(numberOfWorkers, 2),
278 tasksStealingOnBackPressure: true,
279 tasksFinishedTimeout: 2000
281 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
282 workerChoiceStrategyOptions: {
283 runTime: { median: true },
284 weights: { 0: 300, 1: 200 }
286 onlineHandler: testHandler,
287 messageHandler: testHandler,
288 errorHandler: testHandler,
289 exitHandler: testHandler
291 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
292 .workerChoiceStrategies) {
293 expect(workerChoiceStrategy.opts).toStrictEqual({
294 runTime: { median: true },
295 waitTime: { median: false },
296 elu: { median: false },
297 weights: { 0: 300, 1: 200 }
303 it('Verify that pool options are validated', () => {
308 './tests/worker-files/thread/testWorker.mjs',
310 workerChoiceStrategy: 'invalidStrategy'
313 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
318 './tests/worker-files/thread/testWorker.mjs',
320 workerChoiceStrategyOptions: { weights: {} }
325 'Invalid worker choice strategy options: must have a weight for each worker node'
332 './tests/worker-files/thread/testWorker.mjs',
334 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
339 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
346 './tests/worker-files/thread/testWorker.mjs',
348 enableTasksQueue: true,
349 tasksQueueOptions: 'invalidTasksQueueOptions'
353 new TypeError('Invalid tasks queue options: must be a plain object')
359 './tests/worker-files/thread/testWorker.mjs',
361 enableTasksQueue: true,
362 tasksQueueOptions: { concurrency: 0 }
367 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
374 './tests/worker-files/thread/testWorker.mjs',
376 enableTasksQueue: true,
377 tasksQueueOptions: { concurrency: -1 }
382 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
389 './tests/worker-files/thread/testWorker.mjs',
391 enableTasksQueue: true,
392 tasksQueueOptions: { concurrency: 0.2 }
396 new TypeError('Invalid worker node tasks concurrency: must be an integer')
402 './tests/worker-files/thread/testWorker.mjs',
404 enableTasksQueue: true,
405 tasksQueueOptions: { size: 0 }
410 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
417 './tests/worker-files/thread/testWorker.mjs',
419 enableTasksQueue: true,
420 tasksQueueOptions: { size: -1 }
425 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
432 './tests/worker-files/thread/testWorker.mjs',
434 enableTasksQueue: true,
435 tasksQueueOptions: { size: 0.2 }
439 new TypeError('Invalid worker node tasks queue size: must be an integer')
443 it('Verify that pool worker choice strategy options can be set', async () => {
444 const pool = new FixedThreadPool(
446 './tests/worker-files/thread/testWorker.mjs',
447 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
449 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
450 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
451 .workerChoiceStrategies) {
452 expect(workerChoiceStrategy.opts).toStrictEqual({
453 runTime: { median: false },
454 waitTime: { median: false },
455 elu: { median: false },
456 weights: expect.objectContaining({
457 0: expect.any(Number),
458 [pool.info.maxSize - 1]: expect.any(Number)
463 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
481 pool.setWorkerChoiceStrategyOptions({
482 runTime: { median: true },
483 elu: { median: true }
485 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
486 runTime: { median: true },
487 elu: { median: true }
489 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
490 .workerChoiceStrategies) {
491 expect(workerChoiceStrategy.opts).toStrictEqual({
492 runTime: { median: true },
493 waitTime: { median: false },
494 elu: { median: true },
495 weights: expect.objectContaining({
496 0: expect.any(Number),
497 [pool.info.maxSize - 1]: expect.any(Number)
502 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
520 pool.setWorkerChoiceStrategyOptions({
521 runTime: { median: false },
522 elu: { median: false }
524 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
525 runTime: { median: false },
526 elu: { median: false }
528 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
529 .workerChoiceStrategies) {
530 expect(workerChoiceStrategy.opts).toStrictEqual({
531 runTime: { median: false },
532 waitTime: { median: false },
533 elu: { median: false },
534 weights: expect.objectContaining({
535 0: expect.any(Number),
536 [pool.info.maxSize - 1]: expect.any(Number)
541 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
560 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
563 'Invalid worker choice strategy options: must be a plain object'
566 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
568 'Invalid worker choice strategy options: must have a weight for each worker node'
572 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
575 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
581 it('Verify that pool tasks queue can be enabled/disabled', async () => {
582 const pool = new FixedThreadPool(
584 './tests/worker-files/thread/testWorker.mjs'
586 expect(pool.opts.enableTasksQueue).toBe(false)
587 expect(pool.opts.tasksQueueOptions).toBeUndefined()
588 pool.enableTasksQueue(true)
589 expect(pool.opts.enableTasksQueue).toBe(true)
590 expect(pool.opts.tasksQueueOptions).toStrictEqual({
592 size: Math.pow(numberOfWorkers, 2),
594 tasksStealingOnBackPressure: true,
595 tasksFinishedTimeout: 2000
597 pool.enableTasksQueue(true, { concurrency: 2 })
598 expect(pool.opts.enableTasksQueue).toBe(true)
599 expect(pool.opts.tasksQueueOptions).toStrictEqual({
601 size: Math.pow(numberOfWorkers, 2),
603 tasksStealingOnBackPressure: true,
604 tasksFinishedTimeout: 2000
606 pool.enableTasksQueue(false)
607 expect(pool.opts.enableTasksQueue).toBe(false)
608 expect(pool.opts.tasksQueueOptions).toBeUndefined()
612 it('Verify that pool tasks queue options can be set', async () => {
613 const pool = new FixedThreadPool(
615 './tests/worker-files/thread/testWorker.mjs',
616 { enableTasksQueue: true }
618 expect(pool.opts.tasksQueueOptions).toStrictEqual({
620 size: Math.pow(numberOfWorkers, 2),
622 tasksStealingOnBackPressure: true,
623 tasksFinishedTimeout: 2000
625 for (const workerNode of pool.workerNodes) {
626 expect(workerNode.tasksQueueBackPressureSize).toBe(
627 pool.opts.tasksQueueOptions.size
630 pool.setTasksQueueOptions({
634 tasksStealingOnBackPressure: false,
635 tasksFinishedTimeout: 3000
637 expect(pool.opts.tasksQueueOptions).toStrictEqual({
641 tasksStealingOnBackPressure: false,
642 tasksFinishedTimeout: 3000
644 for (const workerNode of pool.workerNodes) {
645 expect(workerNode.tasksQueueBackPressureSize).toBe(
646 pool.opts.tasksQueueOptions.size
649 pool.setTasksQueueOptions({
652 tasksStealingOnBackPressure: true
654 expect(pool.opts.tasksQueueOptions).toStrictEqual({
656 size: Math.pow(numberOfWorkers, 2),
658 tasksStealingOnBackPressure: true,
659 tasksFinishedTimeout: 2000
661 for (const workerNode of pool.workerNodes) {
662 expect(workerNode.tasksQueueBackPressureSize).toBe(
663 pool.opts.tasksQueueOptions.size
666 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
667 new TypeError('Invalid tasks queue options: must be a plain object')
669 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
671 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
674 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
676 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
679 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
680 new TypeError('Invalid worker node tasks concurrency: must be an integer')
682 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
684 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
687 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
689 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
692 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
693 new TypeError('Invalid worker node tasks queue size: must be an integer')
698 it('Verify that pool info is set', async () => {
699 let pool = new FixedThreadPool(
701 './tests/worker-files/thread/testWorker.mjs'
703 expect(pool.info).toStrictEqual({
705 type: PoolTypes.fixed,
706 worker: WorkerTypes.thread,
709 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
711 minSize: numberOfWorkers,
712 maxSize: numberOfWorkers,
713 workerNodes: numberOfWorkers,
714 idleWorkerNodes: numberOfWorkers,
721 pool = new DynamicClusterPool(
722 Math.floor(numberOfWorkers / 2),
724 './tests/worker-files/cluster/testWorker.cjs'
726 expect(pool.info).toStrictEqual({
728 type: PoolTypes.dynamic,
729 worker: WorkerTypes.cluster,
732 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
734 minSize: Math.floor(numberOfWorkers / 2),
735 maxSize: numberOfWorkers,
736 workerNodes: Math.floor(numberOfWorkers / 2),
737 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
746 it('Verify that pool worker tasks usage are initialized', async () => {
747 const pool = new FixedClusterPool(
749 './tests/worker-files/cluster/testWorker.cjs'
751 for (const workerNode of pool.workerNodes) {
752 expect(workerNode).toBeInstanceOf(WorkerNode)
753 expect(workerNode.usage).toStrictEqual({
759 sequentiallyStolen: 0,
764 history: new CircularArray()
767 history: new CircularArray()
771 history: new CircularArray()
774 history: new CircularArray()
782 it('Verify that pool worker tasks queue are initialized', async () => {
783 let pool = new FixedClusterPool(
785 './tests/worker-files/cluster/testWorker.cjs'
787 for (const workerNode of pool.workerNodes) {
788 expect(workerNode).toBeInstanceOf(WorkerNode)
789 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
790 expect(workerNode.tasksQueue.size).toBe(0)
791 expect(workerNode.tasksQueue.maxSize).toBe(0)
794 pool = new DynamicThreadPool(
795 Math.floor(numberOfWorkers / 2),
797 './tests/worker-files/thread/testWorker.mjs'
799 for (const workerNode of pool.workerNodes) {
800 expect(workerNode).toBeInstanceOf(WorkerNode)
801 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
802 expect(workerNode.tasksQueue.size).toBe(0)
803 expect(workerNode.tasksQueue.maxSize).toBe(0)
808 it('Verify that pool worker info are initialized', async () => {
809 let pool = new FixedClusterPool(
811 './tests/worker-files/cluster/testWorker.cjs'
813 for (const workerNode of pool.workerNodes) {
814 expect(workerNode).toBeInstanceOf(WorkerNode)
815 expect(workerNode.info).toStrictEqual({
816 id: expect.any(Number),
817 type: WorkerTypes.cluster,
824 pool = new DynamicThreadPool(
825 Math.floor(numberOfWorkers / 2),
827 './tests/worker-files/thread/testWorker.mjs'
829 for (const workerNode of pool.workerNodes) {
830 expect(workerNode).toBeInstanceOf(WorkerNode)
831 expect(workerNode.info).toStrictEqual({
832 id: expect.any(Number),
833 type: WorkerTypes.thread,
842 it('Verify that pool statuses are checked at start or destroy', async () => {
843 const pool = new FixedThreadPool(
845 './tests/worker-files/thread/testWorker.mjs'
847 expect(pool.info.started).toBe(true)
848 expect(pool.info.ready).toBe(true)
849 expect(() => pool.start()).toThrow(
850 new Error('Cannot start an already started pool')
853 expect(pool.info.started).toBe(false)
854 expect(pool.info.ready).toBe(false)
855 await expect(pool.destroy()).rejects.toThrow(
856 new Error('Cannot destroy an already destroyed pool')
860 it('Verify that pool can be started after initialization', async () => {
861 const pool = new FixedClusterPool(
863 './tests/worker-files/cluster/testWorker.cjs',
868 expect(pool.info.started).toBe(false)
869 expect(pool.info.ready).toBe(false)
870 expect(pool.workerNodes).toStrictEqual([])
871 expect(pool.readyEventEmitted).toBe(false)
872 await expect(pool.execute()).rejects.toThrow(
873 new Error('Cannot execute a task on not started pool')
876 expect(pool.info.started).toBe(true)
877 expect(pool.info.ready).toBe(true)
878 await waitPoolEvents(pool, PoolEvents.ready, 1)
879 expect(pool.readyEventEmitted).toBe(true)
880 expect(pool.workerNodes.length).toBe(numberOfWorkers)
881 for (const workerNode of pool.workerNodes) {
882 expect(workerNode).toBeInstanceOf(WorkerNode)
887 it('Verify that pool execute() arguments are checked', async () => {
888 const pool = new FixedClusterPool(
890 './tests/worker-files/cluster/testWorker.cjs'
892 await expect(pool.execute(undefined, 0)).rejects.toThrow(
893 new TypeError('name argument must be a string')
895 await expect(pool.execute(undefined, '')).rejects.toThrow(
896 new TypeError('name argument must not be an empty string')
898 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
899 new TypeError('transferList argument must be an array')
901 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
902 "Task function 'unknown' not found"
905 await expect(pool.execute()).rejects.toThrow(
906 new Error('Cannot execute a task on not started pool')
910 it('Verify that pool worker tasks usage are computed', async () => {
911 const pool = new FixedClusterPool(
913 './tests/worker-files/cluster/testWorker.cjs'
915 const promises = new Set()
916 const maxMultiplier = 2
917 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
918 promises.add(pool.execute())
920 for (const workerNode of pool.workerNodes) {
921 expect(workerNode.usage).toStrictEqual({
924 executing: maxMultiplier,
927 sequentiallyStolen: 0,
932 history: expect.any(CircularArray)
935 history: expect.any(CircularArray)
939 history: expect.any(CircularArray)
942 history: expect.any(CircularArray)
947 await Promise.all(promises)
948 for (const workerNode of pool.workerNodes) {
949 expect(workerNode.usage).toStrictEqual({
951 executed: maxMultiplier,
955 sequentiallyStolen: 0,
960 history: expect.any(CircularArray)
963 history: expect.any(CircularArray)
967 history: expect.any(CircularArray)
970 history: expect.any(CircularArray)
978 it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
979 const pool = new DynamicThreadPool(
980 Math.floor(numberOfWorkers / 2),
982 './tests/worker-files/thread/testWorker.mjs'
984 const promises = new Set()
985 const maxMultiplier = 2
986 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
987 promises.add(pool.execute())
989 await Promise.all(promises)
990 for (const workerNode of pool.workerNodes) {
991 expect(workerNode.usage).toStrictEqual({
993 executed: expect.any(Number),
997 sequentiallyStolen: 0,
1002 history: expect.any(CircularArray)
1005 history: expect.any(CircularArray)
1009 history: expect.any(CircularArray)
1012 history: expect.any(CircularArray)
1016 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1017 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1018 numberOfWorkers * maxMultiplier
1020 expect(workerNode.usage.runTime.history.length).toBe(0)
1021 expect(workerNode.usage.waitTime.history.length).toBe(0)
1022 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1023 expect(workerNode.usage.elu.active.history.length).toBe(0)
1025 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1026 for (const workerNode of pool.workerNodes) {
1027 expect(workerNode.usage).toStrictEqual({
1029 executed: expect.any(Number),
1033 sequentiallyStolen: 0,
1038 history: expect.any(CircularArray)
1041 history: expect.any(CircularArray)
1045 history: expect.any(CircularArray)
1048 history: expect.any(CircularArray)
1052 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1053 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1054 numberOfWorkers * maxMultiplier
1056 expect(workerNode.usage.runTime.history.length).toBe(0)
1057 expect(workerNode.usage.waitTime.history.length).toBe(0)
1058 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1059 expect(workerNode.usage.elu.active.history.length).toBe(0)
1061 await pool.destroy()
1064 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1065 const pool = new DynamicClusterPool(
1066 Math.floor(numberOfWorkers / 2),
1068 './tests/worker-files/cluster/testWorker.cjs'
1070 expect(pool.emitter.eventNames()).toStrictEqual([])
1073 pool.emitter.on(PoolEvents.ready, info => {
1077 await waitPoolEvents(pool, PoolEvents.ready, 1)
1078 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1079 expect(poolReady).toBe(1)
1080 expect(poolInfo).toStrictEqual({
1082 type: PoolTypes.dynamic,
1083 worker: WorkerTypes.cluster,
1086 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1087 strategyRetries: expect.any(Number),
1088 minSize: expect.any(Number),
1089 maxSize: expect.any(Number),
1090 workerNodes: expect.any(Number),
1091 idleWorkerNodes: expect.any(Number),
1092 busyWorkerNodes: expect.any(Number),
1093 executedTasks: expect.any(Number),
1094 executingTasks: expect.any(Number),
1095 failedTasks: expect.any(Number)
1097 await pool.destroy()
1100 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1101 const pool = new FixedThreadPool(
1103 './tests/worker-files/thread/testWorker.mjs'
1105 expect(pool.emitter.eventNames()).toStrictEqual([])
1106 const promises = new Set()
1109 pool.emitter.on(PoolEvents.busy, info => {
1113 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1114 for (let i = 0; i < numberOfWorkers * 2; i++) {
1115 promises.add(pool.execute())
1117 await Promise.all(promises)
1118 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1119 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1120 expect(poolBusy).toBe(numberOfWorkers + 1)
1121 expect(poolInfo).toStrictEqual({
1123 type: PoolTypes.fixed,
1124 worker: WorkerTypes.thread,
1127 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1128 strategyRetries: expect.any(Number),
1129 minSize: expect.any(Number),
1130 maxSize: expect.any(Number),
1131 workerNodes: expect.any(Number),
1132 idleWorkerNodes: expect.any(Number),
1133 busyWorkerNodes: expect.any(Number),
1134 executedTasks: expect.any(Number),
1135 executingTasks: expect.any(Number),
1136 failedTasks: expect.any(Number)
1138 await pool.destroy()
1141 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1142 const pool = new DynamicThreadPool(
1143 Math.floor(numberOfWorkers / 2),
1145 './tests/worker-files/thread/testWorker.mjs'
1147 expect(pool.emitter.eventNames()).toStrictEqual([])
1148 const promises = new Set()
1151 pool.emitter.on(PoolEvents.full, info => {
1155 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1156 for (let i = 0; i < numberOfWorkers * 2; i++) {
1157 promises.add(pool.execute())
1159 await Promise.all(promises)
1160 expect(poolFull).toBe(1)
1161 expect(poolInfo).toStrictEqual({
1163 type: PoolTypes.dynamic,
1164 worker: WorkerTypes.thread,
1167 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1168 strategyRetries: expect.any(Number),
1169 minSize: expect.any(Number),
1170 maxSize: expect.any(Number),
1171 workerNodes: expect.any(Number),
1172 idleWorkerNodes: expect.any(Number),
1173 busyWorkerNodes: expect.any(Number),
1174 executedTasks: expect.any(Number),
1175 executingTasks: expect.any(Number),
1176 failedTasks: expect.any(Number)
1178 await pool.destroy()
1181 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1182 const pool = new FixedThreadPool(
1184 './tests/worker-files/thread/testWorker.mjs',
1186 enableTasksQueue: true
1189 stub(pool, 'hasBackPressure').returns(true)
1190 expect(pool.emitter.eventNames()).toStrictEqual([])
1191 const promises = new Set()
1192 let poolBackPressure = 0
1194 pool.emitter.on(PoolEvents.backPressure, info => {
1198 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1199 for (let i = 0; i < numberOfWorkers + 1; i++) {
1200 promises.add(pool.execute())
1202 await Promise.all(promises)
1203 expect(poolBackPressure).toBe(1)
1204 expect(poolInfo).toStrictEqual({
1206 type: PoolTypes.fixed,
1207 worker: WorkerTypes.thread,
1210 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1211 strategyRetries: expect.any(Number),
1212 minSize: expect.any(Number),
1213 maxSize: expect.any(Number),
1214 workerNodes: expect.any(Number),
1215 idleWorkerNodes: expect.any(Number),
1216 stealingWorkerNodes: expect.any(Number),
1217 busyWorkerNodes: expect.any(Number),
1218 executedTasks: expect.any(Number),
1219 executingTasks: expect.any(Number),
1220 maxQueuedTasks: expect.any(Number),
1221 queuedTasks: expect.any(Number),
1223 stolenTasks: expect.any(Number),
1224 failedTasks: expect.any(Number)
1226 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1227 await pool.destroy()
1230 it('Verify that destroy() waits for queued tasks to finish', async () => {
1231 const tasksFinishedTimeout = 2500
1232 const pool = new FixedThreadPool(
1234 './tests/worker-files/thread/asyncWorker.mjs',
1236 enableTasksQueue: true,
1237 tasksQueueOptions: { tasksFinishedTimeout }
1240 const maxMultiplier = 4
1241 let tasksFinished = 0
1242 for (const workerNode of pool.workerNodes) {
1243 workerNode.on('taskFinished', () => {
1247 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1250 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1251 const startTime = performance.now()
1252 await pool.destroy()
1253 const elapsedTime = performance.now() - startTime
1254 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1255 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1256 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1259 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1260 const tasksFinishedTimeout = 1000
1261 const pool = new FixedThreadPool(
1263 './tests/worker-files/thread/asyncWorker.mjs',
1265 enableTasksQueue: true,
1266 tasksQueueOptions: { tasksFinishedTimeout }
1269 const maxMultiplier = 4
1270 let tasksFinished = 0
1271 for (const workerNode of pool.workerNodes) {
1272 workerNode.on('taskFinished', () => {
1276 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1279 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1280 const startTime = performance.now()
1281 await pool.destroy()
1282 const elapsedTime = performance.now() - startTime
1283 expect(tasksFinished).toBe(0)
1284 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1287 it('Verify that pool asynchronous resource track tasks execution', async () => {
1292 let resolveCalls = 0
1293 const hook = createHook({
1294 init (asyncId, type) {
1295 if (type === 'poolifier:task') {
1297 taskAsyncId = asyncId
1301 if (asyncId === taskAsyncId) beforeCalls++
1304 if (asyncId === taskAsyncId) afterCalls++
1307 if (executionAsyncId() === taskAsyncId) resolveCalls++
1310 const pool = new FixedThreadPool(
1312 './tests/worker-files/thread/testWorker.mjs'
1315 await pool.execute()
1317 expect(initCalls).toBe(1)
1318 expect(beforeCalls).toBe(1)
1319 expect(afterCalls).toBe(1)
1320 expect(resolveCalls).toBe(1)
1321 await pool.destroy()
1324 it('Verify that hasTaskFunction() is working', async () => {
1325 const dynamicThreadPool = new DynamicThreadPool(
1326 Math.floor(numberOfWorkers / 2),
1328 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1330 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1331 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1332 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1335 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1336 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1337 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1338 await dynamicThreadPool.destroy()
1339 const fixedClusterPool = new FixedClusterPool(
1341 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1343 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1344 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1345 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1348 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1349 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1350 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1351 await fixedClusterPool.destroy()
1354 it('Verify that addTaskFunction() is working', async () => {
1355 const dynamicThreadPool = new DynamicThreadPool(
1356 Math.floor(numberOfWorkers / 2),
1358 './tests/worker-files/thread/testWorker.mjs'
1360 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1362 dynamicThreadPool.addTaskFunction(0, () => {})
1363 ).rejects.toThrow(new TypeError('name argument must be a string'))
1365 dynamicThreadPool.addTaskFunction('', () => {})
1367 new TypeError('name argument must not be an empty string')
1369 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1370 new TypeError('taskFunction property must be a function')
1372 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1373 new TypeError('taskFunction property must be a function')
1376 dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 })
1377 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1379 dynamicThreadPool.addTaskFunction('test', { taskFunction: '' })
1380 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1382 dynamicThreadPool.addTaskFunction('test', {
1383 taskFunction: () => {},
1387 new RangeError("Property 'priority' must be between -20 and 19")
1390 dynamicThreadPool.addTaskFunction('test', {
1391 taskFunction: () => {},
1395 new RangeError("Property 'priority' must be between -20 and 19")
1398 dynamicThreadPool.addTaskFunction('test', {
1399 taskFunction: () => {},
1400 strategy: 'invalidStrategy'
1403 new Error("Invalid worker choice strategy 'invalidStrategy'")
1405 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1406 { name: DEFAULT_TASK_NAME },
1410 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1411 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
1412 const echoTaskFunction = data => {
1416 dynamicThreadPool.addTaskFunction('echo', {
1417 taskFunction: echoTaskFunction,
1418 strategy: WorkerChoiceStrategies.LEAST_ELU
1420 ).resolves.toBe(true)
1421 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1422 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1423 taskFunction: echoTaskFunction,
1424 strategy: WorkerChoiceStrategies.LEAST_ELU
1427 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1429 WorkerChoiceStrategies.ROUND_ROBIN,
1430 WorkerChoiceStrategies.LEAST_ELU
1432 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1433 { name: DEFAULT_TASK_NAME },
1435 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
1437 const taskFunctionData = { test: 'test' }
1438 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1439 expect(echoResult).toStrictEqual(taskFunctionData)
1440 for (const workerNode of dynamicThreadPool.workerNodes) {
1441 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1443 executed: expect.any(Number),
1446 sequentiallyStolen: 0,
1451 history: new CircularArray()
1454 history: new CircularArray()
1461 history: new CircularArray()
1467 history: new CircularArray()
1472 await dynamicThreadPool.destroy()
1475 it('Verify that removeTaskFunction() is working', async () => {
1476 const dynamicThreadPool = new DynamicThreadPool(
1477 Math.floor(numberOfWorkers / 2),
1479 './tests/worker-files/thread/testWorker.mjs'
1481 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1482 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1483 { name: DEFAULT_TASK_NAME },
1486 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1487 new Error('Cannot remove a task function not handled on the pool side')
1489 const echoTaskFunction = data => {
1492 await dynamicThreadPool.addTaskFunction('echo', {
1493 taskFunction: echoTaskFunction,
1494 strategy: WorkerChoiceStrategies.LEAST_ELU
1496 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1497 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1498 taskFunction: echoTaskFunction,
1499 strategy: WorkerChoiceStrategies.LEAST_ELU
1502 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1504 WorkerChoiceStrategies.ROUND_ROBIN,
1505 WorkerChoiceStrategies.LEAST_ELU
1507 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1508 { name: DEFAULT_TASK_NAME },
1510 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
1512 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1515 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1516 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1518 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1519 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
1520 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1521 { name: DEFAULT_TASK_NAME },
1524 await dynamicThreadPool.destroy()
1527 it('Verify that listTaskFunctionsProperties() is working', async () => {
1528 const dynamicThreadPool = new DynamicThreadPool(
1529 Math.floor(numberOfWorkers / 2),
1531 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1533 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1534 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1535 { name: DEFAULT_TASK_NAME },
1536 { name: 'jsonIntegerSerialization' },
1537 { name: 'factorial' },
1538 { name: 'fibonacci' }
1540 await dynamicThreadPool.destroy()
1541 const fixedClusterPool = new FixedClusterPool(
1543 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1545 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1546 expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
1547 { name: DEFAULT_TASK_NAME },
1548 { name: 'jsonIntegerSerialization' },
1549 { name: 'factorial' },
1550 { name: 'fibonacci' }
1552 await fixedClusterPool.destroy()
1555 it('Verify that setDefaultTaskFunction() is working', async () => {
1556 const dynamicThreadPool = new DynamicThreadPool(
1557 Math.floor(numberOfWorkers / 2),
1559 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1561 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1562 const workerId = dynamicThreadPool.workerNodes[0].info.id
1563 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1565 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1569 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1572 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1576 dynamicThreadPool.setDefaultTaskFunction('unknown')
1579 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1582 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1583 { name: DEFAULT_TASK_NAME },
1584 { name: 'jsonIntegerSerialization' },
1585 { name: 'factorial' },
1586 { name: 'fibonacci' }
1589 dynamicThreadPool.setDefaultTaskFunction('factorial')
1590 ).resolves.toBe(true)
1591 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1592 { name: DEFAULT_TASK_NAME },
1593 { name: 'factorial' },
1594 { name: 'jsonIntegerSerialization' },
1595 { name: 'fibonacci' }
1598 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1599 ).resolves.toBe(true)
1600 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1601 { name: DEFAULT_TASK_NAME },
1602 { name: 'fibonacci' },
1603 { name: 'jsonIntegerSerialization' },
1604 { name: 'factorial' }
1606 await dynamicThreadPool.destroy()
1609 it('Verify that multiple task functions worker is working', async () => {
1610 const pool = new DynamicClusterPool(
1611 Math.floor(numberOfWorkers / 2),
1613 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1615 const data = { n: 10 }
1616 const result0 = await pool.execute(data)
1617 expect(result0).toStrictEqual({ ok: 1 })
1618 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1619 expect(result1).toStrictEqual({ ok: 1 })
1620 const result2 = await pool.execute(data, 'factorial')
1621 expect(result2).toBe(3628800)
1622 const result3 = await pool.execute(data, 'fibonacci')
1623 expect(result3).toBe(55)
1624 expect(pool.info.executingTasks).toBe(0)
1625 expect(pool.info.executedTasks).toBe(4)
1626 for (const workerNode of pool.workerNodes) {
1627 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1628 { name: DEFAULT_TASK_NAME },
1629 { name: 'jsonIntegerSerialization' },
1630 { name: 'factorial' },
1631 { name: 'fibonacci' }
1633 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1634 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1636 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1639 executed: expect.any(Number),
1643 sequentiallyStolen: 0,
1647 history: expect.any(CircularArray)
1650 history: expect.any(CircularArray)
1654 history: expect.any(CircularArray)
1657 history: expect.any(CircularArray)
1662 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1664 ).toBeGreaterThan(0)
1667 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1669 workerNode.getTaskFunctionWorkerUsage(
1670 workerNode.info.taskFunctionsProperties[1].name
1674 await pool.destroy()
1677 it('Verify sendKillMessageToWorker()', async () => {
1678 const pool = new DynamicClusterPool(
1679 Math.floor(numberOfWorkers / 2),
1681 './tests/worker-files/cluster/testWorker.cjs'
1683 const workerNodeKey = 0
1685 pool.sendKillMessageToWorker(workerNodeKey)
1686 ).resolves.toBeUndefined()
1687 await pool.destroy()
1690 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1691 const pool = new DynamicClusterPool(
1692 Math.floor(numberOfWorkers / 2),
1694 './tests/worker-files/cluster/testWorker.cjs'
1696 const workerNodeKey = 0
1698 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1699 taskFunctionOperation: 'add',
1700 taskFunctionProperties: { name: 'empty' },
1701 taskFunction: (() => {}).toString()
1703 ).resolves.toBe(true)
1705 pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
1707 { name: DEFAULT_TASK_NAME },
1711 await pool.destroy()
1714 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1715 const pool = new DynamicClusterPool(
1716 Math.floor(numberOfWorkers / 2),
1718 './tests/worker-files/cluster/testWorker.cjs'
1721 pool.sendTaskFunctionOperationToWorkers({
1722 taskFunctionOperation: 'add',
1723 taskFunctionProperties: { name: 'empty' },
1724 taskFunction: (() => {}).toString()
1726 ).resolves.toBe(true)
1727 for (const workerNode of pool.workerNodes) {
1728 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1729 { name: DEFAULT_TASK_NAME },
1734 await pool.destroy()