1 // eslint-disable-next-line n/no-unsupported-features/node-builtins
2 import { createHook, executionAsyncId } from 'node:async_hooks'
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { readFileSync } from 'node:fs'
5 import { dirname, join } from 'node:path'
6 import { fileURLToPath } from 'node:url'
8 import { expect } from 'expect'
9 import { restore, stub } from 'sinon'
11 import { CircularArray } from '../../lib/circular-array.cjs'
19 WorkerChoiceStrategies,
21 } from '../../lib/index.cjs'
22 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
23 import { PriorityQueue } from '../../lib/priority-queue.cjs'
24 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
25 import { waitPoolEvents } from '../test-utils.cjs'
27 describe('Abstract pool test suite', () => {
28 const version = JSON.parse(
30 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
34 const numberOfWorkers = 2
35 class StubPoolWithIsMain extends FixedThreadPool {
45 it('Verify that pool can be created and destroyed', async () => {
46 const pool = new FixedThreadPool(
48 './tests/worker-files/thread/testWorker.mjs'
50 expect(pool).toBeInstanceOf(FixedThreadPool)
54 it('Verify that pool cannot be created from a non main thread/process', () => {
57 new StubPoolWithIsMain(
59 './tests/worker-files/thread/testWorker.mjs',
61 errorHandler: e => console.error(e)
66 'Cannot start a pool from a worker with the same type as the pool'
71 it('Verify that pool statuses properties are set', async () => {
72 const pool = new FixedThreadPool(
74 './tests/worker-files/thread/testWorker.mjs'
76 expect(pool.started).toBe(true)
77 expect(pool.starting).toBe(false)
78 expect(pool.destroying).toBe(false)
82 it('Verify that filePath is checked', () => {
83 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
84 new TypeError('The worker file path must be specified')
86 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
87 new TypeError('The worker file path must be a string')
90 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
91 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
94 it('Verify that numberOfWorkers is checked', () => {
99 './tests/worker-files/thread/testWorker.mjs'
103 'Cannot instantiate a pool without specifying the number of workers'
108 it('Verify that a negative number of workers is checked', () => {
111 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
114 'Cannot instantiate a pool with a negative number of workers'
119 it('Verify that a non integer number of workers is checked', () => {
122 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
125 'Cannot instantiate a pool with a non safe integer number of workers'
130 it('Verify that pool arguments number and pool type are checked', () => {
135 './tests/worker-files/thread/testWorker.mjs',
141 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
146 it('Verify that dynamic pool sizing is checked', () => {
149 new DynamicClusterPool(
152 './tests/worker-files/cluster/testWorker.cjs'
156 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
161 new DynamicThreadPool(
164 './tests/worker-files/thread/testWorker.mjs'
168 'Cannot instantiate a pool with a non safe integer number of workers'
173 new DynamicClusterPool(
176 './tests/worker-files/cluster/testWorker.cjs'
180 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
185 new DynamicThreadPool(
188 './tests/worker-files/thread/testWorker.mjs'
192 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
197 new DynamicThreadPool(
200 './tests/worker-files/thread/testWorker.mjs'
204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
209 new DynamicClusterPool(
212 './tests/worker-files/cluster/testWorker.cjs'
216 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
221 it('Verify that pool options are checked', async () => {
222 let pool = new FixedThreadPool(
224 './tests/worker-files/thread/testWorker.mjs'
226 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
227 expect(pool.emitter.eventNames()).toStrictEqual([])
228 expect(pool.opts).toStrictEqual({
231 restartWorkerOnError: true,
232 enableTasksQueue: false,
233 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
235 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
236 .workerChoiceStrategies) {
237 expect(workerChoiceStrategy.opts).toStrictEqual({
238 runTime: { median: false },
239 waitTime: { median: false },
240 elu: { median: false },
241 weights: expect.objectContaining({
242 0: expect.any(Number),
243 [pool.info.maxSize - 1]: expect.any(Number)
248 const testHandler = () => console.info('test handler executed')
249 pool = new FixedThreadPool(
251 './tests/worker-files/thread/testWorker.mjs',
253 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
254 workerChoiceStrategyOptions: {
255 runTime: { median: true },
256 weights: { 0: 300, 1: 200 }
259 restartWorkerOnError: false,
260 enableTasksQueue: true,
261 tasksQueueOptions: { concurrency: 2 },
262 messageHandler: testHandler,
263 errorHandler: testHandler,
264 onlineHandler: testHandler,
265 exitHandler: testHandler
268 expect(pool.emitter).toBeUndefined()
269 expect(pool.opts).toStrictEqual({
272 restartWorkerOnError: false,
273 enableTasksQueue: true,
276 size: Math.pow(numberOfWorkers, 2),
278 tasksStealingOnBackPressure: true,
279 tasksFinishedTimeout: 2000
281 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
282 workerChoiceStrategyOptions: {
283 runTime: { median: true },
284 weights: { 0: 300, 1: 200 }
286 onlineHandler: testHandler,
287 messageHandler: testHandler,
288 errorHandler: testHandler,
289 exitHandler: testHandler
291 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
292 .workerChoiceStrategies) {
293 expect(workerChoiceStrategy.opts).toStrictEqual({
294 runTime: { median: true },
295 waitTime: { median: false },
296 elu: { median: false },
297 weights: { 0: 300, 1: 200 }
303 it('Verify that pool options are validated', () => {
308 './tests/worker-files/thread/testWorker.mjs',
310 workerChoiceStrategy: 'invalidStrategy'
313 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
318 './tests/worker-files/thread/testWorker.mjs',
320 workerChoiceStrategyOptions: { weights: {} }
325 'Invalid worker choice strategy options: must have a weight for each worker node'
332 './tests/worker-files/thread/testWorker.mjs',
334 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
339 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
346 './tests/worker-files/thread/testWorker.mjs',
348 enableTasksQueue: true,
349 tasksQueueOptions: 'invalidTasksQueueOptions'
353 new TypeError('Invalid tasks queue options: must be a plain object')
359 './tests/worker-files/thread/testWorker.mjs',
361 enableTasksQueue: true,
362 tasksQueueOptions: { concurrency: 0 }
367 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
374 './tests/worker-files/thread/testWorker.mjs',
376 enableTasksQueue: true,
377 tasksQueueOptions: { concurrency: -1 }
382 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
389 './tests/worker-files/thread/testWorker.mjs',
391 enableTasksQueue: true,
392 tasksQueueOptions: { concurrency: 0.2 }
396 new TypeError('Invalid worker node tasks concurrency: must be an integer')
402 './tests/worker-files/thread/testWorker.mjs',
404 enableTasksQueue: true,
405 tasksQueueOptions: { size: 0 }
410 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
417 './tests/worker-files/thread/testWorker.mjs',
419 enableTasksQueue: true,
420 tasksQueueOptions: { size: -1 }
425 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
432 './tests/worker-files/thread/testWorker.mjs',
434 enableTasksQueue: true,
435 tasksQueueOptions: { size: 0.2 }
439 new TypeError('Invalid worker node tasks queue size: must be an integer')
443 it('Verify that pool worker choice strategy options can be set', async () => {
444 const pool = new FixedThreadPool(
446 './tests/worker-files/thread/testWorker.mjs',
447 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
449 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
450 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
451 .workerChoiceStrategies) {
452 expect(workerChoiceStrategy.opts).toStrictEqual({
453 runTime: { median: false },
454 waitTime: { median: false },
455 elu: { median: false },
456 weights: expect.objectContaining({
457 0: expect.any(Number),
458 [pool.info.maxSize - 1]: expect.any(Number)
463 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
481 pool.setWorkerChoiceStrategyOptions({
482 runTime: { median: true },
483 elu: { median: true }
485 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
486 runTime: { median: true },
487 elu: { median: true }
489 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
490 .workerChoiceStrategies) {
491 expect(workerChoiceStrategy.opts).toStrictEqual({
492 runTime: { median: true },
493 waitTime: { median: false },
494 elu: { median: true },
495 weights: expect.objectContaining({
496 0: expect.any(Number),
497 [pool.info.maxSize - 1]: expect.any(Number)
502 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
520 pool.setWorkerChoiceStrategyOptions({
521 runTime: { median: false },
522 elu: { median: false }
524 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
525 runTime: { median: false },
526 elu: { median: false }
528 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
529 .workerChoiceStrategies) {
530 expect(workerChoiceStrategy.opts).toStrictEqual({
531 runTime: { median: false },
532 waitTime: { median: false },
533 elu: { median: false },
534 weights: expect.objectContaining({
535 0: expect.any(Number),
536 [pool.info.maxSize - 1]: expect.any(Number)
541 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
560 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
563 'Invalid worker choice strategy options: must be a plain object'
566 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
568 'Invalid worker choice strategy options: must have a weight for each worker node'
572 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
575 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
581 it('Verify that pool tasks queue can be enabled/disabled', async () => {
582 const pool = new FixedThreadPool(
584 './tests/worker-files/thread/testWorker.mjs'
586 expect(pool.opts.enableTasksQueue).toBe(false)
587 expect(pool.opts.tasksQueueOptions).toBeUndefined()
588 pool.enableTasksQueue(true)
589 expect(pool.opts.enableTasksQueue).toBe(true)
590 expect(pool.opts.tasksQueueOptions).toStrictEqual({
592 size: Math.pow(numberOfWorkers, 2),
594 tasksStealingOnBackPressure: true,
595 tasksFinishedTimeout: 2000
597 pool.enableTasksQueue(true, { concurrency: 2 })
598 expect(pool.opts.enableTasksQueue).toBe(true)
599 expect(pool.opts.tasksQueueOptions).toStrictEqual({
601 size: Math.pow(numberOfWorkers, 2),
603 tasksStealingOnBackPressure: true,
604 tasksFinishedTimeout: 2000
606 pool.enableTasksQueue(false)
607 expect(pool.opts.enableTasksQueue).toBe(false)
608 expect(pool.opts.tasksQueueOptions).toBeUndefined()
612 it('Verify that pool tasks queue options can be set', async () => {
613 const pool = new FixedThreadPool(
615 './tests/worker-files/thread/testWorker.mjs',
616 { enableTasksQueue: true }
618 expect(pool.opts.tasksQueueOptions).toStrictEqual({
620 size: Math.pow(numberOfWorkers, 2),
622 tasksStealingOnBackPressure: true,
623 tasksFinishedTimeout: 2000
625 for (const workerNode of pool.workerNodes) {
626 expect(workerNode.tasksQueueBackPressureSize).toBe(
627 pool.opts.tasksQueueOptions.size
630 pool.setTasksQueueOptions({
634 tasksStealingOnBackPressure: false,
635 tasksFinishedTimeout: 3000
637 expect(pool.opts.tasksQueueOptions).toStrictEqual({
641 tasksStealingOnBackPressure: false,
642 tasksFinishedTimeout: 3000
644 for (const workerNode of pool.workerNodes) {
645 expect(workerNode.tasksQueueBackPressureSize).toBe(
646 pool.opts.tasksQueueOptions.size
649 pool.setTasksQueueOptions({
652 tasksStealingOnBackPressure: true
654 expect(pool.opts.tasksQueueOptions).toStrictEqual({
656 size: Math.pow(numberOfWorkers, 2),
658 tasksStealingOnBackPressure: true,
659 tasksFinishedTimeout: 2000
661 for (const workerNode of pool.workerNodes) {
662 expect(workerNode.tasksQueueBackPressureSize).toBe(
663 pool.opts.tasksQueueOptions.size
666 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
667 new TypeError('Invalid tasks queue options: must be a plain object')
669 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
671 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
674 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
676 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
679 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
680 new TypeError('Invalid worker node tasks concurrency: must be an integer')
682 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
684 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
687 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
689 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
692 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
693 new TypeError('Invalid worker node tasks queue size: must be an integer')
698 it('Verify that pool info is set', async () => {
699 let pool = new FixedThreadPool(
701 './tests/worker-files/thread/testWorker.mjs'
703 expect(pool.info).toStrictEqual({
705 type: PoolTypes.fixed,
706 worker: WorkerTypes.thread,
709 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
711 minSize: numberOfWorkers,
712 maxSize: numberOfWorkers,
713 workerNodes: numberOfWorkers,
714 idleWorkerNodes: numberOfWorkers,
721 pool = new DynamicClusterPool(
722 Math.floor(numberOfWorkers / 2),
724 './tests/worker-files/cluster/testWorker.cjs'
726 expect(pool.info).toStrictEqual({
728 type: PoolTypes.dynamic,
729 worker: WorkerTypes.cluster,
732 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
734 minSize: Math.floor(numberOfWorkers / 2),
735 maxSize: numberOfWorkers,
736 workerNodes: Math.floor(numberOfWorkers / 2),
737 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
746 it('Verify that pool worker tasks usage are initialized', async () => {
747 const pool = new FixedClusterPool(
749 './tests/worker-files/cluster/testWorker.cjs'
751 for (const workerNode of pool.workerNodes) {
752 expect(workerNode).toBeInstanceOf(WorkerNode)
753 expect(workerNode.usage).toStrictEqual({
759 sequentiallyStolen: 0,
764 history: new CircularArray()
767 history: new CircularArray()
771 history: new CircularArray()
774 history: new CircularArray()
782 it('Verify that pool worker tasks queue are initialized', async () => {
783 let pool = new FixedClusterPool(
785 './tests/worker-files/cluster/testWorker.cjs'
787 for (const workerNode of pool.workerNodes) {
788 expect(workerNode).toBeInstanceOf(WorkerNode)
789 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
790 expect(workerNode.tasksQueue.size).toBe(0)
791 expect(workerNode.tasksQueue.maxSize).toBe(0)
794 pool = new DynamicThreadPool(
795 Math.floor(numberOfWorkers / 2),
797 './tests/worker-files/thread/testWorker.mjs'
799 for (const workerNode of pool.workerNodes) {
800 expect(workerNode).toBeInstanceOf(WorkerNode)
801 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
802 expect(workerNode.tasksQueue.size).toBe(0)
803 expect(workerNode.tasksQueue.maxSize).toBe(0)
808 it('Verify that pool worker info are initialized', async () => {
809 let pool = new FixedClusterPool(
811 './tests/worker-files/cluster/testWorker.cjs'
813 for (const workerNode of pool.workerNodes) {
814 expect(workerNode).toBeInstanceOf(WorkerNode)
815 expect(workerNode.info).toStrictEqual({
816 id: expect.any(Number),
817 type: WorkerTypes.cluster,
824 pool = new DynamicThreadPool(
825 Math.floor(numberOfWorkers / 2),
827 './tests/worker-files/thread/testWorker.mjs'
829 for (const workerNode of pool.workerNodes) {
830 expect(workerNode).toBeInstanceOf(WorkerNode)
831 expect(workerNode.info).toStrictEqual({
832 id: expect.any(Number),
833 type: WorkerTypes.thread,
842 it('Verify that pool statuses are checked at start or destroy', async () => {
843 const pool = new FixedThreadPool(
845 './tests/worker-files/thread/testWorker.mjs'
847 expect(pool.info.started).toBe(true)
848 expect(pool.info.ready).toBe(true)
849 expect(() => pool.start()).toThrow(
850 new Error('Cannot start an already started pool')
853 expect(pool.info.started).toBe(false)
854 expect(pool.info.ready).toBe(false)
855 await expect(pool.destroy()).rejects.toThrow(
856 new Error('Cannot destroy an already destroyed pool')
860 it('Verify that pool can be started after initialization', async () => {
861 const pool = new FixedClusterPool(
863 './tests/worker-files/cluster/testWorker.cjs',
868 expect(pool.info.started).toBe(false)
869 expect(pool.info.ready).toBe(false)
870 expect(pool.workerNodes).toStrictEqual([])
871 expect(pool.readyEventEmitted).toBe(false)
872 await expect(pool.execute()).rejects.toThrow(
873 new Error('Cannot execute a task on not started pool')
876 expect(pool.info.started).toBe(true)
877 expect(pool.info.ready).toBe(true)
878 await waitPoolEvents(pool, PoolEvents.ready, 1)
879 expect(pool.readyEventEmitted).toBe(true)
880 expect(pool.workerNodes.length).toBe(numberOfWorkers)
881 for (const workerNode of pool.workerNodes) {
882 expect(workerNode).toBeInstanceOf(WorkerNode)
887 it('Verify that pool execute() arguments are checked', async () => {
888 const pool = new FixedClusterPool(
890 './tests/worker-files/cluster/testWorker.cjs'
892 await expect(pool.execute(undefined, 0)).rejects.toThrow(
893 new TypeError('name argument must be a string')
895 await expect(pool.execute(undefined, '')).rejects.toThrow(
896 new TypeError('name argument must not be an empty string')
898 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
899 new TypeError('transferList argument must be an array')
901 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
902 "Task function 'unknown' not found"
905 await expect(pool.execute()).rejects.toThrow(
906 new Error('Cannot execute a task on not started pool')
910 it('Verify that pool worker tasks usage are computed', async () => {
911 const pool = new FixedClusterPool(
913 './tests/worker-files/cluster/testWorker.cjs'
915 const promises = new Set()
916 const maxMultiplier = 2
917 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
918 promises.add(pool.execute())
920 for (const workerNode of pool.workerNodes) {
921 expect(workerNode.usage).toStrictEqual({
924 executing: maxMultiplier,
927 sequentiallyStolen: 0,
932 history: expect.any(CircularArray)
935 history: expect.any(CircularArray)
939 history: expect.any(CircularArray)
942 history: expect.any(CircularArray)
947 await Promise.all(promises)
948 for (const workerNode of pool.workerNodes) {
949 expect(workerNode.usage).toStrictEqual({
951 executed: maxMultiplier,
955 sequentiallyStolen: 0,
960 history: expect.any(CircularArray)
963 history: expect.any(CircularArray)
967 history: expect.any(CircularArray)
970 history: expect.any(CircularArray)
978 it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
979 const pool = new DynamicThreadPool(
980 Math.floor(numberOfWorkers / 2),
982 './tests/worker-files/thread/testWorker.mjs'
984 const promises = new Set()
985 const maxMultiplier = 2
986 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
987 promises.add(pool.execute())
989 await Promise.all(promises)
990 for (const workerNode of pool.workerNodes) {
991 expect(workerNode.usage).toStrictEqual({
993 executed: expect.any(Number),
997 sequentiallyStolen: 0,
1002 history: expect.any(CircularArray)
1005 history: expect.any(CircularArray)
1009 history: expect.any(CircularArray)
1012 history: expect.any(CircularArray)
1016 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1017 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1018 numberOfWorkers * maxMultiplier
1020 expect(workerNode.usage.runTime.history.length).toBe(0)
1021 expect(workerNode.usage.waitTime.history.length).toBe(0)
1022 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1023 expect(workerNode.usage.elu.active.history.length).toBe(0)
1025 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1026 for (const workerNode of pool.workerNodes) {
1027 expect(workerNode.usage).toStrictEqual({
1029 executed: expect.any(Number),
1033 sequentiallyStolen: 0,
1038 history: expect.any(CircularArray)
1041 history: expect.any(CircularArray)
1045 history: expect.any(CircularArray)
1048 history: expect.any(CircularArray)
1052 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1053 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1054 numberOfWorkers * maxMultiplier
1056 expect(workerNode.usage.runTime.history.length).toBe(0)
1057 expect(workerNode.usage.waitTime.history.length).toBe(0)
1058 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1059 expect(workerNode.usage.elu.active.history.length).toBe(0)
1061 await pool.destroy()
1064 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1065 const pool = new DynamicClusterPool(
1066 Math.floor(numberOfWorkers / 2),
1068 './tests/worker-files/cluster/testWorker.cjs'
1070 expect(pool.emitter.eventNames()).toStrictEqual([])
1073 pool.emitter.on(PoolEvents.ready, info => {
1077 await waitPoolEvents(pool, PoolEvents.ready, 1)
1078 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1079 expect(poolReady).toBe(1)
1080 expect(poolInfo).toStrictEqual({
1082 type: PoolTypes.dynamic,
1083 worker: WorkerTypes.cluster,
1086 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1087 strategyRetries: expect.any(Number),
1088 minSize: expect.any(Number),
1089 maxSize: expect.any(Number),
1090 workerNodes: expect.any(Number),
1091 idleWorkerNodes: expect.any(Number),
1092 busyWorkerNodes: expect.any(Number),
1093 executedTasks: expect.any(Number),
1094 executingTasks: expect.any(Number),
1095 failedTasks: expect.any(Number)
1097 await pool.destroy()
1100 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1101 const pool = new FixedThreadPool(
1103 './tests/worker-files/thread/testWorker.mjs'
1105 expect(pool.emitter.eventNames()).toStrictEqual([])
1106 const promises = new Set()
1109 pool.emitter.on(PoolEvents.busy, info => {
1113 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1114 for (let i = 0; i < numberOfWorkers * 2; i++) {
1115 promises.add(pool.execute())
1117 await Promise.all(promises)
1118 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1119 // So it is emitted numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1120 expect(poolBusy).toBe(numberOfWorkers + 1)
1121 expect(poolInfo).toStrictEqual({
1123 type: PoolTypes.fixed,
1124 worker: WorkerTypes.thread,
1127 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1128 strategyRetries: expect.any(Number),
1129 minSize: expect.any(Number),
1130 maxSize: expect.any(Number),
1131 workerNodes: expect.any(Number),
1132 idleWorkerNodes: expect.any(Number),
1133 busyWorkerNodes: expect.any(Number),
1134 executedTasks: expect.any(Number),
1135 executingTasks: expect.any(Number),
1136 failedTasks: expect.any(Number)
1138 await pool.destroy()
1141 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1142 const pool = new DynamicThreadPool(
1143 Math.floor(numberOfWorkers / 2),
1145 './tests/worker-files/thread/testWorker.mjs'
1147 expect(pool.emitter.eventNames()).toStrictEqual([])
1148 const promises = new Set()
1151 pool.emitter.on(PoolEvents.full, info => {
1155 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1156 for (let i = 0; i < numberOfWorkers * 2; i++) {
1157 promises.add(pool.execute())
1159 await Promise.all(promises)
1160 expect(poolFull).toBe(1)
1161 expect(poolInfo).toStrictEqual({
1163 type: PoolTypes.dynamic,
1164 worker: WorkerTypes.thread,
1167 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1168 strategyRetries: expect.any(Number),
1169 minSize: expect.any(Number),
1170 maxSize: expect.any(Number),
1171 workerNodes: expect.any(Number),
1172 idleWorkerNodes: expect.any(Number),
1173 busyWorkerNodes: expect.any(Number),
1174 executedTasks: expect.any(Number),
1175 executingTasks: expect.any(Number),
1176 failedTasks: expect.any(Number)
1178 await pool.destroy()
1181 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1182 const pool = new FixedThreadPool(
1184 './tests/worker-files/thread/testWorker.mjs',
1186 enableTasksQueue: true
1189 stub(pool, 'hasBackPressure').returns(true)
1190 expect(pool.emitter.eventNames()).toStrictEqual([])
1191 const promises = new Set()
1192 let poolBackPressure = 0
1194 pool.emitter.on(PoolEvents.backPressure, info => {
1198 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1199 for (let i = 0; i < numberOfWorkers + 1; i++) {
1200 promises.add(pool.execute())
1202 await Promise.all(promises)
1203 expect(poolBackPressure).toBe(1)
1204 expect(poolInfo).toStrictEqual({
1206 type: PoolTypes.fixed,
1207 worker: WorkerTypes.thread,
1210 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1211 strategyRetries: expect.any(Number),
1212 minSize: expect.any(Number),
1213 maxSize: expect.any(Number),
1214 workerNodes: expect.any(Number),
1215 idleWorkerNodes: expect.any(Number),
1216 stealingWorkerNodes: expect.any(Number),
1217 busyWorkerNodes: expect.any(Number),
1218 executedTasks: expect.any(Number),
1219 executingTasks: expect.any(Number),
1220 maxQueuedTasks: expect.any(Number),
1221 queuedTasks: expect.any(Number),
1223 stolenTasks: expect.any(Number),
1224 failedTasks: expect.any(Number)
1226 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1227 await pool.destroy()
1230 it('Verify that destroy() waits for queued tasks to finish', async () => {
1231 const tasksFinishedTimeout = 2500
1232 const pool = new FixedThreadPool(
1234 './tests/worker-files/thread/asyncWorker.mjs',
1236 enableTasksQueue: true,
1237 tasksQueueOptions: { tasksFinishedTimeout }
1240 const maxMultiplier = 4
1241 let tasksFinished = 0
1242 for (const workerNode of pool.workerNodes) {
1243 workerNode.on('taskFinished', () => {
1247 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1250 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1251 const startTime = performance.now()
1252 await pool.destroy()
1253 const elapsedTime = performance.now() - startTime
1254 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1255 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1256 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1259 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1260 const tasksFinishedTimeout = 1000
1261 const pool = new FixedThreadPool(
1263 './tests/worker-files/thread/asyncWorker.mjs',
1265 enableTasksQueue: true,
1266 tasksQueueOptions: { tasksFinishedTimeout }
1269 const maxMultiplier = 4
1270 let tasksFinished = 0
1271 for (const workerNode of pool.workerNodes) {
1272 workerNode.on('taskFinished', () => {
1276 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1279 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1280 const startTime = performance.now()
1281 await pool.destroy()
1282 const elapsedTime = performance.now() - startTime
1283 expect(tasksFinished).toBe(0)
1284 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1287 it('Verify that pool asynchronous resource track tasks execution', async () => {
1292 let resolveCalls = 0
1293 const hook = createHook({
1294 init (asyncId, type) {
1295 if (type === 'poolifier:task') {
1297 taskAsyncId = asyncId
1301 if (asyncId === taskAsyncId) beforeCalls++
1304 if (asyncId === taskAsyncId) afterCalls++
1307 if (executionAsyncId() === taskAsyncId) resolveCalls++
1310 const pool = new FixedThreadPool(
1312 './tests/worker-files/thread/testWorker.mjs'
1315 await pool.execute()
1317 expect(initCalls).toBe(1)
1318 expect(beforeCalls).toBe(1)
1319 expect(afterCalls).toBe(1)
1320 expect(resolveCalls).toBe(1)
1321 await pool.destroy()
1324 it('Verify that hasTaskFunction() is working', async () => {
1325 const dynamicThreadPool = new DynamicThreadPool(
1326 Math.floor(numberOfWorkers / 2),
1328 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1330 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1331 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1332 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1335 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1336 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1337 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1338 await dynamicThreadPool.destroy()
1339 const fixedClusterPool = new FixedClusterPool(
1341 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1343 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1344 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1345 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1348 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1349 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1350 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1351 await fixedClusterPool.destroy()
// Test: addTaskFunction() on a DynamicThreadPool built from the thread
// testWorker.mjs worker file. Argument validation rejections are checked
// first, then an 'echo' task function is added with the LEAST_ELU strategy
// and the resulting pool state and per worker node usage are inspected.
1354 it('Verify that addTaskFunction() is working', async () => {
1355 const dynamicThreadPool = new DynamicThreadPool(
1356 Math.floor(numberOfWorkers / 2),
1358 './tests/worker-files/thread/testWorker.mjs'
// Await PoolEvents.ready (count argument 1) through the waitPoolEvents test helper.
1360 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Invalid name arguments: a non-string (0) and an empty string.
1362 dynamicThreadPool.addTaskFunction(0, () => {})
1363 ).rejects.toThrow(new TypeError('name argument must be a string'))
1365 dynamicThreadPool.addTaskFunction('', () => {})
1367 new TypeError('name argument must not be an empty string')
// Non-function values, given directly or through the taskFunction property.
1369 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1370 new TypeError('taskFunction property must be a function')
1372 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1373 new TypeError('taskFunction property must be a function')
1376 dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 })
1377 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1379 dynamicThreadPool.addTaskFunction('test', { taskFunction: '' })
1380 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
// Two cases paired with the RangeError about the allowed 'priority' range.
1382 dynamicThreadPool.addTaskFunction('test', {
1383 taskFunction: () => {},
1387 new RangeError("Property 'priority' must be between -20 and 19")
1390 dynamicThreadPool.addTaskFunction('test', {
1391 taskFunction: () => {},
1395 new RangeError("Property 'priority' must be between -20 and 19")
// A strategy name that is not a known worker choice strategy.
1398 dynamicThreadPool.addTaskFunction('test', {
1399 taskFunction: () => {},
1400 strategy: 'invalidStrategy'
1403 new Error("Invalid worker choice strategy 'invalidStrategy'")
// Pool state checked before the 'echo' task function is added: only
// ROUND_ROBIN is expected among the worker choice strategies keys.
1405 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1406 { name: DEFAULT_TASK_NAME },
1410 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1411 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
// Add 'echo' with the LEAST_ELU strategy; the call is expected to resolve to true.
1412 const echoTaskFunction = data => {
1416 dynamicThreadPool.addTaskFunction('echo', {
1417 taskFunction: echoTaskFunction,
1418 strategy: WorkerChoiceStrategies.LEAST_ELU
1420 ).resolves.toBe(true)
// The pool-side taskFunctions map now holds the 'echo' entry, and LEAST_ELU
// appears next to ROUND_ROBIN in the worker choice strategies keys.
1421 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1422 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1423 taskFunction: echoTaskFunction,
1424 strategy: WorkerChoiceStrategies.LEAST_ELU
1427 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1429 WorkerChoiceStrategies.ROUND_ROBIN,
1430 WorkerChoiceStrategies.LEAST_ELU
1432 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1433 { name: DEFAULT_TASK_NAME },
1435 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
// Execute the new task function by name; the result must equal the input data.
1437 const taskFunctionData = { test: 'test' }
1438 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1439 expect(echoResult).toStrictEqual(taskFunctionData)
// Usage statistics stored under the 'echo' name on every worker node.
1440 for (const workerNode of dynamicThreadPool.workerNodes) {
1441 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1443 executed: expect.any(Number),
1446 sequentiallyStolen: 0,
1451 history: new CircularArray()
1454 history: new CircularArray()
1456 elu: expect.objectContaining({
1457 idle: expect.objectContaining({
1458 history: expect.any(CircularArray)
1460 active: expect.objectContaining({
1461 history: expect.any(CircularArray)
1466 workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed
1467 ).toBeGreaterThan(0)
// ELU figures (active aggregate, idle aggregate, utilization) are compared
// against null and against numeric bounds; utilization is bounded by 0 and 1.
1469 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate ==
1473 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
1477 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
1478 ).toBeGreaterThan(0)
1481 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null
1484 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
1488 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
1489 ).toBeGreaterThanOrEqual(0)
1492 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null
1495 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1499 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1500 ).toBeGreaterThanOrEqual(0)
1502 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1503 ).toBeLessThanOrEqual(1)
// Destroy the pool at the end of the test.
1506 await dynamicThreadPool.destroy()
// Test: removeTaskFunction() on a DynamicThreadPool built from the thread
// testWorker.mjs worker file. Removing a name that was never added on the
// pool side must reject; an 'echo' task function is then added, removed, and
// the pool state is checked after each step.
1509 it('Verify that removeTaskFunction() is working', async () => {
1510 const dynamicThreadPool = new DynamicThreadPool(
1511 Math.floor(numberOfWorkers / 2),
1513 './tests/worker-files/thread/testWorker.mjs'
// Await PoolEvents.ready (count argument 1) through the waitPoolEvents test helper.
1515 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1516 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1517 { name: DEFAULT_TASK_NAME },
// 'test' was not added through the pool, so its removal is expected to reject.
1520 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1521 new Error('Cannot remove a task function not handled on the pool side')
// Register 'echo' with the LEAST_ELU strategy so there is something to remove.
1523 const echoTaskFunction = data => {
1526 await dynamicThreadPool.addTaskFunction('echo', {
1527 taskFunction: echoTaskFunction,
1528 strategy: WorkerChoiceStrategies.LEAST_ELU
// State after the add: one pool-side entry, with LEAST_ELU listed next to
// ROUND_ROBIN in the worker choice strategies keys.
1530 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1531 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1532 taskFunction: echoTaskFunction,
1533 strategy: WorkerChoiceStrategies.LEAST_ELU
1536 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1538 WorkerChoiceStrategies.ROUND_ROBIN,
1539 WorkerChoiceStrategies.LEAST_ELU
1541 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1542 { name: DEFAULT_TASK_NAME },
1544 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
// Remove 'echo'; afterwards the map is empty and only ROUND_ROBIN remains.
1546 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1549 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1550 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1552 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1553 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
1554 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1555 { name: DEFAULT_TASK_NAME },
// Destroy the pool at the end of the test.
1558 await dynamicThreadPool.destroy()
// Test: listTaskFunctionsProperties() against the multiple task functions
// worker files, first with a DynamicThreadPool (thread .mjs worker) and then
// with a FixedClusterPool (cluster .cjs worker). Both pools are expected to
// list the same four names in the same order.
1561 it('Verify that listTaskFunctionsProperties() is working', async () => {
1562 const dynamicThreadPool = new DynamicThreadPool(
1563 Math.floor(numberOfWorkers / 2),
1565 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// Await PoolEvents.ready (count argument 1) through the waitPoolEvents test helper.
1567 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1568 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1569 { name: DEFAULT_TASK_NAME },
1570 { name: 'jsonIntegerSerialization' },
1571 { name: 'factorial' },
1572 { name: 'fibonacci' }
// The thread pool is destroyed before the cluster pool is created.
1574 await dynamicThreadPool.destroy()
1575 const fixedClusterPool = new FixedClusterPool(
1577 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1579 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1580 expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
1581 { name: DEFAULT_TASK_NAME },
1582 { name: 'jsonIntegerSerialization' },
1583 { name: 'factorial' },
1584 { name: 'fibonacci' }
1586 await fixedClusterPool.destroy()
// Test: setDefaultTaskFunction() on a DynamicThreadPool built from the thread
// multiple task functions worker file. Invalid names are paired with the
// error message reported for the worker, then two valid names are set as
// default and the order returned by listTaskFunctionsProperties() is checked.
1589 it('Verify that setDefaultTaskFunction() is working', async () => {
1590 const dynamicThreadPool = new DynamicThreadPool(
1591 Math.floor(numberOfWorkers / 2),
1593 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// Await PoolEvents.ready (count argument 1) through the waitPoolEvents test helper.
1595 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// The id of the first worker node is interpolated into the expected messages.
1596 const workerId = dynamicThreadPool.workerNodes[0].info.id
// Invalid inputs: a non-string (0), the reserved default name, an unknown name.
1597 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1599 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1603 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1606 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1610 dynamicThreadPool.setDefaultTaskFunction('unknown')
1613 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
// Listing order expected before any successful change.
1616 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1617 { name: DEFAULT_TASK_NAME },
1618 { name: 'jsonIntegerSerialization' },
1619 { name: 'factorial' },
1620 { name: 'fibonacci' }
// After 'factorial' is set as default it is listed right after DEFAULT_TASK_NAME.
1623 dynamicThreadPool.setDefaultTaskFunction('factorial')
1624 ).resolves.toBe(true)
1625 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1626 { name: DEFAULT_TASK_NAME },
1627 { name: 'factorial' },
1628 { name: 'jsonIntegerSerialization' },
1629 { name: 'fibonacci' }
// After 'fibonacci' is set as default it takes that position instead.
1632 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1633 ).resolves.toBe(true)
1634 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1635 { name: DEFAULT_TASK_NAME },
1636 { name: 'fibonacci' },
1637 { name: 'jsonIntegerSerialization' },
1638 { name: 'factorial' }
// Destroy the pool at the end of the test.
1640 await dynamicThreadPool.destroy()
// Test: a DynamicClusterPool built from the cluster multiple task functions
// worker file. The same input is executed with the default task function and
// with each named one, then the pool counters and the per worker node task
// function properties and usage are checked.
1643 it('Verify that multiple task functions worker is working', async () => {
1644 const pool = new DynamicClusterPool(
1645 Math.floor(numberOfWorkers / 2),
1647 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
// One execution without a name, then one per named task function.
1649 const data = { n: 10 }
1650 const result0 = await pool.execute(data)
1651 expect(result0).toStrictEqual({ ok: 1 })
1652 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1653 expect(result1).toStrictEqual({ ok: 1 })
1654 const result2 = await pool.execute(data, 'factorial')
1655 expect(result2).toBe(3628800)
1656 const result3 = await pool.execute(data, 'fibonacci')
1657 expect(result3).toBe(55)
// Four tasks were awaited above, so none is executing and four are executed.
1658 expect(pool.info.executingTasks).toBe(0)
1659 expect(pool.info.executedTasks).toBe(4)
// Per worker node checks: advertised task function properties and usage entries.
1660 for (const workerNode of pool.workerNodes) {
1661 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1662 { name: DEFAULT_TASK_NAME },
1663 { name: 'jsonIntegerSerialization' },
1664 { name: 'factorial' },
1665 { name: 'fibonacci' }
1667 expect(workerNode.taskFunctionsUsage.size).toBe(3)
// Usage is looked up for every name returned by listTaskFunctionsProperties().
1668 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1670 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1673 executed: expect.any(Number),
1677 sequentiallyStolen: 0,
1681 history: expect.any(CircularArray)
1684 history: expect.any(CircularArray)
1688 history: expect.any(CircularArray)
1691 history: expect.any(CircularArray)
1696 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1698 ).toBeGreaterThan(0)
// Usage under DEFAULT_TASK_NAME is related to the usage of the second listed
// task function (index 1 of taskFunctionsProperties).
1701 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1703 workerNode.getTaskFunctionWorkerUsage(
1704 workerNode.info.taskFunctionsProperties[1].name
// Destroy the pool at the end of the test.
1708 await pool.destroy()
// Test: sendKillMessageToWorker() on a DynamicClusterPool built from the
// cluster testWorker.cjs worker file. The call targets worker node key 0 and
// is expected to resolve to undefined.
1711 it('Verify sendKillMessageToWorker()', async () => {
1712 const pool = new DynamicClusterPool(
1713 Math.floor(numberOfWorkers / 2),
1715 './tests/worker-files/cluster/testWorker.cjs'
1717 const workerNodeKey = 0
1719 pool.sendKillMessageToWorker(workerNodeKey)
1720 ).resolves.toBeUndefined()
// Destroy the pool at the end of the test.
1721 await pool.destroy()
// Test: sendTaskFunctionOperationToWorker() on a DynamicClusterPool built from
// the cluster testWorker.cjs worker file. An 'add' operation for a task
// function named 'empty' is sent to worker node key 0 and is expected to
// resolve to true; that node's task function properties are then inspected.
1724 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1725 const pool = new DynamicClusterPool(
1726 Math.floor(numberOfWorkers / 2),
1728 './tests/worker-files/cluster/testWorker.cjs'
1730 const workerNodeKey = 0
// The task function is passed as source text produced by toString().
1732 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1733 taskFunctionOperation: 'add',
1734 taskFunctionProperties: { name: 'empty' },
1735 taskFunction: (() => {}).toString()
1737 ).resolves.toBe(true)
1739 pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
1741 { name: DEFAULT_TASK_NAME },
// Destroy the pool at the end of the test.
1745 await pool.destroy()
// Test: sendTaskFunctionOperationToWorkers() on a DynamicClusterPool built
// from the cluster testWorker.cjs worker file. An 'add' operation for a task
// function named 'empty' is sent without a worker node key and is expected to
// resolve to true; every worker node's task function properties are inspected.
1748 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1749 const pool = new DynamicClusterPool(
1750 Math.floor(numberOfWorkers / 2),
1752 './tests/worker-files/cluster/testWorker.cjs'
// The task function is passed as source text produced by toString().
1755 pool.sendTaskFunctionOperationToWorkers({
1756 taskFunctionOperation: 'add',
1757 taskFunctionProperties: { name: 'empty' },
1758 taskFunction: (() => {}).toString()
1760 ).resolves.toBe(true)
1761 for (const workerNode of pool.workerNodes) {
1762 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1763 { name: DEFAULT_TASK_NAME },
// Destroy the pool at the end of the test.
1768 await pool.destroy()