1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { expect } from 'expect'
6 import { restore, stub } from 'sinon'
14 WorkerChoiceStrategies,
16 } from '../../lib/index.js'
17 import { CircularArray } from '../../lib/circular-array.js'
18 import { Deque } from '../../lib/deque.js'
19 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20 import { waitPoolEvents } from '../test-utils.js'
21 import { WorkerNode } from '../../lib/pools/worker-node.js'
23 describe('Abstract pool test suite', () => {
24 const version = JSON.parse(
26 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
30 const numberOfWorkers = 2
31 class StubPoolWithIsMain extends FixedThreadPool {
41 it('Verify that pool can be created and destroyed', async () => {
42 const pool = new FixedThreadPool(
44 './tests/worker-files/thread/testWorker.mjs'
46 expect(pool).toBeInstanceOf(FixedThreadPool)
50 it('Verify that pool cannot be created from a non main thread/process', () => {
53 new StubPoolWithIsMain(
55 './tests/worker-files/thread/testWorker.mjs',
57 errorHandler: e => console.error(e)
62 'Cannot start a pool from a worker with the same type as the pool'
67 it('Verify that pool statuses properties are set', async () => {
68 const pool = new FixedThreadPool(
70 './tests/worker-files/thread/testWorker.mjs'
72 expect(pool.started).toBe(true)
73 expect(pool.starting).toBe(false)
74 expect(pool.destroying).toBe(false)
78 it('Verify that filePath is checked', () => {
79 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
80 new Error("Cannot find the worker file 'undefined'")
83 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
84 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
87 it('Verify that numberOfWorkers is checked', () => {
92 './tests/worker-files/thread/testWorker.mjs'
96 'Cannot instantiate a pool without specifying the number of workers'
101 it('Verify that a negative number of workers is checked', () => {
104 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
107 'Cannot instantiate a pool with a negative number of workers'
112 it('Verify that a non integer number of workers is checked', () => {
115 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
118 'Cannot instantiate a pool with a non safe integer number of workers'
123 it('Verify that dynamic pool sizing is checked', () => {
126 new DynamicClusterPool(
129 './tests/worker-files/cluster/testWorker.js'
133 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
138 new DynamicThreadPool(
141 './tests/worker-files/thread/testWorker.mjs'
145 'Cannot instantiate a pool with a non safe integer number of workers'
150 new DynamicClusterPool(
153 './tests/worker-files/cluster/testWorker.js'
157 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
162 new DynamicThreadPool(
165 './tests/worker-files/thread/testWorker.mjs'
169 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
174 new DynamicThreadPool(
177 './tests/worker-files/thread/testWorker.mjs'
181 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
186 new DynamicClusterPool(
189 './tests/worker-files/cluster/testWorker.js'
193 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
198 it('Verify that pool options are checked', async () => {
199 let pool = new FixedThreadPool(
201 './tests/worker-files/thread/testWorker.mjs'
203 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
204 expect(pool.opts).toStrictEqual({
207 restartWorkerOnError: true,
208 enableTasksQueue: false,
209 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
210 workerChoiceStrategyOptions: {
212 runTime: { median: false },
213 waitTime: { median: false },
214 elu: { median: false }
217 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
219 runTime: { median: false },
220 waitTime: { median: false },
221 elu: { median: false }
223 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
224 .workerChoiceStrategies) {
225 expect(workerChoiceStrategy.opts).toStrictEqual({
227 runTime: { median: false },
228 waitTime: { median: false },
229 elu: { median: false }
233 const testHandler = () => console.info('test handler executed')
234 pool = new FixedThreadPool(
236 './tests/worker-files/thread/testWorker.mjs',
238 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
239 workerChoiceStrategyOptions: {
240 runTime: { median: true },
241 weights: { 0: 300, 1: 200 }
244 restartWorkerOnError: false,
245 enableTasksQueue: true,
246 tasksQueueOptions: { concurrency: 2 },
247 messageHandler: testHandler,
248 errorHandler: testHandler,
249 onlineHandler: testHandler,
250 exitHandler: testHandler
253 expect(pool.emitter).toBeUndefined()
254 expect(pool.opts).toStrictEqual({
257 restartWorkerOnError: false,
258 enableTasksQueue: true,
261 size: Math.pow(numberOfWorkers, 2),
263 tasksStealingOnBackPressure: true
265 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
266 workerChoiceStrategyOptions: {
268 runTime: { median: true },
269 waitTime: { median: false },
270 elu: { median: false },
271 weights: { 0: 300, 1: 200 }
273 onlineHandler: testHandler,
274 messageHandler: testHandler,
275 errorHandler: testHandler,
276 exitHandler: testHandler
278 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
280 runTime: { median: true },
281 waitTime: { median: false },
282 elu: { median: false },
283 weights: { 0: 300, 1: 200 }
285 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
286 .workerChoiceStrategies) {
287 expect(workerChoiceStrategy.opts).toStrictEqual({
289 runTime: { median: true },
290 waitTime: { median: false },
291 elu: { median: false },
292 weights: { 0: 300, 1: 200 }
298 it('Verify that pool options are validated', () => {
303 './tests/worker-files/thread/testWorker.mjs',
305 workerChoiceStrategy: 'invalidStrategy'
308 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
313 './tests/worker-files/thread/testWorker.mjs',
315 workerChoiceStrategyOptions: {
316 retries: 'invalidChoiceRetries'
322 'Invalid worker choice strategy options: retries must be an integer'
329 './tests/worker-files/thread/testWorker.mjs',
331 workerChoiceStrategyOptions: {
338 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
345 './tests/worker-files/thread/testWorker.mjs',
347 workerChoiceStrategyOptions: { weights: {} }
352 'Invalid worker choice strategy options: must have a weight for each worker node'
359 './tests/worker-files/thread/testWorker.mjs',
361 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
366 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
373 './tests/worker-files/thread/testWorker.mjs',
375 enableTasksQueue: true,
376 tasksQueueOptions: 'invalidTasksQueueOptions'
380 new TypeError('Invalid tasks queue options: must be a plain object')
386 './tests/worker-files/thread/testWorker.mjs',
388 enableTasksQueue: true,
389 tasksQueueOptions: { concurrency: 0 }
394 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
401 './tests/worker-files/thread/testWorker.mjs',
403 enableTasksQueue: true,
404 tasksQueueOptions: { concurrency: -1 }
409 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
416 './tests/worker-files/thread/testWorker.mjs',
418 enableTasksQueue: true,
419 tasksQueueOptions: { concurrency: 0.2 }
423 new TypeError('Invalid worker node tasks concurrency: must be an integer')
429 './tests/worker-files/thread/testWorker.mjs',
431 enableTasksQueue: true,
432 tasksQueueOptions: { size: 0 }
437 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
444 './tests/worker-files/thread/testWorker.mjs',
446 enableTasksQueue: true,
447 tasksQueueOptions: { size: -1 }
452 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
459 './tests/worker-files/thread/testWorker.mjs',
461 enableTasksQueue: true,
462 tasksQueueOptions: { size: 0.2 }
466 new TypeError('Invalid worker node tasks queue size: must be an integer')
470 it('Verify that pool worker choice strategy options can be set', async () => {
471 const pool = new FixedThreadPool(
473 './tests/worker-files/thread/testWorker.mjs',
474 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
476 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
478 runTime: { median: false },
479 waitTime: { median: false },
480 elu: { median: false }
482 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
484 runTime: { median: false },
485 waitTime: { median: false },
486 elu: { median: false }
488 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
489 .workerChoiceStrategies) {
490 expect(workerChoiceStrategy.opts).toStrictEqual({
492 runTime: { median: false },
493 waitTime: { median: false },
494 elu: { median: false }
498 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
516 pool.setWorkerChoiceStrategyOptions({
517 runTime: { median: true },
518 elu: { median: true }
520 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
522 runTime: { median: true },
523 waitTime: { median: false },
524 elu: { median: true }
526 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
528 runTime: { median: true },
529 waitTime: { median: false },
530 elu: { median: true }
532 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
533 .workerChoiceStrategies) {
534 expect(workerChoiceStrategy.opts).toStrictEqual({
536 runTime: { median: true },
537 waitTime: { median: false },
538 elu: { median: true }
542 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
560 pool.setWorkerChoiceStrategyOptions({
561 runTime: { median: false },
562 elu: { median: false }
564 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
566 runTime: { median: false },
567 waitTime: { median: false },
568 elu: { median: false }
570 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
572 runTime: { median: false },
573 waitTime: { median: false },
574 elu: { median: false }
576 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
577 .workerChoiceStrategies) {
578 expect(workerChoiceStrategy.opts).toStrictEqual({
580 runTime: { median: false },
581 waitTime: { median: false },
582 elu: { median: false }
586 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
605 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
608 'Invalid worker choice strategy options: must be a plain object'
612 pool.setWorkerChoiceStrategyOptions({
613 retries: 'invalidChoiceRetries'
617 'Invalid worker choice strategy options: retries must be an integer'
620 expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
622 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
625 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
627 'Invalid worker choice strategy options: must have a weight for each worker node'
631 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
634 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
640 it('Verify that pool tasks queue can be enabled/disabled', async () => {
641 const pool = new FixedThreadPool(
643 './tests/worker-files/thread/testWorker.mjs'
645 expect(pool.opts.enableTasksQueue).toBe(false)
646 expect(pool.opts.tasksQueueOptions).toBeUndefined()
647 pool.enableTasksQueue(true)
648 expect(pool.opts.enableTasksQueue).toBe(true)
649 expect(pool.opts.tasksQueueOptions).toStrictEqual({
651 size: Math.pow(numberOfWorkers, 2),
653 tasksStealingOnBackPressure: true
655 pool.enableTasksQueue(true, { concurrency: 2 })
656 expect(pool.opts.enableTasksQueue).toBe(true)
657 expect(pool.opts.tasksQueueOptions).toStrictEqual({
659 size: Math.pow(numberOfWorkers, 2),
661 tasksStealingOnBackPressure: true
663 pool.enableTasksQueue(false)
664 expect(pool.opts.enableTasksQueue).toBe(false)
665 expect(pool.opts.tasksQueueOptions).toBeUndefined()
669 it('Verify that pool tasks queue options can be set', async () => {
670 const pool = new FixedThreadPool(
672 './tests/worker-files/thread/testWorker.mjs',
673 { enableTasksQueue: true }
675 expect(pool.opts.tasksQueueOptions).toStrictEqual({
677 size: Math.pow(numberOfWorkers, 2),
679 tasksStealingOnBackPressure: true
681 for (const workerNode of pool.workerNodes) {
682 expect(workerNode.tasksQueueBackPressureSize).toBe(
683 pool.opts.tasksQueueOptions.size
686 pool.setTasksQueueOptions({
690 tasksStealingOnBackPressure: false
692 expect(pool.opts.tasksQueueOptions).toStrictEqual({
696 tasksStealingOnBackPressure: false
698 for (const workerNode of pool.workerNodes) {
699 expect(workerNode.tasksQueueBackPressureSize).toBe(
700 pool.opts.tasksQueueOptions.size
703 pool.setTasksQueueOptions({
706 tasksStealingOnBackPressure: true
708 expect(pool.opts.tasksQueueOptions).toStrictEqual({
710 size: Math.pow(numberOfWorkers, 2),
712 tasksStealingOnBackPressure: true
714 for (const workerNode of pool.workerNodes) {
715 expect(workerNode.tasksQueueBackPressureSize).toBe(
716 pool.opts.tasksQueueOptions.size
719 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
720 new TypeError('Invalid tasks queue options: must be a plain object')
722 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
724 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
727 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
729 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
732 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
733 new TypeError('Invalid worker node tasks concurrency: must be an integer')
735 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
737 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
740 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
742 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
745 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
746 new TypeError('Invalid worker node tasks queue size: must be an integer')
751 it('Verify that pool info is set', async () => {
752 let pool = new FixedThreadPool(
754 './tests/worker-files/thread/testWorker.mjs'
756 expect(pool.info).toStrictEqual({
758 type: PoolTypes.fixed,
759 worker: WorkerTypes.thread,
762 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
763 minSize: numberOfWorkers,
764 maxSize: numberOfWorkers,
765 workerNodes: numberOfWorkers,
766 idleWorkerNodes: numberOfWorkers,
773 pool = new DynamicClusterPool(
774 Math.floor(numberOfWorkers / 2),
776 './tests/worker-files/cluster/testWorker.js'
778 expect(pool.info).toStrictEqual({
780 type: PoolTypes.dynamic,
781 worker: WorkerTypes.cluster,
784 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
785 minSize: Math.floor(numberOfWorkers / 2),
786 maxSize: numberOfWorkers,
787 workerNodes: Math.floor(numberOfWorkers / 2),
788 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
797 it('Verify that pool worker tasks usage are initialized', async () => {
798 const pool = new FixedClusterPool(
800 './tests/worker-files/cluster/testWorker.js'
802 for (const workerNode of pool.workerNodes) {
803 expect(workerNode).toBeInstanceOf(WorkerNode)
804 expect(workerNode.usage).toStrictEqual({
814 history: new CircularArray()
817 history: new CircularArray()
821 history: new CircularArray()
824 history: new CircularArray()
832 it('Verify that pool worker tasks queue are initialized', async () => {
833 let pool = new FixedClusterPool(
835 './tests/worker-files/cluster/testWorker.js'
837 for (const workerNode of pool.workerNodes) {
838 expect(workerNode).toBeInstanceOf(WorkerNode)
839 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
840 expect(workerNode.tasksQueue.size).toBe(0)
841 expect(workerNode.tasksQueue.maxSize).toBe(0)
844 pool = new DynamicThreadPool(
845 Math.floor(numberOfWorkers / 2),
847 './tests/worker-files/thread/testWorker.mjs'
849 for (const workerNode of pool.workerNodes) {
850 expect(workerNode).toBeInstanceOf(WorkerNode)
851 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
852 expect(workerNode.tasksQueue.size).toBe(0)
853 expect(workerNode.tasksQueue.maxSize).toBe(0)
858 it('Verify that pool worker info are initialized', async () => {
859 let pool = new FixedClusterPool(
861 './tests/worker-files/cluster/testWorker.js'
863 for (const workerNode of pool.workerNodes) {
864 expect(workerNode).toBeInstanceOf(WorkerNode)
865 expect(workerNode.info).toStrictEqual({
866 id: expect.any(Number),
867 type: WorkerTypes.cluster,
873 pool = new DynamicThreadPool(
874 Math.floor(numberOfWorkers / 2),
876 './tests/worker-files/thread/testWorker.mjs'
878 for (const workerNode of pool.workerNodes) {
879 expect(workerNode).toBeInstanceOf(WorkerNode)
880 expect(workerNode.info).toStrictEqual({
881 id: expect.any(Number),
882 type: WorkerTypes.thread,
890 it('Verify that pool statuses are checked at start or destroy', async () => {
891 const pool = new FixedThreadPool(
893 './tests/worker-files/thread/testWorker.mjs'
895 expect(pool.info.started).toBe(true)
896 expect(pool.info.ready).toBe(true)
897 expect(() => pool.start()).toThrow(
898 new Error('Cannot start an already started pool')
901 expect(pool.info.started).toBe(false)
902 expect(pool.info.ready).toBe(false)
903 await expect(pool.destroy()).rejects.toThrow(
904 new Error('Cannot destroy an already destroyed pool')
908 it('Verify that pool can be started after initialization', async () => {
909 const pool = new FixedClusterPool(
911 './tests/worker-files/cluster/testWorker.js',
916 expect(pool.info.started).toBe(false)
917 expect(pool.info.ready).toBe(false)
918 expect(pool.readyEventEmitted).toBe(false)
919 expect(pool.workerNodes).toStrictEqual([])
920 await expect(pool.execute()).rejects.toThrow(
921 new Error('Cannot execute a task on not started pool')
924 expect(pool.info.started).toBe(true)
925 expect(pool.info.ready).toBe(true)
926 await waitPoolEvents(pool, PoolEvents.ready, 1)
927 expect(pool.readyEventEmitted).toBe(true)
928 expect(pool.workerNodes.length).toBe(numberOfWorkers)
929 for (const workerNode of pool.workerNodes) {
930 expect(workerNode).toBeInstanceOf(WorkerNode)
935 it('Verify that pool execute() arguments are checked', async () => {
936 const pool = new FixedClusterPool(
938 './tests/worker-files/cluster/testWorker.js'
940 await expect(pool.execute(undefined, 0)).rejects.toThrow(
941 new TypeError('name argument must be a string')
943 await expect(pool.execute(undefined, '')).rejects.toThrow(
944 new TypeError('name argument must not be an empty string')
946 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
947 new TypeError('transferList argument must be an array')
949 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
950 "Task function 'unknown' not found"
953 await expect(pool.execute()).rejects.toThrow(
954 new Error('Cannot execute a task on not started pool')
958 it('Verify that pool worker tasks usage are computed', async () => {
959 const pool = new FixedClusterPool(
961 './tests/worker-files/cluster/testWorker.js'
963 const promises = new Set()
964 const maxMultiplier = 2
965 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
966 promises.add(pool.execute())
968 for (const workerNode of pool.workerNodes) {
969 expect(workerNode.usage).toStrictEqual({
972 executing: maxMultiplier,
979 history: expect.any(CircularArray)
982 history: expect.any(CircularArray)
986 history: expect.any(CircularArray)
989 history: expect.any(CircularArray)
994 await Promise.all(promises)
995 for (const workerNode of pool.workerNodes) {
996 expect(workerNode.usage).toStrictEqual({
998 executed: maxMultiplier,
1006 history: expect.any(CircularArray)
1009 history: expect.any(CircularArray)
1013 history: expect.any(CircularArray)
1016 history: expect.any(CircularArray)
1021 await pool.destroy()
1024 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1025 const pool = new DynamicThreadPool(
1026 Math.floor(numberOfWorkers / 2),
1028 './tests/worker-files/thread/testWorker.mjs'
1030 const promises = new Set()
1031 const maxMultiplier = 2
1032 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1033 promises.add(pool.execute())
1035 await Promise.all(promises)
1036 for (const workerNode of pool.workerNodes) {
1037 expect(workerNode.usage).toStrictEqual({
1039 executed: expect.any(Number),
1047 history: expect.any(CircularArray)
1050 history: expect.any(CircularArray)
1054 history: expect.any(CircularArray)
1057 history: expect.any(CircularArray)
1061 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1062 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1063 numberOfWorkers * maxMultiplier
1065 expect(workerNode.usage.runTime.history.length).toBe(0)
1066 expect(workerNode.usage.waitTime.history.length).toBe(0)
1067 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1068 expect(workerNode.usage.elu.active.history.length).toBe(0)
1070 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1071 for (const workerNode of pool.workerNodes) {
1072 expect(workerNode.usage).toStrictEqual({
1082 history: expect.any(CircularArray)
1085 history: expect.any(CircularArray)
1089 history: expect.any(CircularArray)
1092 history: expect.any(CircularArray)
1096 expect(workerNode.usage.runTime.history.length).toBe(0)
1097 expect(workerNode.usage.waitTime.history.length).toBe(0)
1098 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1099 expect(workerNode.usage.elu.active.history.length).toBe(0)
1101 await pool.destroy()
1104 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1105 const pool = new DynamicClusterPool(
1106 Math.floor(numberOfWorkers / 2),
1108 './tests/worker-files/cluster/testWorker.js'
1110 expect(pool.emitter.eventNames()).toStrictEqual([])
1113 pool.emitter.on(PoolEvents.ready, info => {
1117 await waitPoolEvents(pool, PoolEvents.ready, 1)
1118 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1119 expect(poolReady).toBe(1)
1120 expect(poolInfo).toStrictEqual({
1122 type: PoolTypes.dynamic,
1123 worker: WorkerTypes.cluster,
1126 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1127 minSize: expect.any(Number),
1128 maxSize: expect.any(Number),
1129 workerNodes: expect.any(Number),
1130 idleWorkerNodes: expect.any(Number),
1131 busyWorkerNodes: expect.any(Number),
1132 executedTasks: expect.any(Number),
1133 executingTasks: expect.any(Number),
1134 failedTasks: expect.any(Number)
1136 await pool.destroy()
1139 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1140 const pool = new FixedThreadPool(
1142 './tests/worker-files/thread/testWorker.mjs'
1144 expect(pool.emitter.eventNames()).toStrictEqual([])
1145 const promises = new Set()
1148 pool.emitter.on(PoolEvents.busy, info => {
1152 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1153 for (let i = 0; i < numberOfWorkers * 2; i++) {
1154 promises.add(pool.execute())
1156 await Promise.all(promises)
1157 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1158 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1159 expect(poolBusy).toBe(numberOfWorkers + 1)
1160 expect(poolInfo).toStrictEqual({
1162 type: PoolTypes.fixed,
1163 worker: WorkerTypes.thread,
1166 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1167 minSize: expect.any(Number),
1168 maxSize: expect.any(Number),
1169 workerNodes: expect.any(Number),
1170 idleWorkerNodes: expect.any(Number),
1171 busyWorkerNodes: expect.any(Number),
1172 executedTasks: expect.any(Number),
1173 executingTasks: expect.any(Number),
1174 failedTasks: expect.any(Number)
1176 await pool.destroy()
1179 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1180 const pool = new DynamicThreadPool(
1181 Math.floor(numberOfWorkers / 2),
1183 './tests/worker-files/thread/testWorker.mjs'
1185 expect(pool.emitter.eventNames()).toStrictEqual([])
1186 const promises = new Set()
1189 pool.emitter.on(PoolEvents.full, info => {
1193 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1194 for (let i = 0; i < numberOfWorkers * 2; i++) {
1195 promises.add(pool.execute())
1197 await Promise.all(promises)
1198 expect(poolFull).toBe(1)
1199 expect(poolInfo).toStrictEqual({
1201 type: PoolTypes.dynamic,
1202 worker: WorkerTypes.thread,
1205 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1206 minSize: expect.any(Number),
1207 maxSize: expect.any(Number),
1208 workerNodes: expect.any(Number),
1209 idleWorkerNodes: expect.any(Number),
1210 busyWorkerNodes: expect.any(Number),
1211 executedTasks: expect.any(Number),
1212 executingTasks: expect.any(Number),
1213 failedTasks: expect.any(Number)
1215 await pool.destroy()
1218 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1219 const pool = new FixedThreadPool(
1221 './tests/worker-files/thread/testWorker.mjs',
1223 enableTasksQueue: true
1226 stub(pool, 'hasBackPressure').returns(true)
1227 expect(pool.emitter.eventNames()).toStrictEqual([])
1228 const promises = new Set()
1229 let poolBackPressure = 0
1231 pool.emitter.on(PoolEvents.backPressure, info => {
1235 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1236 for (let i = 0; i < numberOfWorkers + 1; i++) {
1237 promises.add(pool.execute())
1239 await Promise.all(promises)
1240 expect(poolBackPressure).toBe(1)
1241 expect(poolInfo).toStrictEqual({
1243 type: PoolTypes.fixed,
1244 worker: WorkerTypes.thread,
1247 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1248 minSize: expect.any(Number),
1249 maxSize: expect.any(Number),
1250 workerNodes: expect.any(Number),
1251 idleWorkerNodes: expect.any(Number),
1252 busyWorkerNodes: expect.any(Number),
1253 executedTasks: expect.any(Number),
1254 executingTasks: expect.any(Number),
1255 maxQueuedTasks: expect.any(Number),
1256 queuedTasks: expect.any(Number),
1258 stolenTasks: expect.any(Number),
1259 failedTasks: expect.any(Number)
1261 expect(pool.hasBackPressure.called).toBe(true)
1262 await pool.destroy()
1265 it('Verify that hasTaskFunction() is working', async () => {
1266 const dynamicThreadPool = new DynamicThreadPool(
1267 Math.floor(numberOfWorkers / 2),
1269 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1271 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1272 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1273 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1276 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1277 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1278 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1279 await dynamicThreadPool.destroy()
1280 const fixedClusterPool = new FixedClusterPool(
1282 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1284 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1285 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1286 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1289 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1290 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1291 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1292 await fixedClusterPool.destroy()
1295 it('Verify that addTaskFunction() is working', async () => {
1296 const dynamicThreadPool = new DynamicThreadPool(
1297 Math.floor(numberOfWorkers / 2),
1299 './tests/worker-files/thread/testWorker.mjs'
1301 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1303 dynamicThreadPool.addTaskFunction(0, () => {})
1304 ).rejects.toThrow(new TypeError('name argument must be a string'))
1306 dynamicThreadPool.addTaskFunction('', () => {})
1308 new TypeError('name argument must not be an empty string')
1310 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1311 new TypeError('fn argument must be a function')
1313 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1314 new TypeError('fn argument must be a function')
1316 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1320 const echoTaskFunction = data => {
1324 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1325 ).resolves.toBe(true)
1326 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1327 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1330 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1335 const taskFunctionData = { test: 'test' }
1336 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1337 expect(echoResult).toStrictEqual(taskFunctionData)
1338 for (const workerNode of dynamicThreadPool.workerNodes) {
1339 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1341 executed: expect.any(Number),
1348 history: new CircularArray()
1351 history: new CircularArray()
1355 history: new CircularArray()
1358 history: new CircularArray()
1363 await dynamicThreadPool.destroy()
1366 it('Verify that removeTaskFunction() is working', async () => {
1367 const dynamicThreadPool = new DynamicThreadPool(
1368 Math.floor(numberOfWorkers / 2),
1370 './tests/worker-files/thread/testWorker.mjs'
1372 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1373 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1377 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1378 new Error('Cannot remove a task function not handled on the pool side')
1380 const echoTaskFunction = data => {
1383 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1384 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1385 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1388 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1393 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1396 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1397 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1398 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1402 await dynamicThreadPool.destroy()
1405 it('Verify that listTaskFunctionNames() is working', async () => {
1406 const dynamicThreadPool = new DynamicThreadPool(
1407 Math.floor(numberOfWorkers / 2),
1409 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1411 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1412 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1414 'jsonIntegerSerialization',
1418 await dynamicThreadPool.destroy()
1419 const fixedClusterPool = new FixedClusterPool(
1421 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1423 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1424 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1426 'jsonIntegerSerialization',
1430 await fixedClusterPool.destroy()
1433 it('Verify that setDefaultTaskFunction() is working', async () => {
1434 const dynamicThreadPool = new DynamicThreadPool(
1435 Math.floor(numberOfWorkers / 2),
1437 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1439 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1440 const workerId = dynamicThreadPool.workerNodes[0].info.id
1441 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1443 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1447 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1450 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1454 dynamicThreadPool.setDefaultTaskFunction('unknown')
1457 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1460 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1462 'jsonIntegerSerialization',
1467 dynamicThreadPool.setDefaultTaskFunction('factorial')
1468 ).resolves.toBe(true)
1469 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1472 'jsonIntegerSerialization',
1476 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1477 ).resolves.toBe(true)
1478 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1481 'jsonIntegerSerialization',
1484 await dynamicThreadPool.destroy()
1487 it('Verify that multiple task functions worker is working', async () => {
1488 const pool = new DynamicClusterPool(
1489 Math.floor(numberOfWorkers / 2),
1491 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1493 const data = { n: 10 }
1494 const result0 = await pool.execute(data)
1495 expect(result0).toStrictEqual({ ok: 1 })
1496 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1497 expect(result1).toStrictEqual({ ok: 1 })
1498 const result2 = await pool.execute(data, 'factorial')
1499 expect(result2).toBe(3628800)
1500 const result3 = await pool.execute(data, 'fibonacci')
1501 expect(result3).toBe(55)
1502 expect(pool.info.executingTasks).toBe(0)
1503 expect(pool.info.executedTasks).toBe(4)
1504 for (const workerNode of pool.workerNodes) {
1505 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1507 'jsonIntegerSerialization',
1511 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1512 for (const name of pool.listTaskFunctionNames()) {
1513 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1515 executed: expect.any(Number),
1522 history: expect.any(CircularArray)
1525 history: expect.any(CircularArray)
1529 history: expect.any(CircularArray)
1532 history: expect.any(CircularArray)
1537 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1538 ).toBeGreaterThan(0)
1541 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1543 workerNode.getTaskFunctionWorkerUsage(
1544 workerNode.info.taskFunctionNames[1]
1548 await pool.destroy()
1551 it('Verify sendKillMessageToWorker()', async () => {
1552 const pool = new DynamicClusterPool(
1553 Math.floor(numberOfWorkers / 2),
1555 './tests/worker-files/cluster/testWorker.js'
1557 const workerNodeKey = 0
1559 pool.sendKillMessageToWorker(workerNodeKey)
1560 ).resolves.toBeUndefined()
1561 await pool.destroy()
1564 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1565 const pool = new DynamicClusterPool(
1566 Math.floor(numberOfWorkers / 2),
1568 './tests/worker-files/cluster/testWorker.js'
1570 const workerNodeKey = 0
1572 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1573 taskFunctionOperation: 'add',
1574 taskFunctionName: 'empty',
1575 taskFunction: (() => {}).toString()
1577 ).resolves.toBe(true)
1579 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1580 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1581 await pool.destroy()
1584 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1585 const pool = new DynamicClusterPool(
1586 Math.floor(numberOfWorkers / 2),
1588 './tests/worker-files/cluster/testWorker.js'
1591 pool.sendTaskFunctionOperationToWorkers({
1592 taskFunctionOperation: 'add',
1593 taskFunctionName: 'empty',
1594 taskFunction: (() => {}).toString()
1596 ).resolves.toBe(true)
1597 for (const workerNode of pool.workerNodes) {
1598 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1604 await pool.destroy()