1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { expect } from 'expect'
6 import { restore, stub } from 'sinon'
14 WorkerChoiceStrategies,
16 } from '../../lib/index.js'
17 import { CircularArray } from '../../lib/circular-array.js'
18 import { Deque } from '../../lib/deque.js'
19 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20 import { waitPoolEvents } from '../test-utils.js'
21 import { WorkerNode } from '../../lib/pools/worker-node.js'
23 describe('Abstract pool test suite', () => {
24 const version = JSON.parse(
26 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
30 const numberOfWorkers = 2
31 class StubPoolWithIsMain extends FixedThreadPool {
41 it('Simulate pool creation from a non main thread/process', () => {
44 new StubPoolWithIsMain(
46 './tests/worker-files/thread/testWorker.mjs',
48 errorHandler: e => console.error(e)
53 'Cannot start a pool from a worker with the same type as the pool'
58 it('Verify that pool statuses properties are set', async () => {
59 const pool = new FixedThreadPool(
61 './tests/worker-files/thread/testWorker.mjs'
63 expect(pool.starting).toBe(false)
64 expect(pool.started).toBe(true)
68 it('Verify that filePath is checked', () => {
69 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
70 new Error("Cannot find the worker file 'undefined'")
73 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
74 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
77 it('Verify that numberOfWorkers is checked', () => {
82 './tests/worker-files/thread/testWorker.mjs'
86 'Cannot instantiate a pool without specifying the number of workers'
91 it('Verify that a negative number of workers is checked', () => {
94 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
97 'Cannot instantiate a pool with a negative number of workers'
102 it('Verify that a non integer number of workers is checked', () => {
105 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
108 'Cannot instantiate a pool with a non safe integer number of workers'
113 it('Verify that dynamic pool sizing is checked', () => {
116 new DynamicClusterPool(
119 './tests/worker-files/cluster/testWorker.js'
123 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
128 new DynamicThreadPool(
131 './tests/worker-files/thread/testWorker.mjs'
135 'Cannot instantiate a pool with a non safe integer number of workers'
140 new DynamicClusterPool(
143 './tests/worker-files/cluster/testWorker.js'
147 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
152 new DynamicThreadPool(
155 './tests/worker-files/thread/testWorker.mjs'
159 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
164 new DynamicThreadPool(
167 './tests/worker-files/thread/testWorker.mjs'
171 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
176 new DynamicClusterPool(
179 './tests/worker-files/cluster/testWorker.js'
183 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
188 it('Verify that pool options are checked', async () => {
189 let pool = new FixedThreadPool(
191 './tests/worker-files/thread/testWorker.mjs'
193 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
194 expect(pool.opts).toStrictEqual({
197 restartWorkerOnError: true,
198 enableTasksQueue: false,
199 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
200 workerChoiceStrategyOptions: {
202 runTime: { median: false },
203 waitTime: { median: false },
204 elu: { median: false }
207 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
209 runTime: { median: false },
210 waitTime: { median: false },
211 elu: { median: false }
213 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
214 .workerChoiceStrategies) {
215 expect(workerChoiceStrategy.opts).toStrictEqual({
217 runTime: { median: false },
218 waitTime: { median: false },
219 elu: { median: false }
223 const testHandler = () => console.info('test handler executed')
224 pool = new FixedThreadPool(
226 './tests/worker-files/thread/testWorker.mjs',
228 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
229 workerChoiceStrategyOptions: {
230 runTime: { median: true },
231 weights: { 0: 300, 1: 200 }
234 restartWorkerOnError: false,
235 enableTasksQueue: true,
236 tasksQueueOptions: { concurrency: 2 },
237 messageHandler: testHandler,
238 errorHandler: testHandler,
239 onlineHandler: testHandler,
240 exitHandler: testHandler
243 expect(pool.emitter).toBeUndefined()
244 expect(pool.opts).toStrictEqual({
247 restartWorkerOnError: false,
248 enableTasksQueue: true,
251 size: Math.pow(numberOfWorkers, 2),
253 tasksStealingOnBackPressure: true
255 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
256 workerChoiceStrategyOptions: {
258 runTime: { median: true },
259 waitTime: { median: false },
260 elu: { median: false },
261 weights: { 0: 300, 1: 200 }
263 onlineHandler: testHandler,
264 messageHandler: testHandler,
265 errorHandler: testHandler,
266 exitHandler: testHandler
268 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
270 runTime: { median: true },
271 waitTime: { median: false },
272 elu: { median: false },
273 weights: { 0: 300, 1: 200 }
275 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
276 .workerChoiceStrategies) {
277 expect(workerChoiceStrategy.opts).toStrictEqual({
279 runTime: { median: true },
280 waitTime: { median: false },
281 elu: { median: false },
282 weights: { 0: 300, 1: 200 }
288 it('Verify that pool options are validated', () => {
293 './tests/worker-files/thread/testWorker.mjs',
295 workerChoiceStrategy: 'invalidStrategy'
298 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
303 './tests/worker-files/thread/testWorker.mjs',
305 workerChoiceStrategyOptions: {
306 retries: 'invalidChoiceRetries'
312 'Invalid worker choice strategy options: retries must be an integer'
319 './tests/worker-files/thread/testWorker.mjs',
321 workerChoiceStrategyOptions: {
328 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
335 './tests/worker-files/thread/testWorker.mjs',
337 workerChoiceStrategyOptions: { weights: {} }
342 'Invalid worker choice strategy options: must have a weight for each worker node'
349 './tests/worker-files/thread/testWorker.mjs',
351 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
356 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
363 './tests/worker-files/thread/testWorker.mjs',
365 enableTasksQueue: true,
366 tasksQueueOptions: 'invalidTasksQueueOptions'
370 new TypeError('Invalid tasks queue options: must be a plain object')
376 './tests/worker-files/thread/testWorker.mjs',
378 enableTasksQueue: true,
379 tasksQueueOptions: { concurrency: 0 }
384 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
391 './tests/worker-files/thread/testWorker.mjs',
393 enableTasksQueue: true,
394 tasksQueueOptions: { concurrency: -1 }
399 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
406 './tests/worker-files/thread/testWorker.mjs',
408 enableTasksQueue: true,
409 tasksQueueOptions: { concurrency: 0.2 }
413 new TypeError('Invalid worker node tasks concurrency: must be an integer')
419 './tests/worker-files/thread/testWorker.mjs',
421 enableTasksQueue: true,
422 tasksQueueOptions: { size: 0 }
427 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
434 './tests/worker-files/thread/testWorker.mjs',
436 enableTasksQueue: true,
437 tasksQueueOptions: { size: -1 }
442 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
449 './tests/worker-files/thread/testWorker.mjs',
451 enableTasksQueue: true,
452 tasksQueueOptions: { size: 0.2 }
456 new TypeError('Invalid worker node tasks queue size: must be an integer')
460 it('Verify that pool worker choice strategy options can be set', async () => {
461 const pool = new FixedThreadPool(
463 './tests/worker-files/thread/testWorker.mjs',
464 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
466 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
468 runTime: { median: false },
469 waitTime: { median: false },
470 elu: { median: false }
472 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
474 runTime: { median: false },
475 waitTime: { median: false },
476 elu: { median: false }
478 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
479 .workerChoiceStrategies) {
480 expect(workerChoiceStrategy.opts).toStrictEqual({
482 runTime: { median: false },
483 waitTime: { median: false },
484 elu: { median: false }
488 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
506 pool.setWorkerChoiceStrategyOptions({
507 runTime: { median: true },
508 elu: { median: true }
510 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
512 runTime: { median: true },
513 waitTime: { median: false },
514 elu: { median: true }
516 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
518 runTime: { median: true },
519 waitTime: { median: false },
520 elu: { median: true }
522 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
523 .workerChoiceStrategies) {
524 expect(workerChoiceStrategy.opts).toStrictEqual({
526 runTime: { median: true },
527 waitTime: { median: false },
528 elu: { median: true }
532 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
550 pool.setWorkerChoiceStrategyOptions({
551 runTime: { median: false },
552 elu: { median: false }
554 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
556 runTime: { median: false },
557 waitTime: { median: false },
558 elu: { median: false }
560 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
562 runTime: { median: false },
563 waitTime: { median: false },
564 elu: { median: false }
566 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
567 .workerChoiceStrategies) {
568 expect(workerChoiceStrategy.opts).toStrictEqual({
570 runTime: { median: false },
571 waitTime: { median: false },
572 elu: { median: false }
576 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
595 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
598 'Invalid worker choice strategy options: must be a plain object'
602 pool.setWorkerChoiceStrategyOptions({
603 retries: 'invalidChoiceRetries'
607 'Invalid worker choice strategy options: retries must be an integer'
610 expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
612 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
615 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
617 'Invalid worker choice strategy options: must have a weight for each worker node'
621 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
624 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
630 it('Verify that pool tasks queue can be enabled/disabled', async () => {
631 const pool = new FixedThreadPool(
633 './tests/worker-files/thread/testWorker.mjs'
635 expect(pool.opts.enableTasksQueue).toBe(false)
636 expect(pool.opts.tasksQueueOptions).toBeUndefined()
637 for (const workerNode of pool.workerNodes) {
638 expect(workerNode.onEmptyQueue).toBeUndefined()
639 expect(workerNode.onBackPressure).toBeUndefined()
641 pool.enableTasksQueue(true)
642 expect(pool.opts.enableTasksQueue).toBe(true)
643 expect(pool.opts.tasksQueueOptions).toStrictEqual({
645 size: Math.pow(numberOfWorkers, 2),
647 tasksStealingOnBackPressure: true
649 for (const workerNode of pool.workerNodes) {
650 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
651 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
653 pool.enableTasksQueue(true, { concurrency: 2 })
654 expect(pool.opts.enableTasksQueue).toBe(true)
655 expect(pool.opts.tasksQueueOptions).toStrictEqual({
657 size: Math.pow(numberOfWorkers, 2),
659 tasksStealingOnBackPressure: true
661 for (const workerNode of pool.workerNodes) {
662 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
663 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
665 pool.enableTasksQueue(false)
666 expect(pool.opts.enableTasksQueue).toBe(false)
667 expect(pool.opts.tasksQueueOptions).toBeUndefined()
668 for (const workerNode of pool.workerNodes) {
669 expect(workerNode.onEmptyQueue).toBeUndefined()
670 expect(workerNode.onBackPressure).toBeUndefined()
675 it('Verify that pool tasks queue options can be set', async () => {
676 const pool = new FixedThreadPool(
678 './tests/worker-files/thread/testWorker.mjs',
679 { enableTasksQueue: true }
681 expect(pool.opts.tasksQueueOptions).toStrictEqual({
683 size: Math.pow(numberOfWorkers, 2),
685 tasksStealingOnBackPressure: true
687 for (const workerNode of pool.workerNodes) {
688 expect(workerNode.tasksQueueBackPressureSize).toBe(
689 pool.opts.tasksQueueOptions.size
691 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
692 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
694 pool.setTasksQueueOptions({
698 tasksStealingOnBackPressure: false
700 expect(pool.opts.tasksQueueOptions).toStrictEqual({
704 tasksStealingOnBackPressure: false
706 for (const workerNode of pool.workerNodes) {
707 expect(workerNode.tasksQueueBackPressureSize).toBe(
708 pool.opts.tasksQueueOptions.size
710 expect(workerNode.onEmptyQueue).toBeUndefined()
711 expect(workerNode.onBackPressure).toBeUndefined()
713 pool.setTasksQueueOptions({
716 tasksStealingOnBackPressure: true
718 expect(pool.opts.tasksQueueOptions).toStrictEqual({
720 size: Math.pow(numberOfWorkers, 2),
722 tasksStealingOnBackPressure: true
724 for (const workerNode of pool.workerNodes) {
725 expect(workerNode.tasksQueueBackPressureSize).toBe(
726 pool.opts.tasksQueueOptions.size
728 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
729 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
731 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
732 new TypeError('Invalid tasks queue options: must be a plain object')
734 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
736 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
739 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
741 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
744 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
745 new TypeError('Invalid worker node tasks concurrency: must be an integer')
747 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
749 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
752 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
754 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
757 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
758 new TypeError('Invalid worker node tasks queue size: must be an integer')
763 it('Verify that pool info is set', async () => {
764 let pool = new FixedThreadPool(
766 './tests/worker-files/thread/testWorker.mjs'
768 expect(pool.info).toStrictEqual({
770 type: PoolTypes.fixed,
771 worker: WorkerTypes.thread,
774 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
775 minSize: numberOfWorkers,
776 maxSize: numberOfWorkers,
777 workerNodes: numberOfWorkers,
778 idleWorkerNodes: numberOfWorkers,
785 pool = new DynamicClusterPool(
786 Math.floor(numberOfWorkers / 2),
788 './tests/worker-files/cluster/testWorker.js'
790 expect(pool.info).toStrictEqual({
792 type: PoolTypes.dynamic,
793 worker: WorkerTypes.cluster,
796 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
797 minSize: Math.floor(numberOfWorkers / 2),
798 maxSize: numberOfWorkers,
799 workerNodes: Math.floor(numberOfWorkers / 2),
800 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
809 it('Verify that pool worker tasks usage are initialized', async () => {
810 const pool = new FixedClusterPool(
812 './tests/worker-files/cluster/testWorker.js'
814 for (const workerNode of pool.workerNodes) {
815 expect(workerNode).toBeInstanceOf(WorkerNode)
816 expect(workerNode.usage).toStrictEqual({
826 history: new CircularArray()
829 history: new CircularArray()
833 history: new CircularArray()
836 history: new CircularArray()
844 it('Verify that pool worker tasks queue are initialized', async () => {
845 let pool = new FixedClusterPool(
847 './tests/worker-files/cluster/testWorker.js'
849 for (const workerNode of pool.workerNodes) {
850 expect(workerNode).toBeInstanceOf(WorkerNode)
851 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
852 expect(workerNode.tasksQueue.size).toBe(0)
853 expect(workerNode.tasksQueue.maxSize).toBe(0)
856 pool = new DynamicThreadPool(
857 Math.floor(numberOfWorkers / 2),
859 './tests/worker-files/thread/testWorker.mjs'
861 for (const workerNode of pool.workerNodes) {
862 expect(workerNode).toBeInstanceOf(WorkerNode)
863 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
864 expect(workerNode.tasksQueue.size).toBe(0)
865 expect(workerNode.tasksQueue.maxSize).toBe(0)
870 it('Verify that pool worker info are initialized', async () => {
871 let pool = new FixedClusterPool(
873 './tests/worker-files/cluster/testWorker.js'
875 for (const workerNode of pool.workerNodes) {
876 expect(workerNode).toBeInstanceOf(WorkerNode)
877 expect(workerNode.info).toStrictEqual({
878 id: expect.any(Number),
879 type: WorkerTypes.cluster,
885 pool = new DynamicThreadPool(
886 Math.floor(numberOfWorkers / 2),
888 './tests/worker-files/thread/testWorker.mjs'
890 for (const workerNode of pool.workerNodes) {
891 expect(workerNode).toBeInstanceOf(WorkerNode)
892 expect(workerNode.info).toStrictEqual({
893 id: expect.any(Number),
894 type: WorkerTypes.thread,
902 it('Verify that pool can be started after initialization', async () => {
903 const pool = new FixedClusterPool(
905 './tests/worker-files/cluster/testWorker.js',
910 expect(pool.info.started).toBe(false)
911 expect(pool.info.ready).toBe(false)
912 expect(pool.workerNodes).toStrictEqual([])
913 await expect(pool.execute()).rejects.toThrow(
914 new Error('Cannot execute a task on not started pool')
917 expect(pool.info.started).toBe(true)
918 expect(pool.info.ready).toBe(true)
919 expect(pool.workerNodes.length).toBe(numberOfWorkers)
920 for (const workerNode of pool.workerNodes) {
921 expect(workerNode).toBeInstanceOf(WorkerNode)
926 it('Verify that pool execute() arguments are checked', async () => {
927 const pool = new FixedClusterPool(
929 './tests/worker-files/cluster/testWorker.js'
931 await expect(pool.execute(undefined, 0)).rejects.toThrow(
932 new TypeError('name argument must be a string')
934 await expect(pool.execute(undefined, '')).rejects.toThrow(
935 new TypeError('name argument must not be an empty string')
937 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
938 new TypeError('transferList argument must be an array')
940 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
941 "Task function 'unknown' not found"
944 await expect(pool.execute()).rejects.toThrow(
945 new Error('Cannot execute a task on not started pool')
949 it('Verify that pool worker tasks usage are computed', async () => {
950 const pool = new FixedClusterPool(
952 './tests/worker-files/cluster/testWorker.js'
954 const promises = new Set()
955 const maxMultiplier = 2
956 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
957 promises.add(pool.execute())
959 for (const workerNode of pool.workerNodes) {
960 expect(workerNode.usage).toStrictEqual({
963 executing: maxMultiplier,
970 history: expect.any(CircularArray)
973 history: expect.any(CircularArray)
977 history: expect.any(CircularArray)
980 history: expect.any(CircularArray)
985 await Promise.all(promises)
986 for (const workerNode of pool.workerNodes) {
987 expect(workerNode.usage).toStrictEqual({
989 executed: maxMultiplier,
997 history: expect.any(CircularArray)
1000 history: expect.any(CircularArray)
1004 history: expect.any(CircularArray)
1007 history: expect.any(CircularArray)
1012 await pool.destroy()
1015 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1016 const pool = new DynamicThreadPool(
1017 Math.floor(numberOfWorkers / 2),
1019 './tests/worker-files/thread/testWorker.mjs'
1021 const promises = new Set()
1022 const maxMultiplier = 2
1023 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1024 promises.add(pool.execute())
1026 await Promise.all(promises)
1027 for (const workerNode of pool.workerNodes) {
1028 expect(workerNode.usage).toStrictEqual({
1030 executed: expect.any(Number),
1038 history: expect.any(CircularArray)
1041 history: expect.any(CircularArray)
1045 history: expect.any(CircularArray)
1048 history: expect.any(CircularArray)
1052 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1053 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1054 numberOfWorkers * maxMultiplier
1056 expect(workerNode.usage.runTime.history.length).toBe(0)
1057 expect(workerNode.usage.waitTime.history.length).toBe(0)
1058 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1059 expect(workerNode.usage.elu.active.history.length).toBe(0)
1061 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1062 for (const workerNode of pool.workerNodes) {
1063 expect(workerNode.usage).toStrictEqual({
1073 history: expect.any(CircularArray)
1076 history: expect.any(CircularArray)
1080 history: expect.any(CircularArray)
1083 history: expect.any(CircularArray)
1087 expect(workerNode.usage.runTime.history.length).toBe(0)
1088 expect(workerNode.usage.waitTime.history.length).toBe(0)
1089 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1090 expect(workerNode.usage.elu.active.history.length).toBe(0)
1092 await pool.destroy()
1095 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1096 const pool = new DynamicClusterPool(
1097 Math.floor(numberOfWorkers / 2),
1099 './tests/worker-files/cluster/testWorker.js'
1101 expect(pool.emitter.eventNames()).toStrictEqual([])
1104 pool.emitter.on(PoolEvents.ready, info => {
1108 await waitPoolEvents(pool, PoolEvents.ready, 1)
1109 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1110 expect(poolReady).toBe(1)
1111 expect(poolInfo).toStrictEqual({
1113 type: PoolTypes.dynamic,
1114 worker: WorkerTypes.cluster,
1117 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1118 minSize: expect.any(Number),
1119 maxSize: expect.any(Number),
1120 workerNodes: expect.any(Number),
1121 idleWorkerNodes: expect.any(Number),
1122 busyWorkerNodes: expect.any(Number),
1123 executedTasks: expect.any(Number),
1124 executingTasks: expect.any(Number),
1125 failedTasks: expect.any(Number)
1127 await pool.destroy()
1130 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1131 const pool = new FixedThreadPool(
1133 './tests/worker-files/thread/testWorker.mjs'
1135 expect(pool.emitter.eventNames()).toStrictEqual([])
1136 const promises = new Set()
1139 pool.emitter.on(PoolEvents.busy, info => {
1143 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1144 for (let i = 0; i < numberOfWorkers * 2; i++) {
1145 promises.add(pool.execute())
1147 await Promise.all(promises)
1148 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1149 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1150 expect(poolBusy).toBe(numberOfWorkers + 1)
1151 expect(poolInfo).toStrictEqual({
1153 type: PoolTypes.fixed,
1154 worker: WorkerTypes.thread,
1157 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1158 minSize: expect.any(Number),
1159 maxSize: expect.any(Number),
1160 workerNodes: expect.any(Number),
1161 idleWorkerNodes: expect.any(Number),
1162 busyWorkerNodes: expect.any(Number),
1163 executedTasks: expect.any(Number),
1164 executingTasks: expect.any(Number),
1165 failedTasks: expect.any(Number)
1167 await pool.destroy()
1170 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1171 const pool = new DynamicThreadPool(
1172 Math.floor(numberOfWorkers / 2),
1174 './tests/worker-files/thread/testWorker.mjs'
1176 expect(pool.emitter.eventNames()).toStrictEqual([])
1177 const promises = new Set()
1180 pool.emitter.on(PoolEvents.full, info => {
1184 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1185 for (let i = 0; i < numberOfWorkers * 2; i++) {
1186 promises.add(pool.execute())
1188 await Promise.all(promises)
1189 expect(poolFull).toBe(1)
1190 expect(poolInfo).toStrictEqual({
1192 type: PoolTypes.dynamic,
1193 worker: WorkerTypes.thread,
1196 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1197 minSize: expect.any(Number),
1198 maxSize: expect.any(Number),
1199 workerNodes: expect.any(Number),
1200 idleWorkerNodes: expect.any(Number),
1201 busyWorkerNodes: expect.any(Number),
1202 executedTasks: expect.any(Number),
1203 executingTasks: expect.any(Number),
1204 failedTasks: expect.any(Number)
1206 await pool.destroy()
1209 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1210 const pool = new FixedThreadPool(
1212 './tests/worker-files/thread/testWorker.mjs',
1214 enableTasksQueue: true
1217 stub(pool, 'hasBackPressure').returns(true)
1218 expect(pool.emitter.eventNames()).toStrictEqual([])
1219 const promises = new Set()
1220 let poolBackPressure = 0
1222 pool.emitter.on(PoolEvents.backPressure, info => {
1226 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1227 for (let i = 0; i < numberOfWorkers + 1; i++) {
1228 promises.add(pool.execute())
1230 await Promise.all(promises)
1231 expect(poolBackPressure).toBe(1)
1232 expect(poolInfo).toStrictEqual({
1234 type: PoolTypes.fixed,
1235 worker: WorkerTypes.thread,
1238 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1239 minSize: expect.any(Number),
1240 maxSize: expect.any(Number),
1241 workerNodes: expect.any(Number),
1242 idleWorkerNodes: expect.any(Number),
1243 busyWorkerNodes: expect.any(Number),
1244 executedTasks: expect.any(Number),
1245 executingTasks: expect.any(Number),
1246 maxQueuedTasks: expect.any(Number),
1247 queuedTasks: expect.any(Number),
1249 stolenTasks: expect.any(Number),
1250 failedTasks: expect.any(Number)
1252 expect(pool.hasBackPressure.called).toBe(true)
1253 await pool.destroy()
1256 it('Verify that hasTaskFunction() is working', async () => {
1257 const dynamicThreadPool = new DynamicThreadPool(
1258 Math.floor(numberOfWorkers / 2),
1260 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1262 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1263 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1264 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1267 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1268 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1269 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1270 await dynamicThreadPool.destroy()
1271 const fixedClusterPool = new FixedClusterPool(
1273 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1275 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1276 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1277 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1280 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1281 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1282 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1283 await fixedClusterPool.destroy()
1286 it('Verify that addTaskFunction() is working', async () => {
1287 const dynamicThreadPool = new DynamicThreadPool(
1288 Math.floor(numberOfWorkers / 2),
1290 './tests/worker-files/thread/testWorker.mjs'
1292 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1294 dynamicThreadPool.addTaskFunction(0, () => {})
1295 ).rejects.toThrow(new TypeError('name argument must be a string'))
1297 dynamicThreadPool.addTaskFunction('', () => {})
1299 new TypeError('name argument must not be an empty string')
1301 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1302 new TypeError('fn argument must be a function')
1304 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1305 new TypeError('fn argument must be a function')
1307 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1311 const echoTaskFunction = data => {
1315 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1316 ).resolves.toBe(true)
1317 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1318 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1321 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1326 const taskFunctionData = { test: 'test' }
1327 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1328 expect(echoResult).toStrictEqual(taskFunctionData)
1329 for (const workerNode of dynamicThreadPool.workerNodes) {
1330 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1332 executed: expect.any(Number),
1339 history: new CircularArray()
1342 history: new CircularArray()
1346 history: new CircularArray()
1349 history: new CircularArray()
1354 await dynamicThreadPool.destroy()
1357 it('Verify that removeTaskFunction() is working', async () => {
1358 const dynamicThreadPool = new DynamicThreadPool(
1359 Math.floor(numberOfWorkers / 2),
1361 './tests/worker-files/thread/testWorker.mjs'
1363 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1364 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1368 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1369 new Error('Cannot remove a task function not handled on the pool side')
1371 const echoTaskFunction = data => {
1374 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1375 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1376 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1379 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1384 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1387 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1388 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1389 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1393 await dynamicThreadPool.destroy()
1396 it('Verify that listTaskFunctionNames() is working', async () => {
1397 const dynamicThreadPool = new DynamicThreadPool(
1398 Math.floor(numberOfWorkers / 2),
1400 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1402 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1403 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1405 'jsonIntegerSerialization',
1409 await dynamicThreadPool.destroy()
1410 const fixedClusterPool = new FixedClusterPool(
1412 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1414 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1415 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1417 'jsonIntegerSerialization',
1421 await fixedClusterPool.destroy()
1424 it('Verify that setDefaultTaskFunction() is working', async () => {
1425 const dynamicThreadPool = new DynamicThreadPool(
1426 Math.floor(numberOfWorkers / 2),
1428 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1430 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1431 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1433 "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
1437 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1440 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
1444 dynamicThreadPool.setDefaultTaskFunction('unknown')
1447 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
1450 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1452 'jsonIntegerSerialization',
1457 dynamicThreadPool.setDefaultTaskFunction('factorial')
1458 ).resolves.toBe(true)
1459 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1462 'jsonIntegerSerialization',
1466 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1467 ).resolves.toBe(true)
1468 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1471 'jsonIntegerSerialization',
1474 await dynamicThreadPool.destroy()
1477 it('Verify that multiple task functions worker is working', async () => {
1478 const pool = new DynamicClusterPool(
1479 Math.floor(numberOfWorkers / 2),
1481 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1483 const data = { n: 10 }
1484 const result0 = await pool.execute(data)
1485 expect(result0).toStrictEqual({ ok: 1 })
1486 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1487 expect(result1).toStrictEqual({ ok: 1 })
1488 const result2 = await pool.execute(data, 'factorial')
1489 expect(result2).toBe(3628800)
1490 const result3 = await pool.execute(data, 'fibonacci')
1491 expect(result3).toBe(55)
1492 expect(pool.info.executingTasks).toBe(0)
1493 expect(pool.info.executedTasks).toBe(4)
1494 for (const workerNode of pool.workerNodes) {
1495 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1497 'jsonIntegerSerialization',
1501 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1502 for (const name of pool.listTaskFunctionNames()) {
1503 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1505 executed: expect.any(Number),
1512 history: expect.any(CircularArray)
1515 history: expect.any(CircularArray)
1519 history: expect.any(CircularArray)
1522 history: expect.any(CircularArray)
1527 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1528 ).toBeGreaterThan(0)
1531 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1533 workerNode.getTaskFunctionWorkerUsage(
1534 workerNode.info.taskFunctionNames[1]
1538 await pool.destroy()
1541 it('Verify sendKillMessageToWorker()', async () => {
1542 const pool = new DynamicClusterPool(
1543 Math.floor(numberOfWorkers / 2),
1545 './tests/worker-files/cluster/testWorker.js'
1547 const workerNodeKey = 0
1549 pool.sendKillMessageToWorker(workerNodeKey)
1550 ).resolves.toBeUndefined()
1551 await pool.destroy()
1554 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1555 const pool = new DynamicClusterPool(
1556 Math.floor(numberOfWorkers / 2),
1558 './tests/worker-files/cluster/testWorker.js'
1560 const workerNodeKey = 0
1562 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1563 taskFunctionOperation: 'add',
1564 taskFunctionName: 'empty',
1565 taskFunction: (() => {}).toString()
1567 ).resolves.toBe(true)
1569 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1570 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1571 await pool.destroy()
1574 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1575 const pool = new DynamicClusterPool(
1576 Math.floor(numberOfWorkers / 2),
1578 './tests/worker-files/cluster/testWorker.js'
1581 pool.sendTaskFunctionOperationToWorkers({
1582 taskFunctionOperation: 'add',
1583 taskFunctionName: 'empty',
1584 taskFunction: (() => {}).toString()
1586 ).resolves.toBe(true)
1587 for (const workerNode of pool.workerNodes) {
1588 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1594 await pool.destroy()