1 // eslint-disable-next-line n/no-unsupported-features/node-builtins
2 import { createHook, executionAsyncId } from 'node:async_hooks'
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { readFileSync } from 'node:fs'
5 import { dirname, join } from 'node:path'
6 import { fileURLToPath } from 'node:url'
8 import { expect } from 'expect'
9 import { restore, stub } from 'sinon'
11 import { CircularArray } from '../../lib/circular-array.cjs'
19 WorkerChoiceStrategies,
21 } from '../../lib/index.cjs'
22 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
23 import { PriorityQueue } from '../../lib/priority-queue.cjs'
24 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
25 import { waitPoolEvents } from '../test-utils.cjs'
27 describe('Abstract pool test suite', () => {
28 const version = JSON.parse(
30 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
34 const numberOfWorkers = 2
35 class StubPoolWithIsMain extends FixedThreadPool {
45 it('Verify that pool can be created and destroyed', async () => {
46 const pool = new FixedThreadPool(
48 './tests/worker-files/thread/testWorker.mjs'
50 expect(pool).toBeInstanceOf(FixedThreadPool)
54 it('Verify that pool cannot be created from a non main thread/process', () => {
57 new StubPoolWithIsMain(
59 './tests/worker-files/thread/testWorker.mjs',
61 errorHandler: e => console.error(e)
66 'Cannot start a pool from a worker with the same type as the pool'
71 it('Verify that pool statuses properties are set', async () => {
72 const pool = new FixedThreadPool(
74 './tests/worker-files/thread/testWorker.mjs'
76 expect(pool.started).toBe(true)
77 expect(pool.starting).toBe(false)
78 expect(pool.destroying).toBe(false)
82 it('Verify that filePath is checked', () => {
83 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
84 new TypeError('The worker file path must be specified')
86 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
87 new TypeError('The worker file path must be a string')
90 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
91 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
94 it('Verify that numberOfWorkers is checked', () => {
99 './tests/worker-files/thread/testWorker.mjs'
103 'Cannot instantiate a pool without specifying the number of workers'
108 it('Verify that a negative number of workers is checked', () => {
111 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
114 'Cannot instantiate a pool with a negative number of workers'
119 it('Verify that a non integer number of workers is checked', () => {
122 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
125 'Cannot instantiate a pool with a non safe integer number of workers'
130 it('Verify that pool arguments number and pool type are checked', () => {
135 './tests/worker-files/thread/testWorker.mjs',
141 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
146 it('Verify that dynamic pool sizing is checked', () => {
149 new DynamicClusterPool(
152 './tests/worker-files/cluster/testWorker.cjs'
156 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
161 new DynamicThreadPool(
164 './tests/worker-files/thread/testWorker.mjs'
168 'Cannot instantiate a pool with a non safe integer number of workers'
173 new DynamicClusterPool(
176 './tests/worker-files/cluster/testWorker.cjs'
180 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
185 new DynamicThreadPool(
188 './tests/worker-files/thread/testWorker.mjs'
192 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
197 new DynamicThreadPool(
200 './tests/worker-files/thread/testWorker.mjs'
204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
209 new DynamicClusterPool(
212 './tests/worker-files/cluster/testWorker.cjs'
216 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
221 it('Verify that pool options are checked', async () => {
222 let pool = new FixedThreadPool(
224 './tests/worker-files/thread/testWorker.mjs'
226 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
227 expect(pool.emitter.eventNames()).toStrictEqual([])
228 expect(pool.opts).toStrictEqual({
231 restartWorkerOnError: true,
232 enableTasksQueue: false,
233 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
235 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
236 .workerChoiceStrategies) {
237 expect(workerChoiceStrategy.opts).toStrictEqual({
238 runTime: { median: false },
239 waitTime: { median: false },
240 elu: { median: false },
241 weights: expect.objectContaining({
242 0: expect.any(Number),
243 [pool.info.maxSize - 1]: expect.any(Number)
248 const testHandler = () => console.info('test handler executed')
249 pool = new FixedThreadPool(
251 './tests/worker-files/thread/testWorker.mjs',
253 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
254 workerChoiceStrategyOptions: {
255 runTime: { median: true },
256 weights: { 0: 300, 1: 200 }
259 restartWorkerOnError: false,
260 enableTasksQueue: true,
261 tasksQueueOptions: { concurrency: 2 },
262 messageHandler: testHandler,
263 errorHandler: testHandler,
264 onlineHandler: testHandler,
265 exitHandler: testHandler
268 expect(pool.emitter).toBeUndefined()
269 expect(pool.opts).toStrictEqual({
272 restartWorkerOnError: false,
273 enableTasksQueue: true,
276 size: Math.pow(numberOfWorkers, 2),
278 tasksStealingOnBackPressure: true,
279 tasksFinishedTimeout: 2000
281 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
282 workerChoiceStrategyOptions: {
283 runTime: { median: true },
284 weights: { 0: 300, 1: 200 }
286 onlineHandler: testHandler,
287 messageHandler: testHandler,
288 errorHandler: testHandler,
289 exitHandler: testHandler
291 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
292 .workerChoiceStrategies) {
293 expect(workerChoiceStrategy.opts).toStrictEqual({
294 runTime: { median: true },
295 waitTime: { median: false },
296 elu: { median: false },
297 weights: { 0: 300, 1: 200 }
303 it('Verify that pool options are validated', () => {
308 './tests/worker-files/thread/testWorker.mjs',
310 workerChoiceStrategy: 'invalidStrategy'
313 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
318 './tests/worker-files/thread/testWorker.mjs',
320 workerChoiceStrategyOptions: { weights: {} }
325 'Invalid worker choice strategy options: must have a weight for each worker node'
332 './tests/worker-files/thread/testWorker.mjs',
334 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
339 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
346 './tests/worker-files/thread/testWorker.mjs',
348 enableTasksQueue: true,
349 tasksQueueOptions: 'invalidTasksQueueOptions'
353 new TypeError('Invalid tasks queue options: must be a plain object')
359 './tests/worker-files/thread/testWorker.mjs',
361 enableTasksQueue: true,
362 tasksQueueOptions: { concurrency: 0 }
367 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
374 './tests/worker-files/thread/testWorker.mjs',
376 enableTasksQueue: true,
377 tasksQueueOptions: { concurrency: -1 }
382 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
389 './tests/worker-files/thread/testWorker.mjs',
391 enableTasksQueue: true,
392 tasksQueueOptions: { concurrency: 0.2 }
396 new TypeError('Invalid worker node tasks concurrency: must be an integer')
402 './tests/worker-files/thread/testWorker.mjs',
404 enableTasksQueue: true,
405 tasksQueueOptions: { size: 0 }
410 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
417 './tests/worker-files/thread/testWorker.mjs',
419 enableTasksQueue: true,
420 tasksQueueOptions: { size: -1 }
425 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
432 './tests/worker-files/thread/testWorker.mjs',
434 enableTasksQueue: true,
435 tasksQueueOptions: { size: 0.2 }
439 new TypeError('Invalid worker node tasks queue size: must be an integer')
443 it('Verify that pool worker choice strategy options can be set', async () => {
444 const pool = new FixedThreadPool(
446 './tests/worker-files/thread/testWorker.mjs',
447 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
449 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
450 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
451 .workerChoiceStrategies) {
452 expect(workerChoiceStrategy.opts).toStrictEqual({
453 runTime: { median: false },
454 waitTime: { median: false },
455 elu: { median: false },
456 weights: expect.objectContaining({
457 0: expect.any(Number),
458 [pool.info.maxSize - 1]: expect.any(Number)
463 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
481 pool.setWorkerChoiceStrategyOptions({
482 runTime: { median: true },
483 elu: { median: true }
485 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
486 runTime: { median: true },
487 elu: { median: true }
489 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
490 .workerChoiceStrategies) {
491 expect(workerChoiceStrategy.opts).toStrictEqual({
492 runTime: { median: true },
493 waitTime: { median: false },
494 elu: { median: true },
495 weights: expect.objectContaining({
496 0: expect.any(Number),
497 [pool.info.maxSize - 1]: expect.any(Number)
502 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
520 pool.setWorkerChoiceStrategyOptions({
521 runTime: { median: false },
522 elu: { median: false }
524 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
525 runTime: { median: false },
526 elu: { median: false }
528 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
529 .workerChoiceStrategies) {
530 expect(workerChoiceStrategy.opts).toStrictEqual({
531 runTime: { median: false },
532 waitTime: { median: false },
533 elu: { median: false },
534 weights: expect.objectContaining({
535 0: expect.any(Number),
536 [pool.info.maxSize - 1]: expect.any(Number)
541 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
560 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
563 'Invalid worker choice strategy options: must be a plain object'
566 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
568 'Invalid worker choice strategy options: must have a weight for each worker node'
572 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
575 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
581 it('Verify that pool tasks queue can be enabled/disabled', async () => {
582 const pool = new FixedThreadPool(
584 './tests/worker-files/thread/testWorker.mjs'
586 expect(pool.opts.enableTasksQueue).toBe(false)
587 expect(pool.opts.tasksQueueOptions).toBeUndefined()
588 pool.enableTasksQueue(true)
589 expect(pool.opts.enableTasksQueue).toBe(true)
590 expect(pool.opts.tasksQueueOptions).toStrictEqual({
592 size: Math.pow(numberOfWorkers, 2),
594 tasksStealingOnBackPressure: true,
595 tasksFinishedTimeout: 2000
597 pool.enableTasksQueue(true, { concurrency: 2 })
598 expect(pool.opts.enableTasksQueue).toBe(true)
599 expect(pool.opts.tasksQueueOptions).toStrictEqual({
601 size: Math.pow(numberOfWorkers, 2),
603 tasksStealingOnBackPressure: true,
604 tasksFinishedTimeout: 2000
606 pool.enableTasksQueue(false)
607 expect(pool.opts.enableTasksQueue).toBe(false)
608 expect(pool.opts.tasksQueueOptions).toBeUndefined()
612 it('Verify that pool tasks queue options can be set', async () => {
613 const pool = new FixedThreadPool(
615 './tests/worker-files/thread/testWorker.mjs',
616 { enableTasksQueue: true }
618 expect(pool.opts.tasksQueueOptions).toStrictEqual({
620 size: Math.pow(numberOfWorkers, 2),
622 tasksStealingOnBackPressure: true,
623 tasksFinishedTimeout: 2000
625 for (const workerNode of pool.workerNodes) {
626 expect(workerNode.tasksQueueBackPressureSize).toBe(
627 pool.opts.tasksQueueOptions.size
630 pool.setTasksQueueOptions({
634 tasksStealingOnBackPressure: false,
635 tasksFinishedTimeout: 3000
637 expect(pool.opts.tasksQueueOptions).toStrictEqual({
641 tasksStealingOnBackPressure: false,
642 tasksFinishedTimeout: 3000
644 for (const workerNode of pool.workerNodes) {
645 expect(workerNode.tasksQueueBackPressureSize).toBe(
646 pool.opts.tasksQueueOptions.size
649 pool.setTasksQueueOptions({
652 tasksStealingOnBackPressure: true
654 expect(pool.opts.tasksQueueOptions).toStrictEqual({
656 size: Math.pow(numberOfWorkers, 2),
658 tasksStealingOnBackPressure: true,
659 tasksFinishedTimeout: 2000
661 for (const workerNode of pool.workerNodes) {
662 expect(workerNode.tasksQueueBackPressureSize).toBe(
663 pool.opts.tasksQueueOptions.size
666 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
667 new TypeError('Invalid tasks queue options: must be a plain object')
669 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
671 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
674 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
676 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
679 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
680 new TypeError('Invalid worker node tasks concurrency: must be an integer')
682 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
684 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
687 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
689 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
692 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
693 new TypeError('Invalid worker node tasks queue size: must be an integer')
698 it('Verify that pool info is set', async () => {
699 let pool = new FixedThreadPool(
701 './tests/worker-files/thread/testWorker.mjs'
703 expect(pool.info).toStrictEqual({
705 type: PoolTypes.fixed,
706 worker: WorkerTypes.thread,
709 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
711 minSize: numberOfWorkers,
712 maxSize: numberOfWorkers,
713 workerNodes: numberOfWorkers,
714 idleWorkerNodes: numberOfWorkers,
721 pool = new DynamicClusterPool(
722 Math.floor(numberOfWorkers / 2),
724 './tests/worker-files/cluster/testWorker.cjs'
726 expect(pool.info).toStrictEqual({
728 type: PoolTypes.dynamic,
729 worker: WorkerTypes.cluster,
732 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
734 minSize: Math.floor(numberOfWorkers / 2),
735 maxSize: numberOfWorkers,
736 workerNodes: Math.floor(numberOfWorkers / 2),
737 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
746 it('Verify that pool worker tasks usage are initialized', async () => {
747 const pool = new FixedClusterPool(
749 './tests/worker-files/cluster/testWorker.cjs'
751 for (const workerNode of pool.workerNodes) {
752 expect(workerNode).toBeInstanceOf(WorkerNode)
753 expect(workerNode.usage).toStrictEqual({
759 sequentiallyStolen: 0,
764 history: new CircularArray()
767 history: new CircularArray()
771 history: new CircularArray()
774 history: new CircularArray()
782 it('Verify that pool worker tasks queue are initialized', async () => {
783 let pool = new FixedClusterPool(
785 './tests/worker-files/cluster/testWorker.cjs'
787 for (const workerNode of pool.workerNodes) {
788 expect(workerNode).toBeInstanceOf(WorkerNode)
789 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
790 expect(workerNode.tasksQueue.size).toBe(0)
791 expect(workerNode.tasksQueue.maxSize).toBe(0)
794 pool = new DynamicThreadPool(
795 Math.floor(numberOfWorkers / 2),
797 './tests/worker-files/thread/testWorker.mjs'
799 for (const workerNode of pool.workerNodes) {
800 expect(workerNode).toBeInstanceOf(WorkerNode)
801 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
802 expect(workerNode.tasksQueue.size).toBe(0)
803 expect(workerNode.tasksQueue.maxSize).toBe(0)
808 it('Verify that pool worker info are initialized', async () => {
809 let pool = new FixedClusterPool(
811 './tests/worker-files/cluster/testWorker.cjs'
813 for (const workerNode of pool.workerNodes) {
814 expect(workerNode).toBeInstanceOf(WorkerNode)
815 expect(workerNode.info).toStrictEqual({
816 id: expect.any(Number),
817 type: WorkerTypes.cluster,
824 pool = new DynamicThreadPool(
825 Math.floor(numberOfWorkers / 2),
827 './tests/worker-files/thread/testWorker.mjs'
829 for (const workerNode of pool.workerNodes) {
830 expect(workerNode).toBeInstanceOf(WorkerNode)
831 expect(workerNode.info).toStrictEqual({
832 id: expect.any(Number),
833 type: WorkerTypes.thread,
842 it('Verify that pool statuses are checked at start or destroy', async () => {
843 const pool = new FixedThreadPool(
845 './tests/worker-files/thread/testWorker.mjs'
847 expect(pool.info.started).toBe(true)
848 expect(pool.info.ready).toBe(true)
849 expect(() => pool.start()).toThrow(
850 new Error('Cannot start an already started pool')
853 expect(pool.info.started).toBe(false)
854 expect(pool.info.ready).toBe(false)
855 await expect(pool.destroy()).rejects.toThrow(
856 new Error('Cannot destroy an already destroyed pool')
860 it('Verify that pool can be started after initialization', async () => {
861 const pool = new FixedClusterPool(
863 './tests/worker-files/cluster/testWorker.cjs',
868 expect(pool.info.started).toBe(false)
869 expect(pool.info.ready).toBe(false)
870 expect(pool.workerNodes).toStrictEqual([])
871 expect(pool.readyEventEmitted).toBe(false)
872 await expect(pool.execute()).rejects.toThrow(
873 new Error('Cannot execute a task on not started pool')
876 expect(pool.info.started).toBe(true)
877 expect(pool.info.ready).toBe(true)
878 await waitPoolEvents(pool, PoolEvents.ready, 1)
879 expect(pool.readyEventEmitted).toBe(true)
880 expect(pool.workerNodes.length).toBe(numberOfWorkers)
881 for (const workerNode of pool.workerNodes) {
882 expect(workerNode).toBeInstanceOf(WorkerNode)
887 it('Verify that pool execute() arguments are checked', async () => {
888 const pool = new FixedClusterPool(
890 './tests/worker-files/cluster/testWorker.cjs'
892 await expect(pool.execute(undefined, 0)).rejects.toThrow(
893 new TypeError('name argument must be a string')
895 await expect(pool.execute(undefined, '')).rejects.toThrow(
896 new TypeError('name argument must not be an empty string')
898 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
899 new TypeError('transferList argument must be an array')
901 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
902 "Task function 'unknown' not found"
905 await expect(pool.execute()).rejects.toThrow(
906 new Error('Cannot execute a task on not started pool')
910 it('Verify that pool worker tasks usage are computed', async () => {
911 const pool = new FixedClusterPool(
913 './tests/worker-files/cluster/testWorker.cjs'
915 const promises = new Set()
916 const maxMultiplier = 2
917 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
918 promises.add(pool.execute())
920 for (const workerNode of pool.workerNodes) {
921 expect(workerNode.usage).toStrictEqual({
924 executing: maxMultiplier,
927 sequentiallyStolen: 0,
932 history: expect.any(CircularArray)
935 history: expect.any(CircularArray)
939 history: expect.any(CircularArray)
942 history: expect.any(CircularArray)
947 await Promise.all(promises)
948 for (const workerNode of pool.workerNodes) {
949 expect(workerNode.usage).toStrictEqual({
951 executed: maxMultiplier,
955 sequentiallyStolen: 0,
960 history: expect.any(CircularArray)
963 history: expect.any(CircularArray)
967 history: expect.any(CircularArray)
970 history: expect.any(CircularArray)
978 it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
979 const pool = new DynamicThreadPool(
980 Math.floor(numberOfWorkers / 2),
982 './tests/worker-files/thread/testWorker.mjs'
984 const promises = new Set()
985 const maxMultiplier = 2
986 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
987 promises.add(pool.execute())
989 await Promise.all(promises)
990 for (const workerNode of pool.workerNodes) {
991 expect(workerNode.usage).toStrictEqual({
993 executed: expect.any(Number),
997 sequentiallyStolen: 0,
1002 history: expect.any(CircularArray)
1005 history: expect.any(CircularArray)
1009 history: expect.any(CircularArray)
1012 history: expect.any(CircularArray)
1016 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1017 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1018 numberOfWorkers * maxMultiplier
1020 expect(workerNode.usage.runTime.history.length).toBe(0)
1021 expect(workerNode.usage.waitTime.history.length).toBe(0)
1022 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1023 expect(workerNode.usage.elu.active.history.length).toBe(0)
1025 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1026 for (const workerNode of pool.workerNodes) {
1027 expect(workerNode.usage).toStrictEqual({
1029 executed: expect.any(Number),
1033 sequentiallyStolen: 0,
1038 history: expect.any(CircularArray)
1041 history: expect.any(CircularArray)
1045 history: expect.any(CircularArray)
1048 history: expect.any(CircularArray)
1052 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1053 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1054 numberOfWorkers * maxMultiplier
1056 expect(workerNode.usage.runTime.history.length).toBe(0)
1057 expect(workerNode.usage.waitTime.history.length).toBe(0)
1058 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1059 expect(workerNode.usage.elu.active.history.length).toBe(0)
1061 await pool.destroy()
1064 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1065 const pool = new DynamicClusterPool(
1066 Math.floor(numberOfWorkers / 2),
1068 './tests/worker-files/cluster/testWorker.cjs'
1070 expect(pool.emitter.eventNames()).toStrictEqual([])
1073 pool.emitter.on(PoolEvents.ready, info => {
1077 await waitPoolEvents(pool, PoolEvents.ready, 1)
1078 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1079 expect(poolReady).toBe(1)
1080 expect(poolInfo).toStrictEqual({
1082 type: PoolTypes.dynamic,
1083 worker: WorkerTypes.cluster,
1086 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1087 strategyRetries: expect.any(Number),
1088 minSize: expect.any(Number),
1089 maxSize: expect.any(Number),
1090 workerNodes: expect.any(Number),
1091 idleWorkerNodes: expect.any(Number),
1092 busyWorkerNodes: expect.any(Number),
1093 executedTasks: expect.any(Number),
1094 executingTasks: expect.any(Number),
1095 failedTasks: expect.any(Number)
1097 await pool.destroy()
1100 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1101 const pool = new FixedThreadPool(
1103 './tests/worker-files/thread/testWorker.mjs'
1105 expect(pool.emitter.eventNames()).toStrictEqual([])
1106 const promises = new Set()
1109 pool.emitter.on(PoolEvents.busy, info => {
1113 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1114 for (let i = 0; i < numberOfWorkers * 2; i++) {
1115 promises.add(pool.execute())
1117 await Promise.all(promises)
1118 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1119 // So it is triggered numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1120 expect(poolBusy).toBe(numberOfWorkers + 1)
1121 expect(poolInfo).toStrictEqual({
1123 type: PoolTypes.fixed,
1124 worker: WorkerTypes.thread,
1127 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1128 strategyRetries: expect.any(Number),
1129 minSize: expect.any(Number),
1130 maxSize: expect.any(Number),
1131 workerNodes: expect.any(Number),
1132 idleWorkerNodes: expect.any(Number),
1133 busyWorkerNodes: expect.any(Number),
1134 executedTasks: expect.any(Number),
1135 executingTasks: expect.any(Number),
1136 failedTasks: expect.any(Number)
1138 await pool.destroy()
1141 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1142 const pool = new DynamicThreadPool(
1143 Math.floor(numberOfWorkers / 2),
1145 './tests/worker-files/thread/testWorker.mjs'
1147 expect(pool.emitter.eventNames()).toStrictEqual([])
1148 const promises = new Set()
1151 pool.emitter.on(PoolEvents.full, info => {
1155 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1156 for (let i = 0; i < numberOfWorkers * 2; i++) {
1157 promises.add(pool.execute())
1159 await Promise.all(promises)
1160 expect(poolFull).toBe(1)
1161 expect(poolInfo).toStrictEqual({
1163 type: PoolTypes.dynamic,
1164 worker: WorkerTypes.thread,
1167 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1168 strategyRetries: expect.any(Number),
1169 minSize: expect.any(Number),
1170 maxSize: expect.any(Number),
1171 workerNodes: expect.any(Number),
1172 idleWorkerNodes: expect.any(Number),
1173 busyWorkerNodes: expect.any(Number),
1174 executedTasks: expect.any(Number),
1175 executingTasks: expect.any(Number),
1176 failedTasks: expect.any(Number)
1178 await pool.destroy()
1181 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1182 const pool = new FixedThreadPool(
1184 './tests/worker-files/thread/testWorker.mjs',
1186 enableTasksQueue: true
1189 stub(pool, 'hasBackPressure').returns(true)
1190 expect(pool.emitter.eventNames()).toStrictEqual([])
1191 const promises = new Set()
1192 let poolBackPressure = 0
1194 pool.emitter.on(PoolEvents.backPressure, info => {
1198 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1199 for (let i = 0; i < numberOfWorkers + 1; i++) {
1200 promises.add(pool.execute())
1202 await Promise.all(promises)
1203 expect(poolBackPressure).toBe(1)
1204 expect(poolInfo).toStrictEqual({
1206 type: PoolTypes.fixed,
1207 worker: WorkerTypes.thread,
1210 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1211 strategyRetries: expect.any(Number),
1212 minSize: expect.any(Number),
1213 maxSize: expect.any(Number),
1214 workerNodes: expect.any(Number),
1215 idleWorkerNodes: expect.any(Number),
1216 stealingWorkerNodes: expect.any(Number),
1217 busyWorkerNodes: expect.any(Number),
1218 executedTasks: expect.any(Number),
1219 executingTasks: expect.any(Number),
1220 maxQueuedTasks: expect.any(Number),
1221 queuedTasks: expect.any(Number),
1223 stolenTasks: expect.any(Number),
1224 failedTasks: expect.any(Number)
1226 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1227 await pool.destroy()
1230 it('Verify that destroy() waits for queued tasks to finish', async () => {
1231 const tasksFinishedTimeout = 2500
1232 const pool = new FixedThreadPool(
1234 './tests/worker-files/thread/asyncWorker.mjs',
1236 enableTasksQueue: true,
1237 tasksQueueOptions: { tasksFinishedTimeout }
1240 const maxMultiplier = 4
1241 let tasksFinished = 0
1242 for (const workerNode of pool.workerNodes) {
1243 workerNode.on('taskFinished', () => {
1247 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1250 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1251 const startTime = performance.now()
1252 await pool.destroy()
1253 const elapsedTime = performance.now() - startTime
1254 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1255 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1256 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1259 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1260 const tasksFinishedTimeout = 1000
1261 const pool = new FixedThreadPool(
1263 './tests/worker-files/thread/asyncWorker.mjs',
1265 enableTasksQueue: true,
1266 tasksQueueOptions: { tasksFinishedTimeout }
1269 const maxMultiplier = 4
1270 let tasksFinished = 0
1271 for (const workerNode of pool.workerNodes) {
1272 workerNode.on('taskFinished', () => {
1276 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1279 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1280 const startTime = performance.now()
1281 await pool.destroy()
1282 const elapsedTime = performance.now() - startTime
1283 expect(tasksFinished).toBe(0)
1284 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1287 it('Verify that pool asynchronous resource track tasks execution', async () => {
1292 let resolveCalls = 0
1293 const hook = createHook({
1294 init (asyncId, type) {
1295 if (type === 'poolifier:task') {
1297 taskAsyncId = asyncId
1301 if (asyncId === taskAsyncId) beforeCalls++
1304 if (asyncId === taskAsyncId) afterCalls++
1307 if (executionAsyncId() === taskAsyncId) resolveCalls++
1310 const pool = new FixedThreadPool(
1312 './tests/worker-files/thread/testWorker.mjs'
1315 await pool.execute()
1317 expect(initCalls).toBe(1)
1318 expect(beforeCalls).toBe(1)
1319 expect(afterCalls).toBe(1)
1320 expect(resolveCalls).toBe(1)
1321 await pool.destroy()
1324 it('Verify that hasTaskFunction() is working', async () => {
1325 const dynamicThreadPool = new DynamicThreadPool(
1326 Math.floor(numberOfWorkers / 2),
1328 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1330 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1331 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1332 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1335 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1336 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1337 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1338 await dynamicThreadPool.destroy()
1339 const fixedClusterPool = new FixedClusterPool(
1341 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1343 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1344 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1345 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1348 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1349 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1350 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1351 await fixedClusterPool.destroy()
// Test: addTaskFunction() on a DynamicThreadPool running testWorker.mjs.
// Covers argument validation first, then registers an 'echo' task function
// with an explicit LEAST_ELU strategy, executes it and inspects worker usage.
1354 it('Verify that addTaskFunction() is working', async () => {
1355 const dynamicThreadPool = new DynamicThreadPool(
1356 Math.floor(numberOfWorkers / 2),
1358 './tests/worker-files/thread/testWorker.mjs'
// Wait on PoolEvents.ready (count 1) through the test helper before using the pool API.
1360 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Invalid `name` arguments: a non-string (0) and an empty string.
1362 dynamicThreadPool.addTaskFunction(0, () => {})
1363 ).rejects.toThrow(new TypeError('name argument must be a string'))
1365 dynamicThreadPool.addTaskFunction('', () => {})
1367 new TypeError('name argument must not be an empty string')
// Invalid task function arguments: a number and a string where a function is required.
1369 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1370 new TypeError('taskFunction property must be a function')
1372 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1373 new TypeError('taskFunction property must be a function')
// Snapshot before any successful registration: the listed task functions and
// the strategy keys held by the context (ROUND_ROBIN only).
1375 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1376 { name: DEFAULT_TASK_NAME },
1380 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1381 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
// Task function registered below under the name 'echo'.
1382 const echoTaskFunction = data => {
// Register 'echo' together with a per-function strategy; the call must resolve to true.
1386 dynamicThreadPool.addTaskFunction('echo', {
1387 taskFunction: echoTaskFunction,
1388 strategy: WorkerChoiceStrategies.LEAST_ELU
1390 ).resolves.toBe(true)
// The pool-side registry now has exactly one entry, stored with its strategy.
1391 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1392 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1393 taskFunction: echoTaskFunction,
1394 strategy: WorkerChoiceStrategies.LEAST_ELU
// LEAST_ELU is now present among the strategy keys, after ROUND_ROBIN.
1397 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1399 WorkerChoiceStrategies.ROUND_ROBIN,
1400 WorkerChoiceStrategies.LEAST_ELU
// 'echo' is listed last, carrying its strategy.
1402 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1403 { name: DEFAULT_TASK_NAME },
1405 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
// Execute the new task function by name and expect the input object back.
1407 const taskFunctionData = { test: 'test' }
1408 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1409 expect(echoResult).toStrictEqual(taskFunctionData)
// Each worker node must expose a usage record for 'echo'.
1410 for (const workerNode of dynamicThreadPool.workerNodes) {
1411 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
// The executed count is only type-checked (any Number), not pinned to a value.
1413 executed: expect.any(Number),
1416 sequentiallyStolen: 0,
// History fields are compared against freshly constructed CircularArray instances.
1421 history: new CircularArray()
1424 history: new CircularArray()
1431 history: new CircularArray()
1437 history: new CircularArray()
// Tear the pool down at the end of the test.
1442 await dynamicThreadPool.destroy()
// Test: removeTaskFunction() on a DynamicThreadPool running testWorker.mjs.
// Removing a name that was not registered through the pool must reject; a
// function added with addTaskFunction() is then removed and the pool state
// is checked before and after.
1445 it('Verify that removeTaskFunction() is working', async () => {
1446 const dynamicThreadPool = new DynamicThreadPool(
1447 Math.floor(numberOfWorkers / 2),
1449 './tests/worker-files/thread/testWorker.mjs'
// Wait on PoolEvents.ready (count 1) through the test helper before using the pool API.
1451 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1452 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1453 { name: DEFAULT_TASK_NAME },
// No addTaskFunction('test', ...) call precedes this in the test, so removal must reject.
1456 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1457 new Error('Cannot remove a task function not handled on the pool side')
// Task function registered below under the name 'echo'.
1459 const echoTaskFunction = data => {
// Register 'echo' with a LEAST_ELU strategy so there is something to remove.
1462 await dynamicThreadPool.addTaskFunction('echo', {
1463 taskFunction: echoTaskFunction,
1464 strategy: WorkerChoiceStrategies.LEAST_ELU
// State after registration: one registry entry, LEAST_ELU among the strategy
// keys, and 'echo' listed with its strategy.
1466 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1467 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1468 taskFunction: echoTaskFunction,
1469 strategy: WorkerChoiceStrategies.LEAST_ELU
1472 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1474 WorkerChoiceStrategies.ROUND_ROBIN,
1475 WorkerChoiceStrategies.LEAST_ELU
1477 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1478 { name: DEFAULT_TASK_NAME },
1480 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
// Remove 'echo'; the promise is expected to resolve.
1482 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
// State after removal: empty registry, no 'echo' entry, and only ROUND_ROBIN
// left among the strategy keys.
1485 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1486 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1488 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1489 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
1490 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1491 { name: DEFAULT_TASK_NAME },
// Tear the pool down at the end of the test.
1494 await dynamicThreadPool.destroy()
// Test: listTaskFunctionsProperties() must report the same four entries, in
// the same order, for a thread pool and for a cluster pool whose workers load
// the "multiple task functions" worker file.
1497 it('Verify that listTaskFunctionsProperties() is working', async () => {
// Thread flavour (.mjs worker file).
1498 const dynamicThreadPool = new DynamicThreadPool(
1499 Math.floor(numberOfWorkers / 2),
1501 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// Wait on PoolEvents.ready (count 1) through the test helper before querying the pool.
1503 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// toStrictEqual on an array is order-sensitive: the default name comes first.
1504 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1505 { name: DEFAULT_TASK_NAME },
1506 { name: 'jsonIntegerSerialization' },
1507 { name: 'factorial' },
1508 { name: 'fibonacci' }
// The thread pool is destroyed before the cluster pool is created.
1510 await dynamicThreadPool.destroy()
// Cluster flavour (.cjs worker file), same expectations.
1511 const fixedClusterPool = new FixedClusterPool(
1513 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1515 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1516 expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
1517 { name: DEFAULT_TASK_NAME },
1518 { name: 'jsonIntegerSerialization' },
1519 { name: 'factorial' },
1520 { name: 'fibonacci' }
1522 await fixedClusterPool.destroy()
// Test: setDefaultTaskFunction() on a DynamicThreadPool whose worker exposes
// several task functions. Invalid names must reject with an error message
// that embeds the worker id; valid names must resolve to true and change the
// order of the listed task functions.
1525 it('Verify that setDefaultTaskFunction() is working', async () => {
1526 const dynamicThreadPool = new DynamicThreadPool(
1527 Math.floor(numberOfWorkers / 2),
1529 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// Wait on PoolEvents.ready (count 1) through the test helper before using the pool API.
1531 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// The id of the first worker node is interpolated into the expected error messages below.
1532 const workerId = dynamicThreadPool.workerNodes[0].info.id
// Case 1: non-string name.
1533 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1535 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
// Case 2: the reserved default task function name itself.
1539 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1542 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
// Case 3: a name with no matching task function.
1546 dynamicThreadPool.setDefaultTaskFunction('unknown')
1549 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
// Listing checked after the three calls above: original order.
1552 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1553 { name: DEFAULT_TASK_NAME },
1554 { name: 'jsonIntegerSerialization' },
1555 { name: 'factorial' },
1556 { name: 'fibonacci' }
// Promote 'factorial': it is then listed directly after the default name.
1559 dynamicThreadPool.setDefaultTaskFunction('factorial')
1560 ).resolves.toBe(true)
1561 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1562 { name: DEFAULT_TASK_NAME },
1563 { name: 'factorial' },
1564 { name: 'jsonIntegerSerialization' },
1565 { name: 'fibonacci' }
// Promote 'fibonacci': it takes the slot after the default name and
// 'factorial' is now listed last.
1568 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1569 ).resolves.toBe(true)
1570 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1571 { name: DEFAULT_TASK_NAME },
1572 { name: 'fibonacci' },
1573 { name: 'jsonIntegerSerialization' },
1574 { name: 'factorial' }
// Tear the pool down at the end of the test.
1576 await dynamicThreadPool.destroy()
// Test: a DynamicClusterPool whose worker file exposes several task functions.
// Executes the default function and each named one with the same input, then
// inspects pool counters and per-worker, per-task-function usage records.
1579 it('Verify that multiple task functions worker is working', async () => {
1580 const pool = new DynamicClusterPool(
1581 Math.floor(numberOfWorkers / 2),
1583 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1585 const data = { n: 10 }
// No name given: the default task function runs. It returns the same value
// as 'jsonIntegerSerialization' below.
1586 const result0 = await pool.execute(data)
1587 expect(result0).toStrictEqual({ ok: 1 })
1588 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1589 expect(result1).toStrictEqual({ ok: 1 })
// With n = 10: 10! = 3628800 and the 10th Fibonacci number is 55.
1590 const result2 = await pool.execute(data, 'factorial')
1591 expect(result2).toBe(3628800)
1592 const result3 = await pool.execute(data, 'fibonacci')
1593 expect(result3).toBe(55)
// Four execute() calls were awaited above: nothing in flight, four executed.
1594 expect(pool.info.executingTasks).toBe(0)
1595 expect(pool.info.executedTasks).toBe(4)
1596 for (const workerNode of pool.workerNodes) {
1597 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1598 { name: DEFAULT_TASK_NAME },
1599 { name: 'jsonIntegerSerialization' },
1600 { name: 'factorial' },
1601 { name: 'fibonacci' }
// Four names are listed but only three usage entries are expected.
// NOTE(review): presumably the default name shares its usage record with the
// first named function (see the comparison at the end of this loop) — confirm.
1603 expect(workerNode.taskFunctionsUsage.size).toBe(3)
// Usage is looked up under every name reported by the pool.
1604 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1606 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
// The executed count is only type-checked; history fields only need to be CircularArray instances.
1609 executed: expect.any(Number),
1613 sequentiallyStolen: 0,
1617 history: expect.any(CircularArray)
1620 history: expect.any(CircularArray)
1624 history: expect.any(CircularArray)
1627 history: expect.any(CircularArray)
// A value read from the same usage record must be strictly positive.
1632 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1634 ).toBeGreaterThan(0)
// Usage for the default name is set against usage for the first named
// function (index 1 of the worker's task function properties).
1637 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1639 workerNode.getTaskFunctionWorkerUsage(
1640 workerNode.info.taskFunctionsProperties[1].name
// Tear the pool down at the end of the test.
1644 await pool.destroy()
// Test: sendKillMessageToWorker() on a DynamicClusterPool running
// testWorker.cjs. The promise for worker node key 0 must resolve to undefined.
1647 it('Verify sendKillMessageToWorker()', async () => {
1648 const pool = new DynamicClusterPool(
1649 Math.floor(numberOfWorkers / 2),
1651 './tests/worker-files/cluster/testWorker.cjs'
// Key (index) of the worker node that receives the kill message.
1653 const workerNodeKey = 0
1655 pool.sendKillMessageToWorker(workerNodeKey)
1656 ).resolves.toBeUndefined()
// destroy() is still awaited after the kill message has been sent.
1657 await pool.destroy()
// Test: sendTaskFunctionOperationToWorker() on a DynamicClusterPool running
// testWorker.cjs. Sends an 'add' operation for a task function named 'empty'
// to one worker node and checks that node's task function properties.
1660 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1661 const pool = new DynamicClusterPool(
1662 Math.floor(numberOfWorkers / 2),
1664 './tests/worker-files/cluster/testWorker.cjs'
// Key (index) of the worker node that receives the operation.
1666 const workerNodeKey = 0
1668 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1669 taskFunctionOperation: 'add',
1670 taskFunctionProperties: { name: 'empty' },
// The function is sent as source text (toString()), not as a function object.
1671 taskFunction: (() => {}).toString()
1673 ).resolves.toBe(true)
// The targeted worker node's properties are inspected; the default name is the first expected entry.
1675 pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
1677 { name: DEFAULT_TASK_NAME },
// Tear the pool down at the end of the test.
1681 await pool.destroy()
// Test: sendTaskFunctionOperationToWorkers() on a DynamicClusterPool running
// testWorker.cjs. Unlike the single-worker variant, no worker node key is
// passed; afterwards every worker node's task function properties are checked.
1684 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1685 const pool = new DynamicClusterPool(
1686 Math.floor(numberOfWorkers / 2),
1688 './tests/worker-files/cluster/testWorker.cjs'
1691 pool.sendTaskFunctionOperationToWorkers({
1692 taskFunctionOperation: 'add',
1693 taskFunctionProperties: { name: 'empty' },
// The function is sent as source text (toString()), not as a function object.
1694 taskFunction: (() => {}).toString()
1696 ).resolves.toBe(true)
// Each worker node is inspected; the default name is the first expected entry.
1697 for (const workerNode of pool.workerNodes) {
1698 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1699 { name: DEFAULT_TASK_NAME },
// Tear the pool down at the end of the test.
1704 await pool.destroy()