1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { expect } from 'expect'
6 import { restore, stub } from 'sinon'
14 WorkerChoiceStrategies,
16 } from '../../lib/index.js'
17 import { CircularArray } from '../../lib/circular-array.js'
18 import { Deque } from '../../lib/deque.js'
19 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20 import { waitPoolEvents } from '../test-utils.js'
21 import { WorkerNode } from '../../lib/pools/worker-node.js'
23 describe('Abstract pool test suite', () => {
24 const version = JSON.parse(
26 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
30 const numberOfWorkers = 2
31 class StubPoolWithIsMain extends FixedThreadPool {
41 it('Simulate pool creation from a non main thread/process', () => {
44 new StubPoolWithIsMain(
46 './tests/worker-files/thread/testWorker.mjs',
48 errorHandler: e => console.error(e)
53 'Cannot start a pool from a worker with the same type as the pool'
58 it('Verify that pool statuses properties are set', async () => {
59 const pool = new FixedThreadPool(
61 './tests/worker-files/thread/testWorker.mjs'
63 expect(pool.starting).toBe(false)
64 expect(pool.started).toBe(true)
68 it('Verify that filePath is checked', () => {
69 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
70 new Error("Cannot find the worker file 'undefined'")
73 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
74 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
77 it('Verify that numberOfWorkers is checked', () => {
82 './tests/worker-files/thread/testWorker.mjs'
86 'Cannot instantiate a pool without specifying the number of workers'
91 it('Verify that a negative number of workers is checked', () => {
94 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
97 'Cannot instantiate a pool with a negative number of workers'
102 it('Verify that a non integer number of workers is checked', () => {
105 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
108 'Cannot instantiate a pool with a non safe integer number of workers'
113 it('Verify that dynamic pool sizing is checked', () => {
116 new DynamicClusterPool(
119 './tests/worker-files/cluster/testWorker.js'
123 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
128 new DynamicThreadPool(
131 './tests/worker-files/thread/testWorker.mjs'
135 'Cannot instantiate a pool with a non safe integer number of workers'
140 new DynamicClusterPool(
143 './tests/worker-files/cluster/testWorker.js'
147 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
152 new DynamicThreadPool(
155 './tests/worker-files/thread/testWorker.mjs'
159 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
164 new DynamicThreadPool(
167 './tests/worker-files/thread/testWorker.mjs'
171 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
176 new DynamicClusterPool(
179 './tests/worker-files/cluster/testWorker.js'
183 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
188 it('Verify that pool options are checked', async () => {
189 let pool = new FixedThreadPool(
191 './tests/worker-files/thread/testWorker.mjs'
193 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
194 expect(pool.opts).toStrictEqual({
197 restartWorkerOnError: true,
198 enableTasksQueue: false,
199 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
200 workerChoiceStrategyOptions: {
202 runTime: { median: false },
203 waitTime: { median: false },
204 elu: { median: false }
207 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
209 runTime: { median: false },
210 waitTime: { median: false },
211 elu: { median: false }
213 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
214 .workerChoiceStrategies) {
215 expect(workerChoiceStrategy.opts).toStrictEqual({
217 runTime: { median: false },
218 waitTime: { median: false },
219 elu: { median: false }
223 const testHandler = () => console.info('test handler executed')
224 pool = new FixedThreadPool(
226 './tests/worker-files/thread/testWorker.mjs',
228 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
229 workerChoiceStrategyOptions: {
230 runTime: { median: true },
231 weights: { 0: 300, 1: 200 }
234 restartWorkerOnError: false,
235 enableTasksQueue: true,
236 tasksQueueOptions: { concurrency: 2 },
237 messageHandler: testHandler,
238 errorHandler: testHandler,
239 onlineHandler: testHandler,
240 exitHandler: testHandler
243 expect(pool.emitter).toBeUndefined()
244 expect(pool.opts).toStrictEqual({
247 restartWorkerOnError: false,
248 enableTasksQueue: true,
251 size: Math.pow(numberOfWorkers, 2),
253 tasksStealingOnBackPressure: true
255 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
256 workerChoiceStrategyOptions: {
258 runTime: { median: true },
259 waitTime: { median: false },
260 elu: { median: false },
261 weights: { 0: 300, 1: 200 }
263 onlineHandler: testHandler,
264 messageHandler: testHandler,
265 errorHandler: testHandler,
266 exitHandler: testHandler
268 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
270 runTime: { median: true },
271 waitTime: { median: false },
272 elu: { median: false },
273 weights: { 0: 300, 1: 200 }
275 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
276 .workerChoiceStrategies) {
277 expect(workerChoiceStrategy.opts).toStrictEqual({
279 runTime: { median: true },
280 waitTime: { median: false },
281 elu: { median: false },
282 weights: { 0: 300, 1: 200 }
288 it('Verify that pool options are validated', () => {
293 './tests/worker-files/thread/testWorker.mjs',
295 workerChoiceStrategy: 'invalidStrategy'
299 new Error("Invalid worker choice strategy 'invalidStrategy'")
305 './tests/worker-files/thread/testWorker.mjs',
307 workerChoiceStrategyOptions: {
308 retries: 'invalidChoiceRetries'
314 'Invalid worker choice strategy options: retries must be an integer'
321 './tests/worker-files/thread/testWorker.mjs',
323 workerChoiceStrategyOptions: {
330 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
337 './tests/worker-files/thread/testWorker.mjs',
339 workerChoiceStrategyOptions: { weights: {} }
344 'Invalid worker choice strategy options: must have a weight for each worker node'
351 './tests/worker-files/thread/testWorker.mjs',
353 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
358 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
365 './tests/worker-files/thread/testWorker.mjs',
367 enableTasksQueue: true,
368 tasksQueueOptions: 'invalidTasksQueueOptions'
372 new TypeError('Invalid tasks queue options: must be a plain object')
378 './tests/worker-files/thread/testWorker.mjs',
380 enableTasksQueue: true,
381 tasksQueueOptions: { concurrency: 0 }
386 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
393 './tests/worker-files/thread/testWorker.mjs',
395 enableTasksQueue: true,
396 tasksQueueOptions: { concurrency: -1 }
401 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
408 './tests/worker-files/thread/testWorker.mjs',
410 enableTasksQueue: true,
411 tasksQueueOptions: { concurrency: 0.2 }
415 new TypeError('Invalid worker node tasks concurrency: must be an integer')
421 './tests/worker-files/thread/testWorker.mjs',
423 enableTasksQueue: true,
424 tasksQueueOptions: { size: 0 }
429 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
436 './tests/worker-files/thread/testWorker.mjs',
438 enableTasksQueue: true,
439 tasksQueueOptions: { size: -1 }
444 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
451 './tests/worker-files/thread/testWorker.mjs',
453 enableTasksQueue: true,
454 tasksQueueOptions: { size: 0.2 }
458 new TypeError('Invalid worker node tasks queue size: must be an integer')
462 it('Verify that pool worker choice strategy options can be set', async () => {
463 const pool = new FixedThreadPool(
465 './tests/worker-files/thread/testWorker.mjs',
466 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
468 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
470 runTime: { median: false },
471 waitTime: { median: false },
472 elu: { median: false }
474 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
476 runTime: { median: false },
477 waitTime: { median: false },
478 elu: { median: false }
480 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
481 .workerChoiceStrategies) {
482 expect(workerChoiceStrategy.opts).toStrictEqual({
484 runTime: { median: false },
485 waitTime: { median: false },
486 elu: { median: false }
490 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
508 pool.setWorkerChoiceStrategyOptions({
509 runTime: { median: true },
510 elu: { median: true }
512 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
514 runTime: { median: true },
515 waitTime: { median: false },
516 elu: { median: true }
518 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
520 runTime: { median: true },
521 waitTime: { median: false },
522 elu: { median: true }
524 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
525 .workerChoiceStrategies) {
526 expect(workerChoiceStrategy.opts).toStrictEqual({
528 runTime: { median: true },
529 waitTime: { median: false },
530 elu: { median: true }
534 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
552 pool.setWorkerChoiceStrategyOptions({
553 runTime: { median: false },
554 elu: { median: false }
556 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
558 runTime: { median: false },
559 waitTime: { median: false },
560 elu: { median: false }
562 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
564 runTime: { median: false },
565 waitTime: { median: false },
566 elu: { median: false }
568 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
569 .workerChoiceStrategies) {
570 expect(workerChoiceStrategy.opts).toStrictEqual({
572 runTime: { median: false },
573 waitTime: { median: false },
574 elu: { median: false }
578 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
597 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
600 'Invalid worker choice strategy options: must be a plain object'
604 pool.setWorkerChoiceStrategyOptions({
605 retries: 'invalidChoiceRetries'
609 'Invalid worker choice strategy options: retries must be an integer'
613 pool.setWorkerChoiceStrategyOptions({ retries: -1 })
616 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
620 pool.setWorkerChoiceStrategyOptions({ weights: {} })
623 'Invalid worker choice strategy options: must have a weight for each worker node'
627 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
630 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
636 it('Verify that pool tasks queue can be enabled/disabled', async () => {
637 const pool = new FixedThreadPool(
639 './tests/worker-files/thread/testWorker.mjs'
641 expect(pool.opts.enableTasksQueue).toBe(false)
642 expect(pool.opts.tasksQueueOptions).toBeUndefined()
643 for (const workerNode of pool.workerNodes) {
644 expect(workerNode.onEmptyQueue).toBeUndefined()
645 expect(workerNode.onBackPressure).toBeUndefined()
647 pool.enableTasksQueue(true)
648 expect(pool.opts.enableTasksQueue).toBe(true)
649 expect(pool.opts.tasksQueueOptions).toStrictEqual({
651 size: Math.pow(numberOfWorkers, 2),
653 tasksStealingOnBackPressure: true
655 for (const workerNode of pool.workerNodes) {
656 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
657 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
659 pool.enableTasksQueue(true, { concurrency: 2 })
660 expect(pool.opts.enableTasksQueue).toBe(true)
661 expect(pool.opts.tasksQueueOptions).toStrictEqual({
663 size: Math.pow(numberOfWorkers, 2),
665 tasksStealingOnBackPressure: true
667 for (const workerNode of pool.workerNodes) {
668 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
669 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
671 pool.enableTasksQueue(false)
672 expect(pool.opts.enableTasksQueue).toBe(false)
673 expect(pool.opts.tasksQueueOptions).toBeUndefined()
674 for (const workerNode of pool.workerNodes) {
675 expect(workerNode.onEmptyQueue).toBeUndefined()
676 expect(workerNode.onBackPressure).toBeUndefined()
681 it('Verify that pool tasks queue options can be set', async () => {
682 const pool = new FixedThreadPool(
684 './tests/worker-files/thread/testWorker.mjs',
685 { enableTasksQueue: true }
687 expect(pool.opts.tasksQueueOptions).toStrictEqual({
689 size: Math.pow(numberOfWorkers, 2),
691 tasksStealingOnBackPressure: true
693 for (const workerNode of pool.workerNodes) {
694 expect(workerNode.tasksQueueBackPressureSize).toBe(
695 pool.opts.tasksQueueOptions.size
697 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
698 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
700 pool.setTasksQueueOptions({
704 tasksStealingOnBackPressure: false
706 expect(pool.opts.tasksQueueOptions).toStrictEqual({
710 tasksStealingOnBackPressure: false
712 for (const workerNode of pool.workerNodes) {
713 expect(workerNode.tasksQueueBackPressureSize).toBe(
714 pool.opts.tasksQueueOptions.size
716 expect(workerNode.onEmptyQueue).toBeUndefined()
717 expect(workerNode.onBackPressure).toBeUndefined()
719 pool.setTasksQueueOptions({
722 tasksStealingOnBackPressure: true
724 expect(pool.opts.tasksQueueOptions).toStrictEqual({
726 size: Math.pow(numberOfWorkers, 2),
728 tasksStealingOnBackPressure: true
730 for (const workerNode of pool.workerNodes) {
731 expect(workerNode.tasksQueueBackPressureSize).toBe(
732 pool.opts.tasksQueueOptions.size
734 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
735 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
738 pool.setTasksQueueOptions('invalidTasksQueueOptions')
740 new TypeError('Invalid tasks queue options: must be a plain object')
742 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
744 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
747 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
749 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
752 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
753 new TypeError('Invalid worker node tasks concurrency: must be an integer')
755 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
757 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
760 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError(
762 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
765 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError(
766 new TypeError('Invalid worker node tasks queue size: must be an integer')
771 it('Verify that pool info is set', async () => {
772 let pool = new FixedThreadPool(
774 './tests/worker-files/thread/testWorker.mjs'
776 expect(pool.info).toStrictEqual({
778 type: PoolTypes.fixed,
779 worker: WorkerTypes.thread,
782 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
783 minSize: numberOfWorkers,
784 maxSize: numberOfWorkers,
785 workerNodes: numberOfWorkers,
786 idleWorkerNodes: numberOfWorkers,
793 pool = new DynamicClusterPool(
794 Math.floor(numberOfWorkers / 2),
796 './tests/worker-files/cluster/testWorker.js'
798 expect(pool.info).toStrictEqual({
800 type: PoolTypes.dynamic,
801 worker: WorkerTypes.cluster,
804 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
805 minSize: Math.floor(numberOfWorkers / 2),
806 maxSize: numberOfWorkers,
807 workerNodes: Math.floor(numberOfWorkers / 2),
808 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
817 it('Verify that pool worker tasks usage are initialized', async () => {
818 const pool = new FixedClusterPool(
820 './tests/worker-files/cluster/testWorker.js'
822 for (const workerNode of pool.workerNodes) {
823 expect(workerNode).toBeInstanceOf(WorkerNode)
824 expect(workerNode.usage).toStrictEqual({
834 history: new CircularArray()
837 history: new CircularArray()
841 history: new CircularArray()
844 history: new CircularArray()
852 it('Verify that pool worker tasks queue are initialized', async () => {
853 let pool = new FixedClusterPool(
855 './tests/worker-files/cluster/testWorker.js'
857 for (const workerNode of pool.workerNodes) {
858 expect(workerNode).toBeInstanceOf(WorkerNode)
859 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
860 expect(workerNode.tasksQueue.size).toBe(0)
861 expect(workerNode.tasksQueue.maxSize).toBe(0)
864 pool = new DynamicThreadPool(
865 Math.floor(numberOfWorkers / 2),
867 './tests/worker-files/thread/testWorker.mjs'
869 for (const workerNode of pool.workerNodes) {
870 expect(workerNode).toBeInstanceOf(WorkerNode)
871 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
872 expect(workerNode.tasksQueue.size).toBe(0)
873 expect(workerNode.tasksQueue.maxSize).toBe(0)
878 it('Verify that pool worker info are initialized', async () => {
879 let pool = new FixedClusterPool(
881 './tests/worker-files/cluster/testWorker.js'
883 for (const workerNode of pool.workerNodes) {
884 expect(workerNode).toBeInstanceOf(WorkerNode)
885 expect(workerNode.info).toStrictEqual({
886 id: expect.any(Number),
887 type: WorkerTypes.cluster,
893 pool = new DynamicThreadPool(
894 Math.floor(numberOfWorkers / 2),
896 './tests/worker-files/thread/testWorker.mjs'
898 for (const workerNode of pool.workerNodes) {
899 expect(workerNode).toBeInstanceOf(WorkerNode)
900 expect(workerNode.info).toStrictEqual({
901 id: expect.any(Number),
902 type: WorkerTypes.thread,
910 it('Verify that pool can be started after initialization', async () => {
911 const pool = new FixedClusterPool(
913 './tests/worker-files/cluster/testWorker.js',
918 expect(pool.info.started).toBe(false)
919 expect(pool.info.ready).toBe(false)
920 expect(pool.workerNodes).toStrictEqual([])
921 await expect(pool.execute()).rejects.toThrowError(
922 new Error('Cannot execute a task on not started pool')
925 expect(pool.info.started).toBe(true)
926 expect(pool.info.ready).toBe(true)
927 expect(pool.workerNodes.length).toBe(numberOfWorkers)
928 for (const workerNode of pool.workerNodes) {
929 expect(workerNode).toBeInstanceOf(WorkerNode)
934 it('Verify that pool execute() arguments are checked', async () => {
935 const pool = new FixedClusterPool(
937 './tests/worker-files/cluster/testWorker.js'
939 await expect(pool.execute(undefined, 0)).rejects.toThrowError(
940 new TypeError('name argument must be a string')
942 await expect(pool.execute(undefined, '')).rejects.toThrowError(
943 new TypeError('name argument must not be an empty string')
945 await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
946 new TypeError('transferList argument must be an array')
948 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
949 "Task function 'unknown' not found"
952 await expect(pool.execute()).rejects.toThrowError(
953 new Error('Cannot execute a task on not started pool')
957 it('Verify that pool worker tasks usage are computed', async () => {
958 const pool = new FixedClusterPool(
960 './tests/worker-files/cluster/testWorker.js'
962 const promises = new Set()
963 const maxMultiplier = 2
964 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
965 promises.add(pool.execute())
967 for (const workerNode of pool.workerNodes) {
968 expect(workerNode.usage).toStrictEqual({
971 executing: maxMultiplier,
978 history: expect.any(CircularArray)
981 history: expect.any(CircularArray)
985 history: expect.any(CircularArray)
988 history: expect.any(CircularArray)
993 await Promise.all(promises)
994 for (const workerNode of pool.workerNodes) {
995 expect(workerNode.usage).toStrictEqual({
997 executed: maxMultiplier,
1005 history: expect.any(CircularArray)
1008 history: expect.any(CircularArray)
1012 history: expect.any(CircularArray)
1015 history: expect.any(CircularArray)
1020 await pool.destroy()
1023 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1024 const pool = new DynamicThreadPool(
1025 Math.floor(numberOfWorkers / 2),
1027 './tests/worker-files/thread/testWorker.mjs'
1029 const promises = new Set()
1030 const maxMultiplier = 2
1031 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1032 promises.add(pool.execute())
1034 await Promise.all(promises)
1035 for (const workerNode of pool.workerNodes) {
1036 expect(workerNode.usage).toStrictEqual({
1038 executed: expect.any(Number),
1046 history: expect.any(CircularArray)
1049 history: expect.any(CircularArray)
1053 history: expect.any(CircularArray)
1056 history: expect.any(CircularArray)
1060 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1061 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1062 numberOfWorkers * maxMultiplier
1064 expect(workerNode.usage.runTime.history.length).toBe(0)
1065 expect(workerNode.usage.waitTime.history.length).toBe(0)
1066 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1067 expect(workerNode.usage.elu.active.history.length).toBe(0)
1069 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1070 for (const workerNode of pool.workerNodes) {
1071 expect(workerNode.usage).toStrictEqual({
1081 history: expect.any(CircularArray)
1084 history: expect.any(CircularArray)
1088 history: expect.any(CircularArray)
1091 history: expect.any(CircularArray)
1095 expect(workerNode.usage.runTime.history.length).toBe(0)
1096 expect(workerNode.usage.waitTime.history.length).toBe(0)
1097 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1098 expect(workerNode.usage.elu.active.history.length).toBe(0)
1100 await pool.destroy()
1103 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1104 const pool = new DynamicClusterPool(
1105 Math.floor(numberOfWorkers / 2),
1107 './tests/worker-files/cluster/testWorker.js'
1109 expect(pool.emitter.eventNames()).toStrictEqual([])
1112 pool.emitter.on(PoolEvents.ready, info => {
1116 await waitPoolEvents(pool, PoolEvents.ready, 1)
1117 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1118 expect(poolReady).toBe(1)
1119 expect(poolInfo).toStrictEqual({
1121 type: PoolTypes.dynamic,
1122 worker: WorkerTypes.cluster,
1125 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1126 minSize: expect.any(Number),
1127 maxSize: expect.any(Number),
1128 workerNodes: expect.any(Number),
1129 idleWorkerNodes: expect.any(Number),
1130 busyWorkerNodes: expect.any(Number),
1131 executedTasks: expect.any(Number),
1132 executingTasks: expect.any(Number),
1133 failedTasks: expect.any(Number)
1135 await pool.destroy()
1138 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1139 const pool = new FixedThreadPool(
1141 './tests/worker-files/thread/testWorker.mjs'
1143 expect(pool.emitter.eventNames()).toStrictEqual([])
1144 const promises = new Set()
1147 pool.emitter.on(PoolEvents.busy, info => {
1151 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1152 for (let i = 0; i < numberOfWorkers * 2; i++) {
1153 promises.add(pool.execute())
1155 await Promise.all(promises)
1156 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1157 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1158 expect(poolBusy).toBe(numberOfWorkers + 1)
1159 expect(poolInfo).toStrictEqual({
1161 type: PoolTypes.fixed,
1162 worker: WorkerTypes.thread,
1165 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1166 minSize: expect.any(Number),
1167 maxSize: expect.any(Number),
1168 workerNodes: expect.any(Number),
1169 idleWorkerNodes: expect.any(Number),
1170 busyWorkerNodes: expect.any(Number),
1171 executedTasks: expect.any(Number),
1172 executingTasks: expect.any(Number),
1173 failedTasks: expect.any(Number)
1175 await pool.destroy()
1178 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1179 const pool = new DynamicThreadPool(
1180 Math.floor(numberOfWorkers / 2),
1182 './tests/worker-files/thread/testWorker.mjs'
1184 expect(pool.emitter.eventNames()).toStrictEqual([])
1185 const promises = new Set()
1188 pool.emitter.on(PoolEvents.full, info => {
1192 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1193 for (let i = 0; i < numberOfWorkers * 2; i++) {
1194 promises.add(pool.execute())
1196 await Promise.all(promises)
1197 expect(poolFull).toBe(1)
1198 expect(poolInfo).toStrictEqual({
1200 type: PoolTypes.dynamic,
1201 worker: WorkerTypes.thread,
1204 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1205 minSize: expect.any(Number),
1206 maxSize: expect.any(Number),
1207 workerNodes: expect.any(Number),
1208 idleWorkerNodes: expect.any(Number),
1209 busyWorkerNodes: expect.any(Number),
1210 executedTasks: expect.any(Number),
1211 executingTasks: expect.any(Number),
1212 failedTasks: expect.any(Number)
1214 await pool.destroy()
1217 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1218 const pool = new FixedThreadPool(
1220 './tests/worker-files/thread/testWorker.mjs',
1222 enableTasksQueue: true
1225 stub(pool, 'hasBackPressure').returns(true)
1226 expect(pool.emitter.eventNames()).toStrictEqual([])
1227 const promises = new Set()
1228 let poolBackPressure = 0
1230 pool.emitter.on(PoolEvents.backPressure, info => {
1234 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1235 for (let i = 0; i < numberOfWorkers + 1; i++) {
1236 promises.add(pool.execute())
1238 await Promise.all(promises)
1239 expect(poolBackPressure).toBe(1)
1240 expect(poolInfo).toStrictEqual({
1242 type: PoolTypes.fixed,
1243 worker: WorkerTypes.thread,
1246 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1247 minSize: expect.any(Number),
1248 maxSize: expect.any(Number),
1249 workerNodes: expect.any(Number),
1250 idleWorkerNodes: expect.any(Number),
1251 busyWorkerNodes: expect.any(Number),
1252 executedTasks: expect.any(Number),
1253 executingTasks: expect.any(Number),
1254 maxQueuedTasks: expect.any(Number),
1255 queuedTasks: expect.any(Number),
1257 stolenTasks: expect.any(Number),
1258 failedTasks: expect.any(Number)
1260 expect(pool.hasBackPressure.called).toBe(true)
1261 await pool.destroy()
1264 it('Verify that hasTaskFunction() is working', async () => {
1265 const dynamicThreadPool = new DynamicThreadPool(
1266 Math.floor(numberOfWorkers / 2),
1268 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1270 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1271 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1272 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1275 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1276 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1277 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1278 await dynamicThreadPool.destroy()
1279 const fixedClusterPool = new FixedClusterPool(
1281 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1283 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1284 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1285 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1288 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1289 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1290 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1291 await fixedClusterPool.destroy()
1294 it('Verify that addTaskFunction() is working', async () => {
1295 const dynamicThreadPool = new DynamicThreadPool(
1296 Math.floor(numberOfWorkers / 2),
1298 './tests/worker-files/thread/testWorker.mjs'
1300 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1302 dynamicThreadPool.addTaskFunction(0, () => {})
1303 ).rejects.toThrowError(new TypeError('name argument must be a string'))
1305 dynamicThreadPool.addTaskFunction('', () => {})
1306 ).rejects.toThrowError(
1307 new TypeError('name argument must not be an empty string')
1310 dynamicThreadPool.addTaskFunction('test', 0)
1311 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1313 dynamicThreadPool.addTaskFunction('test', '')
1314 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1315 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1319 const echoTaskFunction = data => {
1323 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1324 ).resolves.toBe(true)
1325 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1326 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1329 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1334 const taskFunctionData = { test: 'test' }
1335 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1336 expect(echoResult).toStrictEqual(taskFunctionData)
1337 for (const workerNode of dynamicThreadPool.workerNodes) {
1338 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1340 executed: expect.any(Number),
1347 history: new CircularArray()
1350 history: new CircularArray()
1354 history: new CircularArray()
1357 history: new CircularArray()
1362 await dynamicThreadPool.destroy()
1365 it('Verify that removeTaskFunction() is working', async () => {
1366 const dynamicThreadPool = new DynamicThreadPool(
1367 Math.floor(numberOfWorkers / 2),
1369 './tests/worker-files/thread/testWorker.mjs'
1371 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1372 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1377 dynamicThreadPool.removeTaskFunction('test')
1378 ).rejects.toThrowError(
1379 new Error('Cannot remove a task function not handled on the pool side')
1381 const echoTaskFunction = data => {
1384 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1385 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1386 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1389 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1394 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1397 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1398 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1399 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1403 await dynamicThreadPool.destroy()
1406 it('Verify that listTaskFunctionNames() is working', async () => {
1407 const dynamicThreadPool = new DynamicThreadPool(
1408 Math.floor(numberOfWorkers / 2),
1410 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1412 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1413 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1415 'jsonIntegerSerialization',
1419 await dynamicThreadPool.destroy()
1420 const fixedClusterPool = new FixedClusterPool(
1422 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1424 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1425 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1427 'jsonIntegerSerialization',
1431 await fixedClusterPool.destroy()
1434 it('Verify that setDefaultTaskFunction() is working', async () => {
1435 const dynamicThreadPool = new DynamicThreadPool(
1436 Math.floor(numberOfWorkers / 2),
1438 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1440 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1442 dynamicThreadPool.setDefaultTaskFunction(0)
1443 ).rejects.toThrowError(
1445 "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
1449 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1450 ).rejects.toThrowError(
1452 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
1456 dynamicThreadPool.setDefaultTaskFunction('unknown')
1457 ).rejects.toThrowError(
1459 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
1462 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1464 'jsonIntegerSerialization',
1469 dynamicThreadPool.setDefaultTaskFunction('factorial')
1470 ).resolves.toBe(true)
1471 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1474 'jsonIntegerSerialization',
1478 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1479 ).resolves.toBe(true)
1480 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1483 'jsonIntegerSerialization',
1488 it('Verify that multiple task functions worker is working', async () => {
1489 const pool = new DynamicClusterPool(
1490 Math.floor(numberOfWorkers / 2),
1492 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1494 const data = { n: 10 }
1495 const result0 = await pool.execute(data)
1496 expect(result0).toStrictEqual({ ok: 1 })
1497 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1498 expect(result1).toStrictEqual({ ok: 1 })
1499 const result2 = await pool.execute(data, 'factorial')
1500 expect(result2).toBe(3628800)
1501 const result3 = await pool.execute(data, 'fibonacci')
1502 expect(result3).toBe(55)
1503 expect(pool.info.executingTasks).toBe(0)
1504 expect(pool.info.executedTasks).toBe(4)
1505 for (const workerNode of pool.workerNodes) {
1506 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1508 'jsonIntegerSerialization',
1512 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1513 for (const name of pool.listTaskFunctionNames()) {
1514 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1516 executed: expect.any(Number),
1523 history: expect.any(CircularArray)
1526 history: expect.any(CircularArray)
1530 history: expect.any(CircularArray)
1533 history: expect.any(CircularArray)
1538 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1539 ).toBeGreaterThan(0)
1542 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1544 workerNode.getTaskFunctionWorkerUsage(
1545 workerNode.info.taskFunctionNames[1]
1549 await pool.destroy()
1552 it('Verify sendKillMessageToWorker()', async () => {
1553 const pool = new DynamicClusterPool(
1554 Math.floor(numberOfWorkers / 2),
1556 './tests/worker-files/cluster/testWorker.js'
1558 const workerNodeKey = 0
1560 pool.sendKillMessageToWorker(workerNodeKey)
1561 ).resolves.toBeUndefined()
1562 await pool.destroy()
1565 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1566 const pool = new DynamicClusterPool(
1567 Math.floor(numberOfWorkers / 2),
1569 './tests/worker-files/cluster/testWorker.js'
1571 const workerNodeKey = 0
1573 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1574 taskFunctionOperation: 'add',
1575 taskFunctionName: 'empty',
1576 taskFunction: (() => {}).toString()
1578 ).resolves.toBe(true)
1580 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1581 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1582 await pool.destroy()
1585 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1586 const pool = new DynamicClusterPool(
1587 Math.floor(numberOfWorkers / 2),
1589 './tests/worker-files/cluster/testWorker.js'
1592 pool.sendTaskFunctionOperationToWorkers({
1593 taskFunctionOperation: 'add',
1594 taskFunctionName: 'empty',
1595 taskFunction: (() => {}).toString()
1597 ).resolves.toBe(true)
1598 for (const workerNode of pool.workerNodes) {
1599 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1605 await pool.destroy()