1 // eslint-disable-next-line n/no-unsupported-features/node-builtins
2 import { createHook, executionAsyncId } from 'node:async_hooks'
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { readFileSync } from 'node:fs'
5 import { dirname, join } from 'node:path'
6 import { fileURLToPath } from 'node:url'
8 import { expect } from 'expect'
9 import { restore, stub } from 'sinon'
11 import { CircularArray } from '../../lib/circular-array.cjs'
12 import { Deque } from '../../lib/deque.cjs'
20 WorkerChoiceStrategies,
22 } from '../../lib/index.cjs'
23 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
24 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
25 import { waitPoolEvents } from '../test-utils.cjs'
27 describe('Abstract pool test suite', () => {
28 const version = JSON.parse(
30 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
34 const numberOfWorkers = 2
35 class StubPoolWithIsMain extends FixedThreadPool {
45 it('Verify that pool can be created and destroyed', async () => {
46 const pool = new FixedThreadPool(
48 './tests/worker-files/thread/testWorker.mjs'
50 expect(pool).toBeInstanceOf(FixedThreadPool)
54 it('Verify that pool cannot be created from a non main thread/process', () => {
57 new StubPoolWithIsMain(
59 './tests/worker-files/thread/testWorker.mjs',
61 errorHandler: e => console.error(e)
66 'Cannot start a pool from a worker with the same type as the pool'
71 it('Verify that pool statuses properties are set', async () => {
72 const pool = new FixedThreadPool(
74 './tests/worker-files/thread/testWorker.mjs'
76 expect(pool.started).toBe(true)
77 expect(pool.starting).toBe(false)
78 expect(pool.destroying).toBe(false)
82 it('Verify that filePath is checked', () => {
83 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
84 new TypeError('The worker file path must be specified')
86 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
87 new TypeError('The worker file path must be a string')
90 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
91 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
94 it('Verify that numberOfWorkers is checked', () => {
99 './tests/worker-files/thread/testWorker.mjs'
103 'Cannot instantiate a pool without specifying the number of workers'
108 it('Verify that a negative number of workers is checked', () => {
111 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
114 'Cannot instantiate a pool with a negative number of workers'
119 it('Verify that a non integer number of workers is checked', () => {
122 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
125 'Cannot instantiate a pool with a non safe integer number of workers'
130 it('Verify that pool arguments number and pool type are checked', () => {
135 './tests/worker-files/thread/testWorker.mjs',
141 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
146 it('Verify that dynamic pool sizing is checked', () => {
149 new DynamicClusterPool(
152 './tests/worker-files/cluster/testWorker.cjs'
156 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
161 new DynamicThreadPool(
164 './tests/worker-files/thread/testWorker.mjs'
168 'Cannot instantiate a pool with a non safe integer number of workers'
173 new DynamicClusterPool(
176 './tests/worker-files/cluster/testWorker.cjs'
180 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
185 new DynamicThreadPool(
188 './tests/worker-files/thread/testWorker.mjs'
192 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
197 new DynamicThreadPool(
200 './tests/worker-files/thread/testWorker.mjs'
204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
209 new DynamicClusterPool(
212 './tests/worker-files/cluster/testWorker.cjs'
216 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
221 it('Verify that pool options are checked', async () => {
222 let pool = new FixedThreadPool(
224 './tests/worker-files/thread/testWorker.mjs'
226 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
227 expect(pool.emitter.eventNames()).toStrictEqual([])
228 expect(pool.opts).toStrictEqual({
231 restartWorkerOnError: true,
232 enableTasksQueue: false,
233 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
235 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
236 .workerChoiceStrategies) {
237 expect(workerChoiceStrategy.opts).toStrictEqual({
238 runTime: { median: false },
239 waitTime: { median: false },
240 elu: { median: false },
241 weights: expect.objectContaining({
242 0: expect.any(Number),
243 [pool.info.maxSize - 1]: expect.any(Number)
248 const testHandler = () => console.info('test handler executed')
249 pool = new FixedThreadPool(
251 './tests/worker-files/thread/testWorker.mjs',
253 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
254 workerChoiceStrategyOptions: {
255 runTime: { median: true },
256 weights: { 0: 300, 1: 200 }
259 restartWorkerOnError: false,
260 enableTasksQueue: true,
261 tasksQueueOptions: { concurrency: 2 },
262 messageHandler: testHandler,
263 errorHandler: testHandler,
264 onlineHandler: testHandler,
265 exitHandler: testHandler
268 expect(pool.emitter).toBeUndefined()
269 expect(pool.opts).toStrictEqual({
272 restartWorkerOnError: false,
273 enableTasksQueue: true,
276 size: Math.pow(numberOfWorkers, 2),
278 tasksStealingOnBackPressure: true,
279 tasksFinishedTimeout: 2000
281 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
282 workerChoiceStrategyOptions: {
283 runTime: { median: true },
284 weights: { 0: 300, 1: 200 }
286 onlineHandler: testHandler,
287 messageHandler: testHandler,
288 errorHandler: testHandler,
289 exitHandler: testHandler
291 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
292 .workerChoiceStrategies) {
293 expect(workerChoiceStrategy.opts).toStrictEqual({
294 runTime: { median: true },
295 waitTime: { median: false },
296 elu: { median: false },
297 weights: { 0: 300, 1: 200 }
303 it('Verify that pool options are validated', () => {
308 './tests/worker-files/thread/testWorker.mjs',
310 workerChoiceStrategy: 'invalidStrategy'
313 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
318 './tests/worker-files/thread/testWorker.mjs',
320 workerChoiceStrategyOptions: { weights: {} }
325 'Invalid worker choice strategy options: must have a weight for each worker node'
332 './tests/worker-files/thread/testWorker.mjs',
334 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
339 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
346 './tests/worker-files/thread/testWorker.mjs',
348 enableTasksQueue: true,
349 tasksQueueOptions: 'invalidTasksQueueOptions'
353 new TypeError('Invalid tasks queue options: must be a plain object')
359 './tests/worker-files/thread/testWorker.mjs',
361 enableTasksQueue: true,
362 tasksQueueOptions: { concurrency: 0 }
367 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
374 './tests/worker-files/thread/testWorker.mjs',
376 enableTasksQueue: true,
377 tasksQueueOptions: { concurrency: -1 }
382 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
389 './tests/worker-files/thread/testWorker.mjs',
391 enableTasksQueue: true,
392 tasksQueueOptions: { concurrency: 0.2 }
396 new TypeError('Invalid worker node tasks concurrency: must be an integer')
402 './tests/worker-files/thread/testWorker.mjs',
404 enableTasksQueue: true,
405 tasksQueueOptions: { size: 0 }
410 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
417 './tests/worker-files/thread/testWorker.mjs',
419 enableTasksQueue: true,
420 tasksQueueOptions: { size: -1 }
425 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
432 './tests/worker-files/thread/testWorker.mjs',
434 enableTasksQueue: true,
435 tasksQueueOptions: { size: 0.2 }
439 new TypeError('Invalid worker node tasks queue size: must be an integer')
443 it('Verify that pool worker choice strategy options can be set', async () => {
444 const pool = new FixedThreadPool(
446 './tests/worker-files/thread/testWorker.mjs',
447 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
449 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
450 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
451 .workerChoiceStrategies) {
452 expect(workerChoiceStrategy.opts).toStrictEqual({
453 runTime: { median: false },
454 waitTime: { median: false },
455 elu: { median: false },
456 weights: expect.objectContaining({
457 0: expect.any(Number),
458 [pool.info.maxSize - 1]: expect.any(Number)
463 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
481 pool.setWorkerChoiceStrategyOptions({
482 runTime: { median: true },
483 elu: { median: true }
485 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
486 runTime: { median: true },
487 elu: { median: true }
489 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
490 .workerChoiceStrategies) {
491 expect(workerChoiceStrategy.opts).toStrictEqual({
492 runTime: { median: true },
493 waitTime: { median: false },
494 elu: { median: true },
495 weights: expect.objectContaining({
496 0: expect.any(Number),
497 [pool.info.maxSize - 1]: expect.any(Number)
502 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
520 pool.setWorkerChoiceStrategyOptions({
521 runTime: { median: false },
522 elu: { median: false }
524 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
525 runTime: { median: false },
526 elu: { median: false }
528 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
529 .workerChoiceStrategies) {
530 expect(workerChoiceStrategy.opts).toStrictEqual({
531 runTime: { median: false },
532 waitTime: { median: false },
533 elu: { median: false },
534 weights: expect.objectContaining({
535 0: expect.any(Number),
536 [pool.info.maxSize - 1]: expect.any(Number)
541 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
560 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
563 'Invalid worker choice strategy options: must be a plain object'
566 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
568 'Invalid worker choice strategy options: must have a weight for each worker node'
572 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
575 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
581 it('Verify that pool tasks queue can be enabled/disabled', async () => {
582 const pool = new FixedThreadPool(
584 './tests/worker-files/thread/testWorker.mjs'
586 expect(pool.opts.enableTasksQueue).toBe(false)
587 expect(pool.opts.tasksQueueOptions).toBeUndefined()
588 pool.enableTasksQueue(true)
589 expect(pool.opts.enableTasksQueue).toBe(true)
590 expect(pool.opts.tasksQueueOptions).toStrictEqual({
592 size: Math.pow(numberOfWorkers, 2),
594 tasksStealingOnBackPressure: true,
595 tasksFinishedTimeout: 2000
597 pool.enableTasksQueue(true, { concurrency: 2 })
598 expect(pool.opts.enableTasksQueue).toBe(true)
599 expect(pool.opts.tasksQueueOptions).toStrictEqual({
601 size: Math.pow(numberOfWorkers, 2),
603 tasksStealingOnBackPressure: true,
604 tasksFinishedTimeout: 2000
606 pool.enableTasksQueue(false)
607 expect(pool.opts.enableTasksQueue).toBe(false)
608 expect(pool.opts.tasksQueueOptions).toBeUndefined()
612 it('Verify that pool tasks queue options can be set', async () => {
613 const pool = new FixedThreadPool(
615 './tests/worker-files/thread/testWorker.mjs',
616 { enableTasksQueue: true }
618 expect(pool.opts.tasksQueueOptions).toStrictEqual({
620 size: Math.pow(numberOfWorkers, 2),
622 tasksStealingOnBackPressure: true,
623 tasksFinishedTimeout: 2000
625 for (const workerNode of pool.workerNodes) {
626 expect(workerNode.tasksQueueBackPressureSize).toBe(
627 pool.opts.tasksQueueOptions.size
630 pool.setTasksQueueOptions({
634 tasksStealingOnBackPressure: false,
635 tasksFinishedTimeout: 3000
637 expect(pool.opts.tasksQueueOptions).toStrictEqual({
641 tasksStealingOnBackPressure: false,
642 tasksFinishedTimeout: 3000
644 for (const workerNode of pool.workerNodes) {
645 expect(workerNode.tasksQueueBackPressureSize).toBe(
646 pool.opts.tasksQueueOptions.size
649 pool.setTasksQueueOptions({
652 tasksStealingOnBackPressure: true
654 expect(pool.opts.tasksQueueOptions).toStrictEqual({
656 size: Math.pow(numberOfWorkers, 2),
658 tasksStealingOnBackPressure: true,
659 tasksFinishedTimeout: 2000
661 for (const workerNode of pool.workerNodes) {
662 expect(workerNode.tasksQueueBackPressureSize).toBe(
663 pool.opts.tasksQueueOptions.size
666 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
667 new TypeError('Invalid tasks queue options: must be a plain object')
669 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
671 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
674 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
676 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
679 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
680 new TypeError('Invalid worker node tasks concurrency: must be an integer')
682 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
684 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
687 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
689 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
692 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
693 new TypeError('Invalid worker node tasks queue size: must be an integer')
698 it('Verify that pool info is set', async () => {
699 let pool = new FixedThreadPool(
701 './tests/worker-files/thread/testWorker.mjs'
703 expect(pool.info).toStrictEqual({
705 type: PoolTypes.fixed,
706 worker: WorkerTypes.thread,
709 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
711 minSize: numberOfWorkers,
712 maxSize: numberOfWorkers,
713 workerNodes: numberOfWorkers,
714 idleWorkerNodes: numberOfWorkers,
721 pool = new DynamicClusterPool(
722 Math.floor(numberOfWorkers / 2),
724 './tests/worker-files/cluster/testWorker.cjs'
726 expect(pool.info).toStrictEqual({
728 type: PoolTypes.dynamic,
729 worker: WorkerTypes.cluster,
732 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
734 minSize: Math.floor(numberOfWorkers / 2),
735 maxSize: numberOfWorkers,
736 workerNodes: Math.floor(numberOfWorkers / 2),
737 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
746 it('Verify that pool worker tasks usage are initialized', async () => {
747 const pool = new FixedClusterPool(
749 './tests/worker-files/cluster/testWorker.cjs'
751 for (const workerNode of pool.workerNodes) {
752 expect(workerNode).toBeInstanceOf(WorkerNode)
753 expect(workerNode.usage).toStrictEqual({
759 sequentiallyStolen: 0,
764 history: new CircularArray()
767 history: new CircularArray()
771 history: new CircularArray()
774 history: new CircularArray()
782 it('Verify that pool worker tasks queue are initialized', async () => {
783 let pool = new FixedClusterPool(
785 './tests/worker-files/cluster/testWorker.cjs'
787 for (const workerNode of pool.workerNodes) {
788 expect(workerNode).toBeInstanceOf(WorkerNode)
789 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
790 expect(workerNode.tasksQueue.size).toBe(0)
791 expect(workerNode.tasksQueue.maxSize).toBe(0)
794 pool = new DynamicThreadPool(
795 Math.floor(numberOfWorkers / 2),
797 './tests/worker-files/thread/testWorker.mjs'
799 for (const workerNode of pool.workerNodes) {
800 expect(workerNode).toBeInstanceOf(WorkerNode)
801 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
802 expect(workerNode.tasksQueue.size).toBe(0)
803 expect(workerNode.tasksQueue.maxSize).toBe(0)
808 it('Verify that pool worker info are initialized', async () => {
809 let pool = new FixedClusterPool(
811 './tests/worker-files/cluster/testWorker.cjs'
813 for (const workerNode of pool.workerNodes) {
814 expect(workerNode).toBeInstanceOf(WorkerNode)
815 expect(workerNode.info).toStrictEqual({
816 id: expect.any(Number),
817 type: WorkerTypes.cluster,
824 pool = new DynamicThreadPool(
825 Math.floor(numberOfWorkers / 2),
827 './tests/worker-files/thread/testWorker.mjs'
829 for (const workerNode of pool.workerNodes) {
830 expect(workerNode).toBeInstanceOf(WorkerNode)
831 expect(workerNode.info).toStrictEqual({
832 id: expect.any(Number),
833 type: WorkerTypes.thread,
842 it('Verify that pool statuses are checked at start or destroy', async () => {
843 const pool = new FixedThreadPool(
845 './tests/worker-files/thread/testWorker.mjs'
847 expect(pool.info.started).toBe(true)
848 expect(pool.info.ready).toBe(true)
849 expect(() => pool.start()).toThrow(
850 new Error('Cannot start an already started pool')
853 expect(pool.info.started).toBe(false)
854 expect(pool.info.ready).toBe(false)
855 await expect(pool.destroy()).rejects.toThrow(
856 new Error('Cannot destroy an already destroyed pool')
860 it('Verify that pool can be started after initialization', async () => {
861 const pool = new FixedClusterPool(
863 './tests/worker-files/cluster/testWorker.cjs',
868 expect(pool.info.started).toBe(false)
869 expect(pool.info.ready).toBe(false)
870 expect(pool.workerNodes).toStrictEqual([])
871 expect(pool.readyEventEmitted).toBe(false)
872 await expect(pool.execute()).rejects.toThrow(
873 new Error('Cannot execute a task on not started pool')
876 expect(pool.info.started).toBe(true)
877 expect(pool.info.ready).toBe(true)
878 await waitPoolEvents(pool, PoolEvents.ready, 1)
879 expect(pool.readyEventEmitted).toBe(true)
880 expect(pool.workerNodes.length).toBe(numberOfWorkers)
881 for (const workerNode of pool.workerNodes) {
882 expect(workerNode).toBeInstanceOf(WorkerNode)
887 it('Verify that pool execute() arguments are checked', async () => {
888 const pool = new FixedClusterPool(
890 './tests/worker-files/cluster/testWorker.cjs'
892 await expect(pool.execute(undefined, 0)).rejects.toThrow(
893 new TypeError('name argument must be a string')
895 await expect(pool.execute(undefined, '')).rejects.toThrow(
896 new TypeError('name argument must not be an empty string')
898 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
899 new TypeError('transferList argument must be an array')
901 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
902 "Task function 'unknown' not found"
905 await expect(pool.execute()).rejects.toThrow(
906 new Error('Cannot execute a task on not started pool')
910 it('Verify that pool worker tasks usage are computed', async () => {
911 const pool = new FixedClusterPool(
913 './tests/worker-files/cluster/testWorker.cjs'
915 const promises = new Set()
916 const maxMultiplier = 2
917 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
918 promises.add(pool.execute())
920 for (const workerNode of pool.workerNodes) {
921 expect(workerNode.usage).toStrictEqual({
924 executing: maxMultiplier,
927 sequentiallyStolen: 0,
932 history: expect.any(CircularArray)
935 history: expect.any(CircularArray)
939 history: expect.any(CircularArray)
942 history: expect.any(CircularArray)
947 await Promise.all(promises)
948 for (const workerNode of pool.workerNodes) {
949 expect(workerNode.usage).toStrictEqual({
951 executed: maxMultiplier,
955 sequentiallyStolen: 0,
960 history: expect.any(CircularArray)
963 history: expect.any(CircularArray)
967 history: expect.any(CircularArray)
970 history: expect.any(CircularArray)
978 it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
979 const pool = new DynamicThreadPool(
980 Math.floor(numberOfWorkers / 2),
982 './tests/worker-files/thread/testWorker.mjs'
984 const promises = new Set()
985 const maxMultiplier = 2
986 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
987 promises.add(pool.execute())
989 await Promise.all(promises)
990 for (const workerNode of pool.workerNodes) {
991 expect(workerNode.usage).toStrictEqual({
993 executed: expect.any(Number),
997 sequentiallyStolen: 0,
1002 history: expect.any(CircularArray)
1005 history: expect.any(CircularArray)
1009 history: expect.any(CircularArray)
1012 history: expect.any(CircularArray)
1016 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1017 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1018 numberOfWorkers * maxMultiplier
1020 expect(workerNode.usage.runTime.history.length).toBe(0)
1021 expect(workerNode.usage.waitTime.history.length).toBe(0)
1022 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1023 expect(workerNode.usage.elu.active.history.length).toBe(0)
1025 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1026 for (const workerNode of pool.workerNodes) {
1027 expect(workerNode.usage).toStrictEqual({
1029 executed: expect.any(Number),
1033 sequentiallyStolen: 0,
1038 history: expect.any(CircularArray)
1041 history: expect.any(CircularArray)
1045 history: expect.any(CircularArray)
1048 history: expect.any(CircularArray)
1052 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1053 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1054 numberOfWorkers * maxMultiplier
1056 expect(workerNode.usage.runTime.history.length).toBe(0)
1057 expect(workerNode.usage.waitTime.history.length).toBe(0)
1058 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1059 expect(workerNode.usage.elu.active.history.length).toBe(0)
1061 await pool.destroy()
1064 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1065 const pool = new DynamicClusterPool(
1066 Math.floor(numberOfWorkers / 2),
1068 './tests/worker-files/cluster/testWorker.cjs'
1070 expect(pool.emitter.eventNames()).toStrictEqual([])
1073 pool.emitter.on(PoolEvents.ready, info => {
1077 await waitPoolEvents(pool, PoolEvents.ready, 1)
1078 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1079 expect(poolReady).toBe(1)
1080 expect(poolInfo).toStrictEqual({
1082 type: PoolTypes.dynamic,
1083 worker: WorkerTypes.cluster,
1086 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1087 strategyRetries: expect.any(Number),
1088 minSize: expect.any(Number),
1089 maxSize: expect.any(Number),
1090 workerNodes: expect.any(Number),
1091 idleWorkerNodes: expect.any(Number),
1092 busyWorkerNodes: expect.any(Number),
1093 executedTasks: expect.any(Number),
1094 executingTasks: expect.any(Number),
1095 failedTasks: expect.any(Number)
1097 await pool.destroy()
1100 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1101 const pool = new FixedThreadPool(
1103 './tests/worker-files/thread/testWorker.mjs'
1105 expect(pool.emitter.eventNames()).toStrictEqual([])
1106 const promises = new Set()
1109 pool.emitter.on(PoolEvents.busy, info => {
1113 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1114 for (let i = 0; i < numberOfWorkers * 2; i++) {
1115 promises.add(pool.execute())
1117 await Promise.all(promises)
1118 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1119 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1120 expect(poolBusy).toBe(numberOfWorkers + 1)
1121 expect(poolInfo).toStrictEqual({
1123 type: PoolTypes.fixed,
1124 worker: WorkerTypes.thread,
1127 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1128 strategyRetries: expect.any(Number),
1129 minSize: expect.any(Number),
1130 maxSize: expect.any(Number),
1131 workerNodes: expect.any(Number),
1132 idleWorkerNodes: expect.any(Number),
1133 busyWorkerNodes: expect.any(Number),
1134 executedTasks: expect.any(Number),
1135 executingTasks: expect.any(Number),
1136 failedTasks: expect.any(Number)
1138 await pool.destroy()
1141 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1142 const pool = new DynamicThreadPool(
1143 Math.floor(numberOfWorkers / 2),
1145 './tests/worker-files/thread/testWorker.mjs'
1147 expect(pool.emitter.eventNames()).toStrictEqual([])
1148 const promises = new Set()
1151 pool.emitter.on(PoolEvents.full, info => {
1155 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1156 for (let i = 0; i < numberOfWorkers * 2; i++) {
1157 promises.add(pool.execute())
1159 await Promise.all(promises)
1160 expect(poolFull).toBe(1)
1161 expect(poolInfo).toStrictEqual({
1163 type: PoolTypes.dynamic,
1164 worker: WorkerTypes.thread,
1167 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1168 strategyRetries: expect.any(Number),
1169 minSize: expect.any(Number),
1170 maxSize: expect.any(Number),
1171 workerNodes: expect.any(Number),
1172 idleWorkerNodes: expect.any(Number),
1173 busyWorkerNodes: expect.any(Number),
1174 executedTasks: expect.any(Number),
1175 executingTasks: expect.any(Number),
1176 failedTasks: expect.any(Number)
1178 await pool.destroy()
1181 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1182 const pool = new FixedThreadPool(
1184 './tests/worker-files/thread/testWorker.mjs',
1186 enableTasksQueue: true
1189 stub(pool, 'hasBackPressure').returns(true)
1190 expect(pool.emitter.eventNames()).toStrictEqual([])
1191 const promises = new Set()
1192 let poolBackPressure = 0
1194 pool.emitter.on(PoolEvents.backPressure, info => {
1198 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1199 for (let i = 0; i < numberOfWorkers + 1; i++) {
1200 promises.add(pool.execute())
1202 await Promise.all(promises)
1203 expect(poolBackPressure).toBe(1)
1204 expect(poolInfo).toStrictEqual({
1206 type: PoolTypes.fixed,
1207 worker: WorkerTypes.thread,
1210 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1211 strategyRetries: expect.any(Number),
1212 minSize: expect.any(Number),
1213 maxSize: expect.any(Number),
1214 workerNodes: expect.any(Number),
1215 idleWorkerNodes: expect.any(Number),
1216 stealingWorkerNodes: expect.any(Number),
1217 busyWorkerNodes: expect.any(Number),
1218 executedTasks: expect.any(Number),
1219 executingTasks: expect.any(Number),
1220 maxQueuedTasks: expect.any(Number),
1221 queuedTasks: expect.any(Number),
1223 stolenTasks: expect.any(Number),
1224 failedTasks: expect.any(Number)
1226 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1227 await pool.destroy()
1230 it('Verify that destroy() waits for queued tasks to finish', async () => {
1231 const tasksFinishedTimeout = 2500
1232 const pool = new FixedThreadPool(
1234 './tests/worker-files/thread/asyncWorker.mjs',
1236 enableTasksQueue: true,
1237 tasksQueueOptions: { tasksFinishedTimeout }
1240 const maxMultiplier = 4
1241 let tasksFinished = 0
1242 for (const workerNode of pool.workerNodes) {
1243 workerNode.on('taskFinished', () => {
1247 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1250 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1251 const startTime = performance.now()
1252 await pool.destroy()
1253 const elapsedTime = performance.now() - startTime
1254 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1255 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1256 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1259 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1260 const tasksFinishedTimeout = 1000
1261 const pool = new FixedThreadPool(
1263 './tests/worker-files/thread/asyncWorker.mjs',
1265 enableTasksQueue: true,
1266 tasksQueueOptions: { tasksFinishedTimeout }
1269 const maxMultiplier = 4
1270 let tasksFinished = 0
1271 for (const workerNode of pool.workerNodes) {
1272 workerNode.on('taskFinished', () => {
1276 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1279 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1280 const startTime = performance.now()
1281 await pool.destroy()
1282 const elapsedTime = performance.now() - startTime
1283 expect(tasksFinished).toBe(0)
1284 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1287 it('Verify that pool asynchronous resource track tasks execution', async () => {
1292 let resolveCalls = 0
1293 const hook = createHook({
1294 init (asyncId, type) {
1295 if (type === 'poolifier:task') {
1297 taskAsyncId = asyncId
1301 if (asyncId === taskAsyncId) beforeCalls++
1304 if (asyncId === taskAsyncId) afterCalls++
1307 if (executionAsyncId() === taskAsyncId) resolveCalls++
1310 const pool = new FixedThreadPool(
1312 './tests/worker-files/thread/testWorker.mjs'
1315 await pool.execute()
1317 expect(initCalls).toBe(1)
1318 expect(beforeCalls).toBe(1)
1319 expect(afterCalls).toBe(1)
1320 expect(resolveCalls).toBe(1)
1321 await pool.destroy()
1324 it('Verify that hasTaskFunction() is working', async () => {
1325 const dynamicThreadPool = new DynamicThreadPool(
1326 Math.floor(numberOfWorkers / 2),
1328 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1330 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1331 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1332 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1335 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1336 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1337 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1338 await dynamicThreadPool.destroy()
1339 const fixedClusterPool = new FixedClusterPool(
1341 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1343 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1344 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1345 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1348 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1349 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1350 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1351 await fixedClusterPool.destroy()
1354 it('Verify that addTaskFunction() is working', async () => {
1355 const dynamicThreadPool = new DynamicThreadPool(
1356 Math.floor(numberOfWorkers / 2),
1358 './tests/worker-files/thread/testWorker.mjs'
1360 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1362 dynamicThreadPool.addTaskFunction(0, () => {})
1363 ).rejects.toThrow(new TypeError('name argument must be a string'))
1365 dynamicThreadPool.addTaskFunction('', () => {})
1367 new TypeError('name argument must not be an empty string')
1369 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1370 new TypeError('taskFunction property must be a function')
1372 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1373 new TypeError('taskFunction property must be a function')
1375 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1376 { name: DEFAULT_TASK_NAME },
1379 const echoTaskFunction = data => {
1383 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1384 ).resolves.toBe(true)
1385 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1386 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1387 taskFunction: echoTaskFunction
1389 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1390 { name: DEFAULT_TASK_NAME },
1394 const taskFunctionData = { test: 'test' }
1395 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1396 expect(echoResult).toStrictEqual(taskFunctionData)
1397 for (const workerNode of dynamicThreadPool.workerNodes) {
1398 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1400 executed: expect.any(Number),
1403 sequentiallyStolen: 0,
1408 history: new CircularArray()
1411 history: new CircularArray()
1415 history: new CircularArray()
1418 history: new CircularArray()
1423 await dynamicThreadPool.destroy()
1426 it('Verify that removeTaskFunction() is working', async () => {
1427 const dynamicThreadPool = new DynamicThreadPool(
1428 Math.floor(numberOfWorkers / 2),
1430 './tests/worker-files/thread/testWorker.mjs'
1432 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1433 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1434 { name: DEFAULT_TASK_NAME },
1437 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1438 new Error('Cannot remove a task function not handled on the pool side')
1440 const echoTaskFunction = data => {
1443 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1444 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1445 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1446 taskFunction: echoTaskFunction
1448 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1449 { name: DEFAULT_TASK_NAME },
1453 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1456 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1457 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1458 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1459 { name: DEFAULT_TASK_NAME },
1462 await dynamicThreadPool.destroy()
1465 it('Verify that listTaskFunctionNames() is working', async () => {
1466 const dynamicThreadPool = new DynamicThreadPool(
1467 Math.floor(numberOfWorkers / 2),
1469 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1471 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1472 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1473 { name: DEFAULT_TASK_NAME },
1474 { name: 'jsonIntegerSerialization' },
1475 { name: 'factorial' },
1476 { name: 'fibonacci' }
1478 await dynamicThreadPool.destroy()
1479 const fixedClusterPool = new FixedClusterPool(
1481 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1483 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1484 expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
1485 { name: DEFAULT_TASK_NAME },
1486 { name: 'jsonIntegerSerialization' },
1487 { name: 'factorial' },
1488 { name: 'fibonacci' }
1490 await fixedClusterPool.destroy()
1493 it('Verify that setDefaultTaskFunction() is working', async () => {
1494 const dynamicThreadPool = new DynamicThreadPool(
1495 Math.floor(numberOfWorkers / 2),
1497 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1499 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1500 const workerId = dynamicThreadPool.workerNodes[0].info.id
1501 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1503 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1507 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1510 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1514 dynamicThreadPool.setDefaultTaskFunction('unknown')
1517 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1520 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1521 { name: DEFAULT_TASK_NAME },
1522 { name: 'jsonIntegerSerialization' },
1523 { name: 'factorial' },
1524 { name: 'fibonacci' }
1527 dynamicThreadPool.setDefaultTaskFunction('factorial')
1528 ).resolves.toBe(true)
1529 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1530 { name: DEFAULT_TASK_NAME },
1531 { name: 'factorial' },
1532 { name: 'jsonIntegerSerialization' },
1533 { name: 'fibonacci' }
1536 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1537 ).resolves.toBe(true)
1538 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1539 { name: DEFAULT_TASK_NAME },
1540 { name: 'fibonacci' },
1541 { name: 'jsonIntegerSerialization' },
1542 { name: 'factorial' }
1544 await dynamicThreadPool.destroy()
1547 it('Verify that multiple task functions worker is working', async () => {
1548 const pool = new DynamicClusterPool(
1549 Math.floor(numberOfWorkers / 2),
1551 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1553 const data = { n: 10 }
1554 const result0 = await pool.execute(data)
1555 expect(result0).toStrictEqual({ ok: 1 })
1556 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1557 expect(result1).toStrictEqual({ ok: 1 })
1558 const result2 = await pool.execute(data, 'factorial')
1559 expect(result2).toBe(3628800)
1560 const result3 = await pool.execute(data, 'fibonacci')
1561 expect(result3).toBe(55)
1562 expect(pool.info.executingTasks).toBe(0)
1563 expect(pool.info.executedTasks).toBe(4)
1564 for (const workerNode of pool.workerNodes) {
1565 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1566 { name: DEFAULT_TASK_NAME },
1567 { name: 'jsonIntegerSerialization' },
1568 { name: 'factorial' },
1569 { name: 'fibonacci' }
1571 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1572 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1574 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1577 executed: expect.any(Number),
1581 sequentiallyStolen: 0,
1585 history: expect.any(CircularArray)
1588 history: expect.any(CircularArray)
1592 history: expect.any(CircularArray)
1595 history: expect.any(CircularArray)
1600 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1602 ).toBeGreaterThan(0)
1605 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1607 workerNode.getTaskFunctionWorkerUsage(
1608 workerNode.info.taskFunctionsProperties[1].name
1612 await pool.destroy()
1615 it('Verify sendKillMessageToWorker()', async () => {
1616 const pool = new DynamicClusterPool(
1617 Math.floor(numberOfWorkers / 2),
1619 './tests/worker-files/cluster/testWorker.cjs'
1621 const workerNodeKey = 0
1623 pool.sendKillMessageToWorker(workerNodeKey)
1624 ).resolves.toBeUndefined()
1625 await pool.destroy()
1628 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1629 const pool = new DynamicClusterPool(
1630 Math.floor(numberOfWorkers / 2),
1632 './tests/worker-files/cluster/testWorker.cjs'
1634 const workerNodeKey = 0
1636 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1637 taskFunctionOperation: 'add',
1638 taskFunctionProperties: { name: 'empty' },
1639 taskFunction: (() => {}).toString()
1641 ).resolves.toBe(true)
1643 pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
1645 { name: DEFAULT_TASK_NAME },
1649 await pool.destroy()
1652 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1653 const pool = new DynamicClusterPool(
1654 Math.floor(numberOfWorkers / 2),
1656 './tests/worker-files/cluster/testWorker.cjs'
1659 pool.sendTaskFunctionOperationToWorkers({
1660 taskFunctionOperation: 'add',
1661 taskFunctionProperties: { name: 'empty' },
1662 taskFunction: (() => {}).toString()
1664 ).resolves.toBe(true)
1665 for (const workerNode of pool.workerNodes) {
1666 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1667 { name: DEFAULT_TASK_NAME },
1672 await pool.destroy()