1 import { createHook, executionAsyncId } from 'node:async_hooks'
2 import { EventEmitterAsyncResource } from 'node:events'
3 import { readFileSync } from 'node:fs'
4 import { dirname, join } from 'node:path'
5 import { fileURLToPath } from 'node:url'
7 import { expect } from 'expect'
8 import { restore, stub } from 'sinon'
10 import { CircularArray } from '../../lib/circular-array.cjs'
11 import { Deque } from '../../lib/deque.cjs'
19 WorkerChoiceStrategies,
21 } from '../../lib/index.cjs'
22 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
23 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
24 import { waitPoolEvents } from '../test-utils.cjs'
26 describe('Abstract pool test suite', () => {
27 const version = JSON.parse(
29 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
33 const numberOfWorkers = 2
34 class StubPoolWithIsMain extends FixedThreadPool {
44 it('Verify that pool can be created and destroyed', async () => {
45 const pool = new FixedThreadPool(
47 './tests/worker-files/thread/testWorker.mjs'
49 expect(pool).toBeInstanceOf(FixedThreadPool)
53 it('Verify that pool cannot be created from a non main thread/process', () => {
56 new StubPoolWithIsMain(
58 './tests/worker-files/thread/testWorker.mjs',
60 errorHandler: e => console.error(e)
65 'Cannot start a pool from a worker with the same type as the pool'
70 it('Verify that pool statuses properties are set', async () => {
71 const pool = new FixedThreadPool(
73 './tests/worker-files/thread/testWorker.mjs'
75 expect(pool.started).toBe(true)
76 expect(pool.starting).toBe(false)
77 expect(pool.destroying).toBe(false)
81 it('Verify that filePath is checked', () => {
82 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
83 new TypeError('The worker file path must be specified')
85 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
86 new TypeError('The worker file path must be a string')
89 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
90 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
93 it('Verify that numberOfWorkers is checked', () => {
98 './tests/worker-files/thread/testWorker.mjs'
102 'Cannot instantiate a pool without specifying the number of workers'
107 it('Verify that a negative number of workers is checked', () => {
110 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
113 'Cannot instantiate a pool with a negative number of workers'
118 it('Verify that a non integer number of workers is checked', () => {
121 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
124 'Cannot instantiate a pool with a non safe integer number of workers'
129 it('Verify that pool arguments number and pool type are checked', () => {
134 './tests/worker-files/thread/testWorker.mjs',
140 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
145 it('Verify that dynamic pool sizing is checked', () => {
148 new DynamicClusterPool(
151 './tests/worker-files/cluster/testWorker.cjs'
155 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
160 new DynamicThreadPool(
163 './tests/worker-files/thread/testWorker.mjs'
167 'Cannot instantiate a pool with a non safe integer number of workers'
172 new DynamicClusterPool(
175 './tests/worker-files/cluster/testWorker.cjs'
179 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
184 new DynamicThreadPool(
187 './tests/worker-files/thread/testWorker.mjs'
191 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
196 new DynamicThreadPool(
199 './tests/worker-files/thread/testWorker.mjs'
203 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
208 new DynamicClusterPool(
211 './tests/worker-files/cluster/testWorker.cjs'
215 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
220 it('Verify that pool options are checked', async () => {
221 let pool = new FixedThreadPool(
223 './tests/worker-files/thread/testWorker.mjs'
225 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
226 expect(pool.emitter.eventNames()).toStrictEqual([])
227 expect(pool.opts).toStrictEqual({
230 restartWorkerOnError: true,
231 enableTasksQueue: false,
232 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
234 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
235 .workerChoiceStrategies) {
236 expect(workerChoiceStrategy.opts).toStrictEqual({
237 runTime: { median: false },
238 waitTime: { median: false },
239 elu: { median: false },
240 weights: expect.objectContaining({
241 0: expect.any(Number),
242 [pool.info.maxSize - 1]: expect.any(Number)
247 const testHandler = () => console.info('test handler executed')
248 pool = new FixedThreadPool(
250 './tests/worker-files/thread/testWorker.mjs',
252 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
253 workerChoiceStrategyOptions: {
254 runTime: { median: true },
255 weights: { 0: 300, 1: 200 }
258 restartWorkerOnError: false,
259 enableTasksQueue: true,
260 tasksQueueOptions: { concurrency: 2 },
261 messageHandler: testHandler,
262 errorHandler: testHandler,
263 onlineHandler: testHandler,
264 exitHandler: testHandler
267 expect(pool.emitter).toBeUndefined()
268 expect(pool.opts).toStrictEqual({
271 restartWorkerOnError: false,
272 enableTasksQueue: true,
275 size: Math.pow(numberOfWorkers, 2),
277 tasksStealingOnBackPressure: true,
278 tasksFinishedTimeout: 2000
280 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
281 workerChoiceStrategyOptions: {
282 runTime: { median: true },
283 weights: { 0: 300, 1: 200 }
285 onlineHandler: testHandler,
286 messageHandler: testHandler,
287 errorHandler: testHandler,
288 exitHandler: testHandler
290 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
291 .workerChoiceStrategies) {
292 expect(workerChoiceStrategy.opts).toStrictEqual({
293 runTime: { median: true },
294 waitTime: { median: false },
295 elu: { median: false },
296 weights: { 0: 300, 1: 200 }
302 it('Verify that pool options are validated', () => {
307 './tests/worker-files/thread/testWorker.mjs',
309 workerChoiceStrategy: 'invalidStrategy'
312 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
317 './tests/worker-files/thread/testWorker.mjs',
319 workerChoiceStrategyOptions: { weights: {} }
324 'Invalid worker choice strategy options: must have a weight for each worker node'
331 './tests/worker-files/thread/testWorker.mjs',
333 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
338 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
345 './tests/worker-files/thread/testWorker.mjs',
347 enableTasksQueue: true,
348 tasksQueueOptions: 'invalidTasksQueueOptions'
352 new TypeError('Invalid tasks queue options: must be a plain object')
358 './tests/worker-files/thread/testWorker.mjs',
360 enableTasksQueue: true,
361 tasksQueueOptions: { concurrency: 0 }
366 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
373 './tests/worker-files/thread/testWorker.mjs',
375 enableTasksQueue: true,
376 tasksQueueOptions: { concurrency: -1 }
381 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
388 './tests/worker-files/thread/testWorker.mjs',
390 enableTasksQueue: true,
391 tasksQueueOptions: { concurrency: 0.2 }
395 new TypeError('Invalid worker node tasks concurrency: must be an integer')
401 './tests/worker-files/thread/testWorker.mjs',
403 enableTasksQueue: true,
404 tasksQueueOptions: { size: 0 }
409 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
416 './tests/worker-files/thread/testWorker.mjs',
418 enableTasksQueue: true,
419 tasksQueueOptions: { size: -1 }
424 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
431 './tests/worker-files/thread/testWorker.mjs',
433 enableTasksQueue: true,
434 tasksQueueOptions: { size: 0.2 }
438 new TypeError('Invalid worker node tasks queue size: must be an integer')
442 it('Verify that pool worker choice strategy options can be set', async () => {
443 const pool = new FixedThreadPool(
445 './tests/worker-files/thread/testWorker.mjs',
446 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
448 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
449 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
450 .workerChoiceStrategies) {
451 expect(workerChoiceStrategy.opts).toStrictEqual({
452 runTime: { median: false },
453 waitTime: { median: false },
454 elu: { median: false },
455 weights: expect.objectContaining({
456 0: expect.any(Number),
457 [pool.info.maxSize - 1]: expect.any(Number)
462 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
480 pool.setWorkerChoiceStrategyOptions({
481 runTime: { median: true },
482 elu: { median: true }
484 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
485 runTime: { median: true },
486 elu: { median: true }
488 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
489 .workerChoiceStrategies) {
490 expect(workerChoiceStrategy.opts).toStrictEqual({
491 runTime: { median: true },
492 waitTime: { median: false },
493 elu: { median: true },
494 weights: expect.objectContaining({
495 0: expect.any(Number),
496 [pool.info.maxSize - 1]: expect.any(Number)
501 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
519 pool.setWorkerChoiceStrategyOptions({
520 runTime: { median: false },
521 elu: { median: false }
523 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
524 runTime: { median: false },
525 elu: { median: false }
527 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
528 .workerChoiceStrategies) {
529 expect(workerChoiceStrategy.opts).toStrictEqual({
530 runTime: { median: false },
531 waitTime: { median: false },
532 elu: { median: false },
533 weights: expect.objectContaining({
534 0: expect.any(Number),
535 [pool.info.maxSize - 1]: expect.any(Number)
540 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
559 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
562 'Invalid worker choice strategy options: must be a plain object'
565 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
567 'Invalid worker choice strategy options: must have a weight for each worker node'
571 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
574 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
580 it('Verify that pool tasks queue can be enabled/disabled', async () => {
581 const pool = new FixedThreadPool(
583 './tests/worker-files/thread/testWorker.mjs'
585 expect(pool.opts.enableTasksQueue).toBe(false)
586 expect(pool.opts.tasksQueueOptions).toBeUndefined()
587 pool.enableTasksQueue(true)
588 expect(pool.opts.enableTasksQueue).toBe(true)
589 expect(pool.opts.tasksQueueOptions).toStrictEqual({
591 size: Math.pow(numberOfWorkers, 2),
593 tasksStealingOnBackPressure: true,
594 tasksFinishedTimeout: 2000
596 pool.enableTasksQueue(true, { concurrency: 2 })
597 expect(pool.opts.enableTasksQueue).toBe(true)
598 expect(pool.opts.tasksQueueOptions).toStrictEqual({
600 size: Math.pow(numberOfWorkers, 2),
602 tasksStealingOnBackPressure: true,
603 tasksFinishedTimeout: 2000
605 pool.enableTasksQueue(false)
606 expect(pool.opts.enableTasksQueue).toBe(false)
607 expect(pool.opts.tasksQueueOptions).toBeUndefined()
611 it('Verify that pool tasks queue options can be set', async () => {
612 const pool = new FixedThreadPool(
614 './tests/worker-files/thread/testWorker.mjs',
615 { enableTasksQueue: true }
617 expect(pool.opts.tasksQueueOptions).toStrictEqual({
619 size: Math.pow(numberOfWorkers, 2),
621 tasksStealingOnBackPressure: true,
622 tasksFinishedTimeout: 2000
624 for (const workerNode of pool.workerNodes) {
625 expect(workerNode.tasksQueueBackPressureSize).toBe(
626 pool.opts.tasksQueueOptions.size
629 pool.setTasksQueueOptions({
633 tasksStealingOnBackPressure: false,
634 tasksFinishedTimeout: 3000
636 expect(pool.opts.tasksQueueOptions).toStrictEqual({
640 tasksStealingOnBackPressure: false,
641 tasksFinishedTimeout: 3000
643 for (const workerNode of pool.workerNodes) {
644 expect(workerNode.tasksQueueBackPressureSize).toBe(
645 pool.opts.tasksQueueOptions.size
648 pool.setTasksQueueOptions({
651 tasksStealingOnBackPressure: true
653 expect(pool.opts.tasksQueueOptions).toStrictEqual({
655 size: Math.pow(numberOfWorkers, 2),
657 tasksStealingOnBackPressure: true,
658 tasksFinishedTimeout: 2000
660 for (const workerNode of pool.workerNodes) {
661 expect(workerNode.tasksQueueBackPressureSize).toBe(
662 pool.opts.tasksQueueOptions.size
665 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
666 new TypeError('Invalid tasks queue options: must be a plain object')
668 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
670 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
673 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
675 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
678 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
679 new TypeError('Invalid worker node tasks concurrency: must be an integer')
681 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
683 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
686 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
688 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
691 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
692 new TypeError('Invalid worker node tasks queue size: must be an integer')
697 it('Verify that pool info is set', async () => {
698 let pool = new FixedThreadPool(
700 './tests/worker-files/thread/testWorker.mjs'
702 expect(pool.info).toStrictEqual({
704 type: PoolTypes.fixed,
705 worker: WorkerTypes.thread,
708 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
710 minSize: numberOfWorkers,
711 maxSize: numberOfWorkers,
712 workerNodes: numberOfWorkers,
713 idleWorkerNodes: numberOfWorkers,
720 pool = new DynamicClusterPool(
721 Math.floor(numberOfWorkers / 2),
723 './tests/worker-files/cluster/testWorker.cjs'
725 expect(pool.info).toStrictEqual({
727 type: PoolTypes.dynamic,
728 worker: WorkerTypes.cluster,
731 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
733 minSize: Math.floor(numberOfWorkers / 2),
734 maxSize: numberOfWorkers,
735 workerNodes: Math.floor(numberOfWorkers / 2),
736 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
745 it('Verify that pool worker tasks usage are initialized', async () => {
746 const pool = new FixedClusterPool(
748 './tests/worker-files/cluster/testWorker.cjs'
750 for (const workerNode of pool.workerNodes) {
751 expect(workerNode).toBeInstanceOf(WorkerNode)
752 expect(workerNode.usage).toStrictEqual({
758 sequentiallyStolen: 0,
763 history: new CircularArray()
766 history: new CircularArray()
770 history: new CircularArray()
773 history: new CircularArray()
781 it('Verify that pool worker tasks queue are initialized', async () => {
782 let pool = new FixedClusterPool(
784 './tests/worker-files/cluster/testWorker.cjs'
786 for (const workerNode of pool.workerNodes) {
787 expect(workerNode).toBeInstanceOf(WorkerNode)
788 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
789 expect(workerNode.tasksQueue.size).toBe(0)
790 expect(workerNode.tasksQueue.maxSize).toBe(0)
793 pool = new DynamicThreadPool(
794 Math.floor(numberOfWorkers / 2),
796 './tests/worker-files/thread/testWorker.mjs'
798 for (const workerNode of pool.workerNodes) {
799 expect(workerNode).toBeInstanceOf(WorkerNode)
800 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
801 expect(workerNode.tasksQueue.size).toBe(0)
802 expect(workerNode.tasksQueue.maxSize).toBe(0)
807 it('Verify that pool worker info are initialized', async () => {
808 let pool = new FixedClusterPool(
810 './tests/worker-files/cluster/testWorker.cjs'
812 for (const workerNode of pool.workerNodes) {
813 expect(workerNode).toBeInstanceOf(WorkerNode)
814 expect(workerNode.info).toStrictEqual({
815 id: expect.any(Number),
816 type: WorkerTypes.cluster,
823 pool = new DynamicThreadPool(
824 Math.floor(numberOfWorkers / 2),
826 './tests/worker-files/thread/testWorker.mjs'
828 for (const workerNode of pool.workerNodes) {
829 expect(workerNode).toBeInstanceOf(WorkerNode)
830 expect(workerNode.info).toStrictEqual({
831 id: expect.any(Number),
832 type: WorkerTypes.thread,
841 it('Verify that pool statuses are checked at start or destroy', async () => {
842 const pool = new FixedThreadPool(
844 './tests/worker-files/thread/testWorker.mjs'
846 expect(pool.info.started).toBe(true)
847 expect(pool.info.ready).toBe(true)
848 expect(() => pool.start()).toThrow(
849 new Error('Cannot start an already started pool')
852 expect(pool.info.started).toBe(false)
853 expect(pool.info.ready).toBe(false)
854 await expect(pool.destroy()).rejects.toThrow(
855 new Error('Cannot destroy an already destroyed pool')
859 it('Verify that pool can be started after initialization', async () => {
860 const pool = new FixedClusterPool(
862 './tests/worker-files/cluster/testWorker.cjs',
867 expect(pool.info.started).toBe(false)
868 expect(pool.info.ready).toBe(false)
869 expect(pool.workerNodes).toStrictEqual([])
870 expect(pool.readyEventEmitted).toBe(false)
871 await expect(pool.execute()).rejects.toThrow(
872 new Error('Cannot execute a task on not started pool')
875 expect(pool.info.started).toBe(true)
876 expect(pool.info.ready).toBe(true)
877 await waitPoolEvents(pool, PoolEvents.ready, 1)
878 expect(pool.readyEventEmitted).toBe(true)
879 expect(pool.workerNodes.length).toBe(numberOfWorkers)
880 for (const workerNode of pool.workerNodes) {
881 expect(workerNode).toBeInstanceOf(WorkerNode)
886 it('Verify that pool execute() arguments are checked', async () => {
887 const pool = new FixedClusterPool(
889 './tests/worker-files/cluster/testWorker.cjs'
891 await expect(pool.execute(undefined, 0)).rejects.toThrow(
892 new TypeError('name argument must be a string')
894 await expect(pool.execute(undefined, '')).rejects.toThrow(
895 new TypeError('name argument must not be an empty string')
897 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
898 new TypeError('transferList argument must be an array')
900 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
901 "Task function 'unknown' not found"
904 await expect(pool.execute()).rejects.toThrow(
905 new Error('Cannot execute a task on not started pool')
909 it('Verify that pool worker tasks usage are computed', async () => {
910 const pool = new FixedClusterPool(
912 './tests/worker-files/cluster/testWorker.cjs'
914 const promises = new Set()
915 const maxMultiplier = 2
916 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
917 promises.add(pool.execute())
919 for (const workerNode of pool.workerNodes) {
920 expect(workerNode.usage).toStrictEqual({
923 executing: maxMultiplier,
926 sequentiallyStolen: 0,
931 history: expect.any(CircularArray)
934 history: expect.any(CircularArray)
938 history: expect.any(CircularArray)
941 history: expect.any(CircularArray)
946 await Promise.all(promises)
947 for (const workerNode of pool.workerNodes) {
948 expect(workerNode.usage).toStrictEqual({
950 executed: maxMultiplier,
954 sequentiallyStolen: 0,
959 history: expect.any(CircularArray)
962 history: expect.any(CircularArray)
966 history: expect.any(CircularArray)
969 history: expect.any(CircularArray)
977 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
978 const pool = new DynamicThreadPool(
979 Math.floor(numberOfWorkers / 2),
981 './tests/worker-files/thread/testWorker.mjs'
983 const promises = new Set()
984 const maxMultiplier = 2
985 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
986 promises.add(pool.execute())
988 await Promise.all(promises)
989 for (const workerNode of pool.workerNodes) {
990 expect(workerNode.usage).toStrictEqual({
992 executed: expect.any(Number),
996 sequentiallyStolen: 0,
1001 history: expect.any(CircularArray)
1004 history: expect.any(CircularArray)
1008 history: expect.any(CircularArray)
1011 history: expect.any(CircularArray)
1015 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1016 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1017 numberOfWorkers * maxMultiplier
1019 expect(workerNode.usage.runTime.history.length).toBe(0)
1020 expect(workerNode.usage.waitTime.history.length).toBe(0)
1021 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1022 expect(workerNode.usage.elu.active.history.length).toBe(0)
1024 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1025 for (const workerNode of pool.workerNodes) {
1026 expect(workerNode.usage).toStrictEqual({
1032 sequentiallyStolen: 0,
1037 history: expect.any(CircularArray)
1040 history: expect.any(CircularArray)
1044 history: expect.any(CircularArray)
1047 history: expect.any(CircularArray)
1051 expect(workerNode.usage.runTime.history.length).toBe(0)
1052 expect(workerNode.usage.waitTime.history.length).toBe(0)
1053 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1054 expect(workerNode.usage.elu.active.history.length).toBe(0)
1056 await pool.destroy()
1059 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1060 const pool = new DynamicClusterPool(
1061 Math.floor(numberOfWorkers / 2),
1063 './tests/worker-files/cluster/testWorker.cjs'
1065 expect(pool.emitter.eventNames()).toStrictEqual([])
1068 pool.emitter.on(PoolEvents.ready, info => {
1072 await waitPoolEvents(pool, PoolEvents.ready, 1)
1073 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1074 expect(poolReady).toBe(1)
1075 expect(poolInfo).toStrictEqual({
1077 type: PoolTypes.dynamic,
1078 worker: WorkerTypes.cluster,
1081 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1082 strategyRetries: expect.any(Number),
1083 minSize: expect.any(Number),
1084 maxSize: expect.any(Number),
1085 workerNodes: expect.any(Number),
1086 idleWorkerNodes: expect.any(Number),
1087 busyWorkerNodes: expect.any(Number),
1088 executedTasks: expect.any(Number),
1089 executingTasks: expect.any(Number),
1090 failedTasks: expect.any(Number)
1092 await pool.destroy()
1095 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1096 const pool = new FixedThreadPool(
1098 './tests/worker-files/thread/testWorker.mjs'
1100 expect(pool.emitter.eventNames()).toStrictEqual([])
1101 const promises = new Set()
1104 pool.emitter.on(PoolEvents.busy, info => {
1108 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1109 for (let i = 0; i < numberOfWorkers * 2; i++) {
1110 promises.add(pool.execute())
1112 await Promise.all(promises)
1113 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1114 // So it is triggered numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1115 expect(poolBusy).toBe(numberOfWorkers + 1)
1116 expect(poolInfo).toStrictEqual({
1118 type: PoolTypes.fixed,
1119 worker: WorkerTypes.thread,
1122 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1123 strategyRetries: expect.any(Number),
1124 minSize: expect.any(Number),
1125 maxSize: expect.any(Number),
1126 workerNodes: expect.any(Number),
1127 idleWorkerNodes: expect.any(Number),
1128 busyWorkerNodes: expect.any(Number),
1129 executedTasks: expect.any(Number),
1130 executingTasks: expect.any(Number),
1131 failedTasks: expect.any(Number)
1133 await pool.destroy()
1136 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1137 const pool = new DynamicThreadPool(
1138 Math.floor(numberOfWorkers / 2),
1140 './tests/worker-files/thread/testWorker.mjs'
1142 expect(pool.emitter.eventNames()).toStrictEqual([])
1143 const promises = new Set()
1146 pool.emitter.on(PoolEvents.full, info => {
1150 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1151 for (let i = 0; i < numberOfWorkers * 2; i++) {
1152 promises.add(pool.execute())
1154 await Promise.all(promises)
1155 expect(poolFull).toBe(1)
1156 expect(poolInfo).toStrictEqual({
1158 type: PoolTypes.dynamic,
1159 worker: WorkerTypes.thread,
1162 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1163 strategyRetries: expect.any(Number),
1164 minSize: expect.any(Number),
1165 maxSize: expect.any(Number),
1166 workerNodes: expect.any(Number),
1167 idleWorkerNodes: expect.any(Number),
1168 busyWorkerNodes: expect.any(Number),
1169 executedTasks: expect.any(Number),
1170 executingTasks: expect.any(Number),
1171 failedTasks: expect.any(Number)
1173 await pool.destroy()
1176 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1177 const pool = new FixedThreadPool(
1179 './tests/worker-files/thread/testWorker.mjs',
1181 enableTasksQueue: true
1184 stub(pool, 'hasBackPressure').returns(true)
1185 expect(pool.emitter.eventNames()).toStrictEqual([])
1186 const promises = new Set()
1187 let poolBackPressure = 0
1189 pool.emitter.on(PoolEvents.backPressure, info => {
1193 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1194 for (let i = 0; i < numberOfWorkers + 1; i++) {
1195 promises.add(pool.execute())
1197 await Promise.all(promises)
1198 expect(poolBackPressure).toBe(1)
1199 expect(poolInfo).toStrictEqual({
1201 type: PoolTypes.fixed,
1202 worker: WorkerTypes.thread,
1205 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1206 strategyRetries: expect.any(Number),
1207 minSize: expect.any(Number),
1208 maxSize: expect.any(Number),
1209 workerNodes: expect.any(Number),
1210 idleWorkerNodes: expect.any(Number),
1211 stealingWorkerNodes: expect.any(Number),
1212 busyWorkerNodes: expect.any(Number),
1213 executedTasks: expect.any(Number),
1214 executingTasks: expect.any(Number),
1215 maxQueuedTasks: expect.any(Number),
1216 queuedTasks: expect.any(Number),
1218 stolenTasks: expect.any(Number),
1219 failedTasks: expect.any(Number)
1221 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1222 await pool.destroy()
1225 it('Verify that destroy() waits for queued tasks to finish', async () => {
1226 const tasksFinishedTimeout = 2500
1227 const pool = new FixedThreadPool(
1229 './tests/worker-files/thread/asyncWorker.mjs',
1231 enableTasksQueue: true,
1232 tasksQueueOptions: { tasksFinishedTimeout }
1235 const maxMultiplier = 4
1236 let tasksFinished = 0
1237 for (const workerNode of pool.workerNodes) {
1238 workerNode.on('taskFinished', () => {
1242 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1245 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1246 const startTime = performance.now()
1247 await pool.destroy()
1248 const elapsedTime = performance.now() - startTime
1249 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1250 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1251 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1254 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1255 const tasksFinishedTimeout = 1000
1256 const pool = new FixedThreadPool(
1258 './tests/worker-files/thread/asyncWorker.mjs',
1260 enableTasksQueue: true,
1261 tasksQueueOptions: { tasksFinishedTimeout }
1264 const maxMultiplier = 4
1265 let tasksFinished = 0
1266 for (const workerNode of pool.workerNodes) {
1267 workerNode.on('taskFinished', () => {
1271 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1274 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1275 const startTime = performance.now()
1276 await pool.destroy()
1277 const elapsedTime = performance.now() - startTime
1278 expect(tasksFinished).toBe(0)
1279 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1282 it('Verify that pool asynchronous resource track tasks execution', async () => {
1287 let resolveCalls = 0
1288 const hook = createHook({
1289 init (asyncId, type) {
1290 if (type === 'poolifier:task') {
1292 taskAsyncId = asyncId
1296 if (asyncId === taskAsyncId) beforeCalls++
1299 if (asyncId === taskAsyncId) afterCalls++
1302 if (executionAsyncId() === taskAsyncId) resolveCalls++
1305 const pool = new FixedThreadPool(
1307 './tests/worker-files/thread/testWorker.mjs'
1310 await pool.execute()
1312 expect(initCalls).toBe(1)
1313 expect(beforeCalls).toBe(1)
1314 expect(afterCalls).toBe(1)
1315 expect(resolveCalls).toBe(1)
1316 await pool.destroy()
1319 it('Verify that hasTaskFunction() is working', async () => {
1320 const dynamicThreadPool = new DynamicThreadPool(
1321 Math.floor(numberOfWorkers / 2),
1323 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1325 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1326 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1327 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1330 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1331 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1332 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1333 await dynamicThreadPool.destroy()
1334 const fixedClusterPool = new FixedClusterPool(
1336 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1338 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1339 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1340 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1343 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1344 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1345 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1346 await fixedClusterPool.destroy()
1349 it('Verify that addTaskFunction() is working', async () => {
1350 const dynamicThreadPool = new DynamicThreadPool(
1351 Math.floor(numberOfWorkers / 2),
1353 './tests/worker-files/thread/testWorker.mjs'
1355 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1357 dynamicThreadPool.addTaskFunction(0, () => {})
1358 ).rejects.toThrow(new TypeError('name argument must be a string'))
1360 dynamicThreadPool.addTaskFunction('', () => {})
1362 new TypeError('name argument must not be an empty string')
1364 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1365 new TypeError('fn argument must be a function')
1367 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1368 new TypeError('fn argument must be a function')
1370 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1374 const echoTaskFunction = data => {
1378 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1379 ).resolves.toBe(true)
1380 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1381 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1384 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1389 const taskFunctionData = { test: 'test' }
1390 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1391 expect(echoResult).toStrictEqual(taskFunctionData)
1392 for (const workerNode of dynamicThreadPool.workerNodes) {
1393 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1395 executed: expect.any(Number),
1398 sequentiallyStolen: 0,
1403 history: new CircularArray()
1406 history: new CircularArray()
1410 history: new CircularArray()
1413 history: new CircularArray()
1418 await dynamicThreadPool.destroy()
// Covers removeTaskFunction() on a DynamicThreadPool: removing a name that was
// not added on the pool side rejects, while a previously added 'echo' function
// can be removed, leaving the pool-side registry empty.
1421 it('Verify that removeTaskFunction() is working', async () => {
1422 const dynamicThreadPool = new DynamicThreadPool(
1423 Math.floor(numberOfWorkers / 2),
1425 './tests/worker-files/thread/testWorker.mjs'
// NOTE(review): presumably resolves once one `ready` pool event has been seen — confirm in test-utils.
1427 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1428 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// 'test' was never registered through addTaskFunction(), so removal must fail.
1432 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1433 new Error('Cannot remove a task function not handled on the pool side')
1435 const echoTaskFunction = data => {
// Register 'echo' first so that there is something the pool is able to remove.
1438 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1439 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1440 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1443 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1448 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
// Once removed, 'echo' must be gone from the pool-side registry.
1451 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1452 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1453 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1457 await dynamicThreadPool.destroy()
// Covers listTaskFunctionNames() for both worker kinds: a DynamicThreadPool and
// a FixedClusterPool are each started with a multiple-task-functions worker
// file and their reported name lists are compared against an expected array.
1460 it('Verify that listTaskFunctionNames() is working', async () => {
1461 const dynamicThreadPool = new DynamicThreadPool(
1462 Math.floor(numberOfWorkers / 2),
1464 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// NOTE(review): presumably resolves once one `ready` pool event has been seen — confirm in test-utils.
1466 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1467 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1469 'jsonIntegerSerialization',
1473 await dynamicThreadPool.destroy()
// Same check again, this time against a cluster based pool.
1474 const fixedClusterPool = new FixedClusterPool(
1476 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1478 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1479 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1481 'jsonIntegerSerialization',
1485 await fixedClusterPool.destroy()
// Covers setDefaultTaskFunction() on a DynamicThreadPool: a non-string name,
// the reserved default name and an unknown name each have an expected
// worker-side error message, while 'factorial' and 'fibonacci' are accepted.
// The list of task function names is re-checked after each accepted change.
1488 it('Verify that setDefaultTaskFunction() is working', async () => {
1489 const dynamicThreadPool = new DynamicThreadPool(
1490 Math.floor(numberOfWorkers / 2),
1492 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// NOTE(review): presumably resolves once one `ready` pool event has been seen — confirm in test-utils.
1494 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// The expected error messages embed the id of the first worker node.
1495 const workerId = dynamicThreadPool.workerNodes[0].info.id
1496 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1498 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1502 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1505 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1509 dynamicThreadPool.setDefaultTaskFunction('unknown')
1512 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
// Names as listed before any successful change of the default task function.
1515 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1517 'jsonIntegerSerialization',
1522 dynamicThreadPool.setDefaultTaskFunction('factorial')
1523 ).resolves.toBe(true)
1524 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1527 'jsonIntegerSerialization',
1531 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1532 ).resolves.toBe(true)
1533 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1536 'jsonIntegerSerialization',
1539 await dynamicThreadPool.destroy()
// Runs a DynamicClusterPool backed by the multiple-task-functions worker file:
// the same input is executed through the default task function and through
// three named ones, then pool-level counters and per-worker, per-task-function
// usage statistics are checked.
1542 it('Verify that multiple task functions worker is working', async () => {
1543 const pool = new DynamicClusterPool(
1544 Math.floor(numberOfWorkers / 2),
1546 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1548 const data = { n: 10 }
// No name given: the task goes to whatever the worker uses as default function.
1549 const result0 = await pool.execute(data)
1550 expect(result0).toStrictEqual({ ok: 1 })
1551 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1552 expect(result1).toStrictEqual({ ok: 1 })
// 10! = 3628800
1553 const result2 = await pool.execute(data, 'factorial')
1554 expect(result2).toBe(3628800)
// fibonacci(10) = 55
1555 const result3 = await pool.execute(data, 'fibonacci')
1556 expect(result3).toBe(55)
// All four tasks submitted above are finished and none is still running.
1557 expect(pool.info.executingTasks).toBe(0)
1558 expect(pool.info.executedTasks).toBe(4)
1559 for (const workerNode of pool.workerNodes) {
1560 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1562 'jsonIntegerSerialization',
1566 expect(workerNode.taskFunctionsUsage.size).toBe(3)
// Usage of every listed task function: histories are only type-checked here.
1567 for (const name of pool.listTaskFunctionNames()) {
1568 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1570 executed: expect.any(Number),
1574 sequentiallyStolen: 0,
1578 history: expect.any(CircularArray)
1581 history: expect.any(CircularArray)
1585 history: expect.any(CircularArray)
1588 history: expect.any(CircularArray)
1593 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1594 ).toBeGreaterThan(0)
// NOTE(review): the usage under DEFAULT_TASK_NAME is compared with the usage of
// the second listed task function; the matcher is not visible here — confirm.
1597 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1599 workerNode.getTaskFunctionWorkerUsage(
1600 workerNode.info.taskFunctionNames[1]
1604 await pool.destroy()
// Checks that sendKillMessageToWorker() on a DynamicClusterPool returns a
// promise resolving to undefined for the worker node at key 0.
1607 it('Verify sendKillMessageToWorker()', async () => {
1608 const pool = new DynamicClusterPool(
1609 Math.floor(numberOfWorkers / 2),
1611 './tests/worker-files/cluster/testWorker.cjs'
// Key (index) of the targeted worker node: the first one.
1613 const workerNodeKey = 0
1615 pool.sendKillMessageToWorker(workerNodeKey)
1616 ).resolves.toBeUndefined()
1617 await pool.destroy()
// Checks that sendTaskFunctionOperationToWorker() can add a task function on a
// single worker of a DynamicClusterPool: the operation must resolve to true
// and the new name must show up in that worker node's info.
1620 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1621 const pool = new DynamicClusterPool(
1622 Math.floor(numberOfWorkers / 2),
1624 './tests/worker-files/cluster/testWorker.cjs'
// Key (index) of the targeted worker node: the first one.
1626 const workerNodeKey = 0
1628 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1629 taskFunctionOperation: 'add',
1630 taskFunctionName: 'empty',
// The task function is handed over as its source text, not as a function.
1631 taskFunction: (() => {}).toString()
1633 ).resolves.toBe(true)
// 'empty' is expected after the names the worker already had.
1635 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1636 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1637 await pool.destroy()
// Checks that sendTaskFunctionOperationToWorkers() can add a task function on
// a DynamicClusterPool without targeting a specific worker: the operation must
// resolve to true and every worker node's task function names are verified.
1640 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1641 const pool = new DynamicClusterPool(
1642 Math.floor(numberOfWorkers / 2),
1644 './tests/worker-files/cluster/testWorker.cjs'
1647 pool.sendTaskFunctionOperationToWorkers({
1648 taskFunctionOperation: 'add',
1649 taskFunctionName: 'empty',
// The task function is handed over as its source text, not as a function.
1650 taskFunction: (() => {}).toString()
1652 ).resolves.toBe(true)
// Each worker node of the pool is inspected, not only the first one.
1653 for (const workerNode of pool.workerNodes) {
1654 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1660 await pool.destroy()