1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.cjs'
18 import { CircularArray } from '../../lib/circular-array.cjs'
19 import { Deque } from '../../lib/deque.cjs'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
21 import { waitPoolEvents } from '../test-utils.cjs'
22 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
111 'Cannot instantiate a pool with a negative number of workers'
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.cjs'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.cjs'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.cjs'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.emitter.eventNames()).toStrictEqual([])
225 expect(pool.opts).toStrictEqual({
228 restartWorkerOnError: true,
229 enableTasksQueue: false,
230 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
232 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
233 .workerChoiceStrategies) {
234 expect(workerChoiceStrategy.opts).toStrictEqual({
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
245 const testHandler = () => console.info('test handler executed')
246 pool = new FixedThreadPool(
248 './tests/worker-files/thread/testWorker.mjs',
250 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
251 workerChoiceStrategyOptions: {
252 runTime: { median: true },
253 weights: { 0: 300, 1: 200 }
256 restartWorkerOnError: false,
257 enableTasksQueue: true,
258 tasksQueueOptions: { concurrency: 2 },
259 messageHandler: testHandler,
260 errorHandler: testHandler,
261 onlineHandler: testHandler,
262 exitHandler: testHandler
265 expect(pool.emitter).toBeUndefined()
266 expect(pool.opts).toStrictEqual({
269 restartWorkerOnError: false,
270 enableTasksQueue: true,
273 size: Math.pow(numberOfWorkers, 2),
275 tasksStealingOnBackPressure: true,
276 tasksFinishedTimeout: 2000
278 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
279 workerChoiceStrategyOptions: {
280 runTime: { median: true },
281 weights: { 0: 300, 1: 200 }
283 onlineHandler: testHandler,
284 messageHandler: testHandler,
285 errorHandler: testHandler,
286 exitHandler: testHandler
288 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
289 .workerChoiceStrategies) {
290 expect(workerChoiceStrategy.opts).toStrictEqual({
291 runTime: { median: true },
292 waitTime: { median: false },
293 elu: { median: false },
294 weights: { 0: 300, 1: 200 }
300 it('Verify that pool options are validated', () => {
305 './tests/worker-files/thread/testWorker.mjs',
307 workerChoiceStrategy: 'invalidStrategy'
310 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
315 './tests/worker-files/thread/testWorker.mjs',
317 workerChoiceStrategyOptions: { weights: {} }
322 'Invalid worker choice strategy options: must have a weight for each worker node'
329 './tests/worker-files/thread/testWorker.mjs',
331 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
336 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
343 './tests/worker-files/thread/testWorker.mjs',
345 enableTasksQueue: true,
346 tasksQueueOptions: 'invalidTasksQueueOptions'
350 new TypeError('Invalid tasks queue options: must be a plain object')
356 './tests/worker-files/thread/testWorker.mjs',
358 enableTasksQueue: true,
359 tasksQueueOptions: { concurrency: 0 }
364 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
371 './tests/worker-files/thread/testWorker.mjs',
373 enableTasksQueue: true,
374 tasksQueueOptions: { concurrency: -1 }
379 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
386 './tests/worker-files/thread/testWorker.mjs',
388 enableTasksQueue: true,
389 tasksQueueOptions: { concurrency: 0.2 }
393 new TypeError('Invalid worker node tasks concurrency: must be an integer')
399 './tests/worker-files/thread/testWorker.mjs',
401 enableTasksQueue: true,
402 tasksQueueOptions: { size: 0 }
407 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
414 './tests/worker-files/thread/testWorker.mjs',
416 enableTasksQueue: true,
417 tasksQueueOptions: { size: -1 }
422 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
429 './tests/worker-files/thread/testWorker.mjs',
431 enableTasksQueue: true,
432 tasksQueueOptions: { size: 0.2 }
436 new TypeError('Invalid worker node tasks queue size: must be an integer')
440 it('Verify that pool worker choice strategy options can be set', async () => {
441 const pool = new FixedThreadPool(
443 './tests/worker-files/thread/testWorker.mjs',
444 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
446 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
447 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
448 .workerChoiceStrategies) {
449 expect(workerChoiceStrategy.opts).toStrictEqual({
450 runTime: { median: false },
451 waitTime: { median: false },
452 elu: { median: false },
453 weights: expect.objectContaining({
454 0: expect.any(Number),
455 [pool.info.maxSize - 1]: expect.any(Number)
460 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
478 pool.setWorkerChoiceStrategyOptions({
479 runTime: { median: true },
480 elu: { median: true }
482 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
483 runTime: { median: true },
484 elu: { median: true }
486 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
487 .workerChoiceStrategies) {
488 expect(workerChoiceStrategy.opts).toStrictEqual({
489 runTime: { median: true },
490 waitTime: { median: false },
491 elu: { median: true },
492 weights: expect.objectContaining({
493 0: expect.any(Number),
494 [pool.info.maxSize - 1]: expect.any(Number)
499 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
517 pool.setWorkerChoiceStrategyOptions({
518 runTime: { median: false },
519 elu: { median: false }
521 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
522 runTime: { median: false },
523 elu: { median: false }
525 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
526 .workerChoiceStrategies) {
527 expect(workerChoiceStrategy.opts).toStrictEqual({
528 runTime: { median: false },
529 waitTime: { median: false },
530 elu: { median: false },
531 weights: expect.objectContaining({
532 0: expect.any(Number),
533 [pool.info.maxSize - 1]: expect.any(Number)
538 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
557 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
560 'Invalid worker choice strategy options: must be a plain object'
563 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
565 'Invalid worker choice strategy options: must have a weight for each worker node'
569 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
572 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
578 it('Verify that pool tasks queue can be enabled/disabled', async () => {
579 const pool = new FixedThreadPool(
581 './tests/worker-files/thread/testWorker.mjs'
583 expect(pool.opts.enableTasksQueue).toBe(false)
584 expect(pool.opts.tasksQueueOptions).toBeUndefined()
585 pool.enableTasksQueue(true)
586 expect(pool.opts.enableTasksQueue).toBe(true)
587 expect(pool.opts.tasksQueueOptions).toStrictEqual({
589 size: Math.pow(numberOfWorkers, 2),
591 tasksStealingOnBackPressure: true,
592 tasksFinishedTimeout: 2000
594 pool.enableTasksQueue(true, { concurrency: 2 })
595 expect(pool.opts.enableTasksQueue).toBe(true)
596 expect(pool.opts.tasksQueueOptions).toStrictEqual({
598 size: Math.pow(numberOfWorkers, 2),
600 tasksStealingOnBackPressure: true,
601 tasksFinishedTimeout: 2000
603 pool.enableTasksQueue(false)
604 expect(pool.opts.enableTasksQueue).toBe(false)
605 expect(pool.opts.tasksQueueOptions).toBeUndefined()
609 it('Verify that pool tasks queue options can be set', async () => {
610 const pool = new FixedThreadPool(
612 './tests/worker-files/thread/testWorker.mjs',
613 { enableTasksQueue: true }
615 expect(pool.opts.tasksQueueOptions).toStrictEqual({
617 size: Math.pow(numberOfWorkers, 2),
619 tasksStealingOnBackPressure: true,
620 tasksFinishedTimeout: 2000
622 for (const workerNode of pool.workerNodes) {
623 expect(workerNode.tasksQueueBackPressureSize).toBe(
624 pool.opts.tasksQueueOptions.size
627 pool.setTasksQueueOptions({
631 tasksStealingOnBackPressure: false,
632 tasksFinishedTimeout: 3000
634 expect(pool.opts.tasksQueueOptions).toStrictEqual({
638 tasksStealingOnBackPressure: false,
639 tasksFinishedTimeout: 3000
641 for (const workerNode of pool.workerNodes) {
642 expect(workerNode.tasksQueueBackPressureSize).toBe(
643 pool.opts.tasksQueueOptions.size
646 pool.setTasksQueueOptions({
649 tasksStealingOnBackPressure: true
651 expect(pool.opts.tasksQueueOptions).toStrictEqual({
653 size: Math.pow(numberOfWorkers, 2),
655 tasksStealingOnBackPressure: true,
656 tasksFinishedTimeout: 2000
658 for (const workerNode of pool.workerNodes) {
659 expect(workerNode.tasksQueueBackPressureSize).toBe(
660 pool.opts.tasksQueueOptions.size
663 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
664 new TypeError('Invalid tasks queue options: must be a plain object')
666 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
668 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
671 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
673 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
676 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
677 new TypeError('Invalid worker node tasks concurrency: must be an integer')
679 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
681 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
684 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
686 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
689 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
690 new TypeError('Invalid worker node tasks queue size: must be an integer')
695 it('Verify that pool info is set', async () => {
696 let pool = new FixedThreadPool(
698 './tests/worker-files/thread/testWorker.mjs'
700 expect(pool.info).toStrictEqual({
702 type: PoolTypes.fixed,
703 worker: WorkerTypes.thread,
706 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
707 minSize: numberOfWorkers,
708 maxSize: numberOfWorkers,
709 workerNodes: numberOfWorkers,
710 idleWorkerNodes: numberOfWorkers,
717 pool = new DynamicClusterPool(
718 Math.floor(numberOfWorkers / 2),
720 './tests/worker-files/cluster/testWorker.cjs'
722 expect(pool.info).toStrictEqual({
724 type: PoolTypes.dynamic,
725 worker: WorkerTypes.cluster,
728 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
729 minSize: Math.floor(numberOfWorkers / 2),
730 maxSize: numberOfWorkers,
731 workerNodes: Math.floor(numberOfWorkers / 2),
732 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
741 it('Verify that pool worker tasks usage are initialized', async () => {
742 const pool = new FixedClusterPool(
744 './tests/worker-files/cluster/testWorker.cjs'
746 for (const workerNode of pool.workerNodes) {
747 expect(workerNode).toBeInstanceOf(WorkerNode)
748 expect(workerNode.usage).toStrictEqual({
754 sequentiallyStolen: 0,
759 history: new CircularArray()
762 history: new CircularArray()
766 history: new CircularArray()
769 history: new CircularArray()
777 it('Verify that pool worker tasks queue are initialized', async () => {
778 let pool = new FixedClusterPool(
780 './tests/worker-files/cluster/testWorker.cjs'
782 for (const workerNode of pool.workerNodes) {
783 expect(workerNode).toBeInstanceOf(WorkerNode)
784 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
785 expect(workerNode.tasksQueue.size).toBe(0)
786 expect(workerNode.tasksQueue.maxSize).toBe(0)
789 pool = new DynamicThreadPool(
790 Math.floor(numberOfWorkers / 2),
792 './tests/worker-files/thread/testWorker.mjs'
794 for (const workerNode of pool.workerNodes) {
795 expect(workerNode).toBeInstanceOf(WorkerNode)
796 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
797 expect(workerNode.tasksQueue.size).toBe(0)
798 expect(workerNode.tasksQueue.maxSize).toBe(0)
803 it('Verify that pool worker info are initialized', async () => {
804 let pool = new FixedClusterPool(
806 './tests/worker-files/cluster/testWorker.cjs'
808 for (const workerNode of pool.workerNodes) {
809 expect(workerNode).toBeInstanceOf(WorkerNode)
810 expect(workerNode.info).toStrictEqual({
811 id: expect.any(Number),
812 type: WorkerTypes.cluster,
819 pool = new DynamicThreadPool(
820 Math.floor(numberOfWorkers / 2),
822 './tests/worker-files/thread/testWorker.mjs'
824 for (const workerNode of pool.workerNodes) {
825 expect(workerNode).toBeInstanceOf(WorkerNode)
826 expect(workerNode.info).toStrictEqual({
827 id: expect.any(Number),
828 type: WorkerTypes.thread,
837 it('Verify that pool statuses are checked at start or destroy', async () => {
838 const pool = new FixedThreadPool(
840 './tests/worker-files/thread/testWorker.mjs'
842 expect(pool.info.started).toBe(true)
843 expect(pool.info.ready).toBe(true)
844 expect(() => pool.start()).toThrow(
845 new Error('Cannot start an already started pool')
848 expect(pool.info.started).toBe(false)
849 expect(pool.info.ready).toBe(false)
850 await expect(pool.destroy()).rejects.toThrow(
851 new Error('Cannot destroy an already destroyed pool')
855 it('Verify that pool can be started after initialization', async () => {
856 const pool = new FixedClusterPool(
858 './tests/worker-files/cluster/testWorker.cjs',
863 expect(pool.info.started).toBe(false)
864 expect(pool.info.ready).toBe(false)
865 expect(pool.readyEventEmitted).toBe(false)
866 expect(pool.workerNodes).toStrictEqual([])
867 await expect(pool.execute()).rejects.toThrow(
868 new Error('Cannot execute a task on not started pool')
871 expect(pool.info.started).toBe(true)
872 expect(pool.info.ready).toBe(true)
873 await waitPoolEvents(pool, PoolEvents.ready, 1)
874 expect(pool.readyEventEmitted).toBe(true)
875 expect(pool.workerNodes.length).toBe(numberOfWorkers)
876 for (const workerNode of pool.workerNodes) {
877 expect(workerNode).toBeInstanceOf(WorkerNode)
882 it('Verify that pool execute() arguments are checked', async () => {
883 const pool = new FixedClusterPool(
885 './tests/worker-files/cluster/testWorker.cjs'
887 await expect(pool.execute(undefined, 0)).rejects.toThrow(
888 new TypeError('name argument must be a string')
890 await expect(pool.execute(undefined, '')).rejects.toThrow(
891 new TypeError('name argument must not be an empty string')
893 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
894 new TypeError('transferList argument must be an array')
896 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
897 "Task function 'unknown' not found"
900 await expect(pool.execute()).rejects.toThrow(
901 new Error('Cannot execute a task on not started pool')
905 it('Verify that pool worker tasks usage are computed', async () => {
906 const pool = new FixedClusterPool(
908 './tests/worker-files/cluster/testWorker.cjs'
910 const promises = new Set()
911 const maxMultiplier = 2
912 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
913 promises.add(pool.execute())
915 for (const workerNode of pool.workerNodes) {
916 expect(workerNode.usage).toStrictEqual({
919 executing: maxMultiplier,
922 sequentiallyStolen: 0,
927 history: expect.any(CircularArray)
930 history: expect.any(CircularArray)
934 history: expect.any(CircularArray)
937 history: expect.any(CircularArray)
942 await Promise.all(promises)
943 for (const workerNode of pool.workerNodes) {
944 expect(workerNode.usage).toStrictEqual({
946 executed: maxMultiplier,
950 sequentiallyStolen: 0,
955 history: expect.any(CircularArray)
958 history: expect.any(CircularArray)
962 history: expect.any(CircularArray)
965 history: expect.any(CircularArray)
973 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
974 const pool = new DynamicThreadPool(
975 Math.floor(numberOfWorkers / 2),
977 './tests/worker-files/thread/testWorker.mjs'
979 const promises = new Set()
980 const maxMultiplier = 2
981 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
982 promises.add(pool.execute())
984 await Promise.all(promises)
985 for (const workerNode of pool.workerNodes) {
986 expect(workerNode.usage).toStrictEqual({
988 executed: expect.any(Number),
992 sequentiallyStolen: 0,
997 history: expect.any(CircularArray)
1000 history: expect.any(CircularArray)
1004 history: expect.any(CircularArray)
1007 history: expect.any(CircularArray)
1011 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1012 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1013 numberOfWorkers * maxMultiplier
1015 expect(workerNode.usage.runTime.history.length).toBe(0)
1016 expect(workerNode.usage.waitTime.history.length).toBe(0)
1017 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1018 expect(workerNode.usage.elu.active.history.length).toBe(0)
1020 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1021 for (const workerNode of pool.workerNodes) {
1022 expect(workerNode.usage).toStrictEqual({
1028 sequentiallyStolen: 0,
1033 history: expect.any(CircularArray)
1036 history: expect.any(CircularArray)
1040 history: expect.any(CircularArray)
1043 history: expect.any(CircularArray)
1047 expect(workerNode.usage.runTime.history.length).toBe(0)
1048 expect(workerNode.usage.waitTime.history.length).toBe(0)
1049 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1050 expect(workerNode.usage.elu.active.history.length).toBe(0)
1052 await pool.destroy()
1055 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1056 const pool = new DynamicClusterPool(
1057 Math.floor(numberOfWorkers / 2),
1059 './tests/worker-files/cluster/testWorker.cjs'
1061 expect(pool.emitter.eventNames()).toStrictEqual([])
1064 pool.emitter.on(PoolEvents.ready, info => {
1068 await waitPoolEvents(pool, PoolEvents.ready, 1)
1069 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1070 expect(poolReady).toBe(1)
1071 expect(poolInfo).toStrictEqual({
1073 type: PoolTypes.dynamic,
1074 worker: WorkerTypes.cluster,
1077 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1078 minSize: expect.any(Number),
1079 maxSize: expect.any(Number),
1080 workerNodes: expect.any(Number),
1081 idleWorkerNodes: expect.any(Number),
1082 busyWorkerNodes: expect.any(Number),
1083 executedTasks: expect.any(Number),
1084 executingTasks: expect.any(Number),
1085 failedTasks: expect.any(Number)
1087 await pool.destroy()
1090 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1091 const pool = new FixedThreadPool(
1093 './tests/worker-files/thread/testWorker.mjs'
1095 expect(pool.emitter.eventNames()).toStrictEqual([])
1096 const promises = new Set()
1099 pool.emitter.on(PoolEvents.busy, info => {
1103 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1104 for (let i = 0; i < numberOfWorkers * 2; i++) {
1105 promises.add(pool.execute())
1107 await Promise.all(promises)
1108 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1109 // So it is triggered numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1110 expect(poolBusy).toBe(numberOfWorkers + 1)
1111 expect(poolInfo).toStrictEqual({
1113 type: PoolTypes.fixed,
1114 worker: WorkerTypes.thread,
1117 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1118 minSize: expect.any(Number),
1119 maxSize: expect.any(Number),
1120 workerNodes: expect.any(Number),
1121 idleWorkerNodes: expect.any(Number),
1122 busyWorkerNodes: expect.any(Number),
1123 executedTasks: expect.any(Number),
1124 executingTasks: expect.any(Number),
1125 failedTasks: expect.any(Number)
1127 await pool.destroy()
1130 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1131 const pool = new DynamicThreadPool(
1132 Math.floor(numberOfWorkers / 2),
1134 './tests/worker-files/thread/testWorker.mjs'
1136 expect(pool.emitter.eventNames()).toStrictEqual([])
1137 const promises = new Set()
1140 pool.emitter.on(PoolEvents.full, info => {
1144 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1145 for (let i = 0; i < numberOfWorkers * 2; i++) {
1146 promises.add(pool.execute())
1148 await Promise.all(promises)
1149 expect(poolFull).toBe(1)
1150 expect(poolInfo).toStrictEqual({
1152 type: PoolTypes.dynamic,
1153 worker: WorkerTypes.thread,
1156 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1157 minSize: expect.any(Number),
1158 maxSize: expect.any(Number),
1159 workerNodes: expect.any(Number),
1160 idleWorkerNodes: expect.any(Number),
1161 busyWorkerNodes: expect.any(Number),
1162 executedTasks: expect.any(Number),
1163 executingTasks: expect.any(Number),
1164 failedTasks: expect.any(Number)
1166 await pool.destroy()
1169 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1170 const pool = new FixedThreadPool(
1172 './tests/worker-files/thread/testWorker.mjs',
1174 enableTasksQueue: true
1177 stub(pool, 'hasBackPressure').returns(true)
1178 expect(pool.emitter.eventNames()).toStrictEqual([])
1179 const promises = new Set()
1180 let poolBackPressure = 0
1182 pool.emitter.on(PoolEvents.backPressure, info => {
1186 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1187 for (let i = 0; i < numberOfWorkers + 1; i++) {
1188 promises.add(pool.execute())
1190 await Promise.all(promises)
1191 expect(poolBackPressure).toBe(1)
1192 expect(poolInfo).toStrictEqual({
1194 type: PoolTypes.fixed,
1195 worker: WorkerTypes.thread,
1198 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1199 minSize: expect.any(Number),
1200 maxSize: expect.any(Number),
1201 workerNodes: expect.any(Number),
1202 idleWorkerNodes: expect.any(Number),
1203 stealingWorkerNodes: expect.any(Number),
1204 busyWorkerNodes: expect.any(Number),
1205 executedTasks: expect.any(Number),
1206 executingTasks: expect.any(Number),
1207 maxQueuedTasks: expect.any(Number),
1208 queuedTasks: expect.any(Number),
1210 stolenTasks: expect.any(Number),
1211 failedTasks: expect.any(Number)
1213 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1214 await pool.destroy()
1217 it('Verify that destroy() waits for queued tasks to finish', async () => {
1218 const tasksFinishedTimeout = 2500
1219 const pool = new FixedThreadPool(
1221 './tests/worker-files/thread/asyncWorker.mjs',
1223 enableTasksQueue: true,
1224 tasksQueueOptions: { tasksFinishedTimeout }
1227 const maxMultiplier = 4
1228 let tasksFinished = 0
1229 for (const workerNode of pool.workerNodes) {
1230 workerNode.on('taskFinished', () => {
1234 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1237 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1238 const startTime = performance.now()
1239 await pool.destroy()
1240 const elapsedTime = performance.now() - startTime
1241 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1242 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1243 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1246 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1247 const tasksFinishedTimeout = 1000
1248 const pool = new FixedThreadPool(
1250 './tests/worker-files/thread/asyncWorker.mjs',
1252 enableTasksQueue: true,
1253 tasksQueueOptions: { tasksFinishedTimeout }
1256 const maxMultiplier = 4
1257 let tasksFinished = 0
1258 for (const workerNode of pool.workerNodes) {
1259 workerNode.on('taskFinished', () => {
1263 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1266 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1267 const startTime = performance.now()
1268 await pool.destroy()
1269 const elapsedTime = performance.now() - startTime
1270 expect(tasksFinished).toBe(0)
1271 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1274 it('Verify that pool asynchronous resource track tasks execution', async () => {
1279 let resolveCalls = 0
1280 const hook = createHook({
1281 init (asyncId, type) {
1282 if (type === 'poolifier:task') {
1284 taskAsyncId = asyncId
1288 if (asyncId === taskAsyncId) beforeCalls++
1291 if (asyncId === taskAsyncId) afterCalls++
1294 if (executionAsyncId() === taskAsyncId) resolveCalls++
1297 const pool = new FixedThreadPool(
1299 './tests/worker-files/thread/testWorker.mjs'
1302 await pool.execute()
1304 expect(initCalls).toBe(1)
1305 expect(beforeCalls).toBe(1)
1306 expect(afterCalls).toBe(1)
1307 expect(resolveCalls).toBe(1)
1308 await pool.destroy()
1311 it('Verify that hasTaskFunction() is working', async () => {
1312 const dynamicThreadPool = new DynamicThreadPool(
1313 Math.floor(numberOfWorkers / 2),
1315 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1317 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1318 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1319 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1322 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1323 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1324 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1325 await dynamicThreadPool.destroy()
1326 const fixedClusterPool = new FixedClusterPool(
1328 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1330 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1331 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1332 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1335 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1336 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1337 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1338 await fixedClusterPool.destroy()
1341 it('Verify that addTaskFunction() is working', async () => {
1342 const dynamicThreadPool = new DynamicThreadPool(
1343 Math.floor(numberOfWorkers / 2),
1345 './tests/worker-files/thread/testWorker.mjs'
1347 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1349 dynamicThreadPool.addTaskFunction(0, () => {})
1350 ).rejects.toThrow(new TypeError('name argument must be a string'))
1352 dynamicThreadPool.addTaskFunction('', () => {})
1354 new TypeError('name argument must not be an empty string')
1356 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1357 new TypeError('fn argument must be a function')
1359 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1360 new TypeError('fn argument must be a function')
1362 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1366 const echoTaskFunction = data => {
1370 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1371 ).resolves.toBe(true)
1372 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1373 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1376 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1381 const taskFunctionData = { test: 'test' }
1382 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1383 expect(echoResult).toStrictEqual(taskFunctionData)
1384 for (const workerNode of dynamicThreadPool.workerNodes) {
1385 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1387 executed: expect.any(Number),
1390 sequentiallyStolen: 0,
1395 history: new CircularArray()
1398 history: new CircularArray()
1402 history: new CircularArray()
1405 history: new CircularArray()
1410 await dynamicThreadPool.destroy()
// Exercises removeTaskFunction() on a DynamicThreadPool started with testWorker.mjs.
// Flow: removing a name that was never added through the pool is expected to reject,
// then 'echo' is added, checked, removed, and checked again on pool.taskFunctions.
1413 it('Verify that removeTaskFunction() is working', async () => {
1414 const dynamicThreadPool = new DynamicThreadPool(
1415 Math.floor(numberOfWorkers / 2),
1417 './tests/worker-files/thread/testWorker.mjs'
1419 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1420 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// 'test' has not been added via addTaskFunction() in this test, so the pool
// is expected to refuse removing it.
1424 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1425 new Error('Cannot remove a task function not handled on the pool side')
1427 const echoTaskFunction = data => {
1430 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
// After the add, exactly one task function is tracked by the pool.
1431 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1432 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1435 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1440 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
// After the removal, the pool no longer tracks 'echo'.
1443 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1444 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1445 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1449 await dynamicThreadPool.destroy()
// Checks listTaskFunctionNames() for two pool flavours that load a worker file
// exposing several task functions: a DynamicThreadPool (thread worker, .mjs)
// and a FixedClusterPool (cluster worker, .cjs). Each pool is destroyed after its check.
1452 it('Verify that listTaskFunctionNames() is working', async () => {
1453 const dynamicThreadPool = new DynamicThreadPool(
1454 Math.floor(numberOfWorkers / 2),
1456 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1458 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1459 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1461 'jsonIntegerSerialization',
1465 await dynamicThreadPool.destroy()
// Same verification against a cluster-based pool.
1466 const fixedClusterPool = new FixedClusterPool(
1468 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1470 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1471 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1473 'jsonIntegerSerialization',
1477 await fixedClusterPool.destroy()
// Exercises setDefaultTaskFunction() on a DynamicThreadPool whose worker file
// exposes several task functions. Invalid names are expected to fail with an
// error message that embeds the id of the first worker node; valid names
// ('factorial', then 'fibonacci') are expected to resolve to true, with the
// task function name list re-checked after each change.
1480 it('Verify that setDefaultTaskFunction() is working', async () => {
1481 const dynamicThreadPool = new DynamicThreadPool(
1482 Math.floor(numberOfWorkers / 2),
1484 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1486 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// The worker id is interpolated into the expected error messages below.
1487 const workerId = dynamicThreadPool.workerNodes[0].info.id
// Non-string name.
1488 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1490 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
// Reserved default task name.
1494 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1497 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
// Name that is not a registered task function.
1501 dynamicThreadPool.setDefaultTaskFunction('unknown')
1504 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1507 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1509 'jsonIntegerSerialization',
// Valid names: the call resolves to true and the name list is checked again.
1514 dynamicThreadPool.setDefaultTaskFunction('factorial')
1515 ).resolves.toBe(true)
1516 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1519 'jsonIntegerSerialization',
1523 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1524 ).resolves.toBe(true)
1525 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1528 'jsonIntegerSerialization',
1531 await dynamicThreadPool.destroy()
// Runs four tasks on a DynamicClusterPool whose worker file exposes several
// task functions: one with no name given to execute() and one each for
// 'jsonIntegerSerialization', 'factorial' and 'fibonacci'. It then checks the
// pool-level counters and the per-task-function usage kept on every worker node.
1534 it('Verify that multiple task functions worker is working', async () => {
1535 const pool = new DynamicClusterPool(
1536 Math.floor(numberOfWorkers / 2),
1538 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1540 const data = { n: 10 }
// No task function name is passed here; the result is expected to match the
// 'jsonIntegerSerialization' result below.
1541 const result0 = await pool.execute(data)
1542 expect(result0).toStrictEqual({ ok: 1 })
1543 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1544 expect(result1).toStrictEqual({ ok: 1 })
// 10! = 3628800
1545 const result2 = await pool.execute(data, 'factorial')
1546 expect(result2).toBe(3628800)
// fibonacci with n = 10 is expected to give 55
1547 const result3 = await pool.execute(data, 'fibonacci')
1548 expect(result3).toBe(55)
// All four tasks have completed and none is still running.
1549 expect(pool.info.executingTasks).toBe(0)
1550 expect(pool.info.executedTasks).toBe(4)
// Per worker node: the advertised task function names and the usage statistics.
1551 for (const workerNode of pool.workerNodes) {
1552 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1554 'jsonIntegerSerialization',
1558 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1559 for (const name of pool.listTaskFunctionNames()) {
1560 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1562 executed: expect.any(Number),
1566 sequentiallyStolen: 0,
1570 history: expect.any(CircularArray)
1573 history: expect.any(CircularArray)
1577 history: expect.any(CircularArray)
1580 history: expect.any(CircularArray)
// Every listed task function must have been executed at least once on this node.
1585 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1586 ).toBeGreaterThan(0)
// Usage under DEFAULT_TASK_NAME is checked against the usage of the second
// advertised task function name.
1589 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1591 workerNode.getTaskFunctionWorkerUsage(
1592 workerNode.info.taskFunctionNames[1]
1596 await pool.destroy()
// Checks that sendKillMessageToWorker() called for the worker node at key 0 of
// a DynamicClusterPool resolves to undefined; the pool is destroyed afterwards.
1599 it('Verify sendKillMessageToWorker()', async () => {
1600 const pool = new DynamicClusterPool(
1601 Math.floor(numberOfWorkers / 2),
1603 './tests/worker-files/cluster/testWorker.cjs'
// Key of the first worker node in pool.workerNodes.
1605 const workerNodeKey = 0
1607 pool.sendKillMessageToWorker(workerNodeKey)
1608 ).resolves.toBeUndefined()
1609 await pool.destroy()
// Sends an 'add' task function operation to a single worker node (key 0) of a
// DynamicClusterPool and expects it to resolve to true. Afterwards that
// node's taskFunctionNames must list the new 'empty' function last.
1612 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1613 const pool = new DynamicClusterPool(
1614 Math.floor(numberOfWorkers / 2),
1616 './tests/worker-files/cluster/testWorker.cjs'
1618 const workerNodeKey = 0
1620 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1621 taskFunctionOperation: 'add',
1622 taskFunctionName: 'empty',
// The function is passed in its source-text form, not as a function object.
1623 taskFunction: (() => {}).toString()
1625 ).resolves.toBe(true)
1627 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1628 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1629 await pool.destroy()
// Sends an 'add' task function operation through
// sendTaskFunctionOperationToWorkers() on a DynamicClusterPool and expects it
// to resolve to true, then checks taskFunctionNames on each worker node.
1632 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1633 const pool = new DynamicClusterPool(
1634 Math.floor(numberOfWorkers / 2),
1636 './tests/worker-files/cluster/testWorker.cjs'
1639 pool.sendTaskFunctionOperationToWorkers({
1640 taskFunctionOperation: 'add',
1641 taskFunctionName: 'empty',
// The function is passed in its source-text form, not as a function object.
1642 taskFunction: (() => {}).toString()
1644 ).resolves.toBe(true)
// Every worker node of the pool is inspected, not just the first one.
1645 for (const workerNode of pool.workerNodes) {
1646 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1652 await pool.destroy()