1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { expect } from 'expect'
6 import { restore, stub } from 'sinon'
14 WorkerChoiceStrategies,
16 } from '../../lib/index.js'
17 import { CircularArray } from '../../lib/circular-array.js'
18 import { Deque } from '../../lib/deque.js'
19 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20 import { waitPoolEvents } from '../test-utils.js'
21 import { WorkerNode } from '../../lib/pools/worker-node.js'
23 describe('Abstract pool test suite', () => {
24 const version = JSON.parse(
26 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
30 const numberOfWorkers = 2
31 class StubPoolWithIsMain extends FixedThreadPool {
41 it('Simulate pool creation from a non main thread/process', () => {
44 new StubPoolWithIsMain(
46 './tests/worker-files/thread/testWorker.mjs',
48 errorHandler: e => console.error(e)
53 'Cannot start a pool from a worker with the same type as the pool'
58 it('Verify that pool statuses properties are set', async () => {
59 const pool = new FixedThreadPool(
61 './tests/worker-files/thread/testWorker.mjs'
63 expect(pool.starting).toBe(false)
64 expect(pool.started).toBe(true)
68 it('Verify that filePath is checked', () => {
69 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
70 new Error("Cannot find the worker file 'undefined'")
73 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
74 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
77 it('Verify that numberOfWorkers is checked', () => {
82 './tests/worker-files/thread/testWorker.mjs'
86 'Cannot instantiate a pool without specifying the number of workers'
91 it('Verify that a negative number of workers is checked', () => {
94 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
97 'Cannot instantiate a pool with a negative number of workers'
102 it('Verify that a non integer number of workers is checked', () => {
105 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
108 'Cannot instantiate a pool with a non safe integer number of workers'
113 it('Verify that dynamic pool sizing is checked', () => {
116 new DynamicClusterPool(
119 './tests/worker-files/cluster/testWorker.js'
123 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
128 new DynamicThreadPool(
131 './tests/worker-files/thread/testWorker.mjs'
135 'Cannot instantiate a pool with a non safe integer number of workers'
140 new DynamicClusterPool(
143 './tests/worker-files/cluster/testWorker.js'
147 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
152 new DynamicThreadPool(
155 './tests/worker-files/thread/testWorker.mjs'
159 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
164 new DynamicThreadPool(
167 './tests/worker-files/thread/testWorker.mjs'
171 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
176 new DynamicClusterPool(
179 './tests/worker-files/cluster/testWorker.js'
183 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
188 it('Verify that pool options are checked', async () => {
189 let pool = new FixedThreadPool(
191 './tests/worker-files/thread/testWorker.mjs'
193 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
194 expect(pool.opts).toStrictEqual({
197 restartWorkerOnError: true,
198 enableTasksQueue: false,
199 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
200 workerChoiceStrategyOptions: {
202 runTime: { median: false },
203 waitTime: { median: false },
204 elu: { median: false }
207 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
209 runTime: { median: false },
210 waitTime: { median: false },
211 elu: { median: false }
213 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
214 .workerChoiceStrategies) {
215 expect(workerChoiceStrategy.opts).toStrictEqual({
217 runTime: { median: false },
218 waitTime: { median: false },
219 elu: { median: false }
223 const testHandler = () => console.info('test handler executed')
224 pool = new FixedThreadPool(
226 './tests/worker-files/thread/testWorker.mjs',
228 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
229 workerChoiceStrategyOptions: {
230 runTime: { median: true },
231 weights: { 0: 300, 1: 200 }
234 restartWorkerOnError: false,
235 enableTasksQueue: true,
236 tasksQueueOptions: { concurrency: 2 },
237 messageHandler: testHandler,
238 errorHandler: testHandler,
239 onlineHandler: testHandler,
240 exitHandler: testHandler
243 expect(pool.emitter).toBeUndefined()
244 expect(pool.opts).toStrictEqual({
247 restartWorkerOnError: false,
248 enableTasksQueue: true,
251 size: Math.pow(numberOfWorkers, 2),
253 tasksStealingOnBackPressure: true
255 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
256 workerChoiceStrategyOptions: {
258 runTime: { median: true },
259 waitTime: { median: false },
260 elu: { median: false },
261 weights: { 0: 300, 1: 200 }
263 onlineHandler: testHandler,
264 messageHandler: testHandler,
265 errorHandler: testHandler,
266 exitHandler: testHandler
268 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
270 runTime: { median: true },
271 waitTime: { median: false },
272 elu: { median: false },
273 weights: { 0: 300, 1: 200 }
275 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
276 .workerChoiceStrategies) {
277 expect(workerChoiceStrategy.opts).toStrictEqual({
279 runTime: { median: true },
280 waitTime: { median: false },
281 elu: { median: false },
282 weights: { 0: 300, 1: 200 }
288 it('Verify that pool options are validated', () => {
293 './tests/worker-files/thread/testWorker.mjs',
295 workerChoiceStrategy: 'invalidStrategy'
298 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
303 './tests/worker-files/thread/testWorker.mjs',
305 workerChoiceStrategyOptions: {
306 retries: 'invalidChoiceRetries'
312 'Invalid worker choice strategy options: retries must be an integer'
319 './tests/worker-files/thread/testWorker.mjs',
321 workerChoiceStrategyOptions: {
328 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
335 './tests/worker-files/thread/testWorker.mjs',
337 workerChoiceStrategyOptions: { weights: {} }
342 'Invalid worker choice strategy options: must have a weight for each worker node'
349 './tests/worker-files/thread/testWorker.mjs',
351 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
356 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
363 './tests/worker-files/thread/testWorker.mjs',
365 enableTasksQueue: true,
366 tasksQueueOptions: 'invalidTasksQueueOptions'
370 new TypeError('Invalid tasks queue options: must be a plain object')
376 './tests/worker-files/thread/testWorker.mjs',
378 enableTasksQueue: true,
379 tasksQueueOptions: { concurrency: 0 }
384 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
391 './tests/worker-files/thread/testWorker.mjs',
393 enableTasksQueue: true,
394 tasksQueueOptions: { concurrency: -1 }
399 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
406 './tests/worker-files/thread/testWorker.mjs',
408 enableTasksQueue: true,
409 tasksQueueOptions: { concurrency: 0.2 }
413 new TypeError('Invalid worker node tasks concurrency: must be an integer')
419 './tests/worker-files/thread/testWorker.mjs',
421 enableTasksQueue: true,
422 tasksQueueOptions: { size: 0 }
427 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
434 './tests/worker-files/thread/testWorker.mjs',
436 enableTasksQueue: true,
437 tasksQueueOptions: { size: -1 }
442 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
449 './tests/worker-files/thread/testWorker.mjs',
451 enableTasksQueue: true,
452 tasksQueueOptions: { size: 0.2 }
456 new TypeError('Invalid worker node tasks queue size: must be an integer')
460 it('Verify that pool worker choice strategy options can be set', async () => {
461 const pool = new FixedThreadPool(
463 './tests/worker-files/thread/testWorker.mjs',
464 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
466 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
468 runTime: { median: false },
469 waitTime: { median: false },
470 elu: { median: false }
472 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
474 runTime: { median: false },
475 waitTime: { median: false },
476 elu: { median: false }
478 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
479 .workerChoiceStrategies) {
480 expect(workerChoiceStrategy.opts).toStrictEqual({
482 runTime: { median: false },
483 waitTime: { median: false },
484 elu: { median: false }
488 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
506 pool.setWorkerChoiceStrategyOptions({
507 runTime: { median: true },
508 elu: { median: true }
510 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
512 runTime: { median: true },
513 waitTime: { median: false },
514 elu: { median: true }
516 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
518 runTime: { median: true },
519 waitTime: { median: false },
520 elu: { median: true }
522 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
523 .workerChoiceStrategies) {
524 expect(workerChoiceStrategy.opts).toStrictEqual({
526 runTime: { median: true },
527 waitTime: { median: false },
528 elu: { median: true }
532 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
550 pool.setWorkerChoiceStrategyOptions({
551 runTime: { median: false },
552 elu: { median: false }
554 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
556 runTime: { median: false },
557 waitTime: { median: false },
558 elu: { median: false }
560 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
562 runTime: { median: false },
563 waitTime: { median: false },
564 elu: { median: false }
566 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
567 .workerChoiceStrategies) {
568 expect(workerChoiceStrategy.opts).toStrictEqual({
570 runTime: { median: false },
571 waitTime: { median: false },
572 elu: { median: false }
576 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
595 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
598 'Invalid worker choice strategy options: must be a plain object'
602 pool.setWorkerChoiceStrategyOptions({
603 retries: 'invalidChoiceRetries'
607 'Invalid worker choice strategy options: retries must be an integer'
610 expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
612 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
615 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
617 'Invalid worker choice strategy options: must have a weight for each worker node'
621 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
624 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
630 it('Verify that pool tasks queue can be enabled/disabled', async () => {
631 const pool = new FixedThreadPool(
633 './tests/worker-files/thread/testWorker.mjs'
635 expect(pool.opts.enableTasksQueue).toBe(false)
636 expect(pool.opts.tasksQueueOptions).toBeUndefined()
637 pool.enableTasksQueue(true)
638 expect(pool.opts.enableTasksQueue).toBe(true)
639 expect(pool.opts.tasksQueueOptions).toStrictEqual({
641 size: Math.pow(numberOfWorkers, 2),
643 tasksStealingOnBackPressure: true
645 pool.enableTasksQueue(true, { concurrency: 2 })
646 expect(pool.opts.enableTasksQueue).toBe(true)
647 expect(pool.opts.tasksQueueOptions).toStrictEqual({
649 size: Math.pow(numberOfWorkers, 2),
651 tasksStealingOnBackPressure: true
653 pool.enableTasksQueue(false)
654 expect(pool.opts.enableTasksQueue).toBe(false)
655 expect(pool.opts.tasksQueueOptions).toBeUndefined()
659 it('Verify that pool tasks queue options can be set', async () => {
660 const pool = new FixedThreadPool(
662 './tests/worker-files/thread/testWorker.mjs',
663 { enableTasksQueue: true }
665 expect(pool.opts.tasksQueueOptions).toStrictEqual({
667 size: Math.pow(numberOfWorkers, 2),
669 tasksStealingOnBackPressure: true
671 for (const workerNode of pool.workerNodes) {
672 expect(workerNode.tasksQueueBackPressureSize).toBe(
673 pool.opts.tasksQueueOptions.size
676 pool.setTasksQueueOptions({
680 tasksStealingOnBackPressure: false
682 expect(pool.opts.tasksQueueOptions).toStrictEqual({
686 tasksStealingOnBackPressure: false
688 for (const workerNode of pool.workerNodes) {
689 expect(workerNode.tasksQueueBackPressureSize).toBe(
690 pool.opts.tasksQueueOptions.size
693 pool.setTasksQueueOptions({
696 tasksStealingOnBackPressure: true
698 expect(pool.opts.tasksQueueOptions).toStrictEqual({
700 size: Math.pow(numberOfWorkers, 2),
702 tasksStealingOnBackPressure: true
704 for (const workerNode of pool.workerNodes) {
705 expect(workerNode.tasksQueueBackPressureSize).toBe(
706 pool.opts.tasksQueueOptions.size
709 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
710 new TypeError('Invalid tasks queue options: must be a plain object')
712 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
714 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
717 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
719 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
722 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
723 new TypeError('Invalid worker node tasks concurrency: must be an integer')
725 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
727 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
730 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
732 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
735 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
736 new TypeError('Invalid worker node tasks queue size: must be an integer')
741 it('Verify that pool info is set', async () => {
742 let pool = new FixedThreadPool(
744 './tests/worker-files/thread/testWorker.mjs'
746 expect(pool.info).toStrictEqual({
748 type: PoolTypes.fixed,
749 worker: WorkerTypes.thread,
752 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
753 minSize: numberOfWorkers,
754 maxSize: numberOfWorkers,
755 workerNodes: numberOfWorkers,
756 idleWorkerNodes: numberOfWorkers,
763 pool = new DynamicClusterPool(
764 Math.floor(numberOfWorkers / 2),
766 './tests/worker-files/cluster/testWorker.js'
768 expect(pool.info).toStrictEqual({
770 type: PoolTypes.dynamic,
771 worker: WorkerTypes.cluster,
774 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
775 minSize: Math.floor(numberOfWorkers / 2),
776 maxSize: numberOfWorkers,
777 workerNodes: Math.floor(numberOfWorkers / 2),
778 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
787 it('Verify that pool worker tasks usage are initialized', async () => {
788 const pool = new FixedClusterPool(
790 './tests/worker-files/cluster/testWorker.js'
792 for (const workerNode of pool.workerNodes) {
793 expect(workerNode).toBeInstanceOf(WorkerNode)
794 expect(workerNode.usage).toStrictEqual({
804 history: new CircularArray()
807 history: new CircularArray()
811 history: new CircularArray()
814 history: new CircularArray()
822 it('Verify that pool worker tasks queue are initialized', async () => {
823 let pool = new FixedClusterPool(
825 './tests/worker-files/cluster/testWorker.js'
827 for (const workerNode of pool.workerNodes) {
828 expect(workerNode).toBeInstanceOf(WorkerNode)
829 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
830 expect(workerNode.tasksQueue.size).toBe(0)
831 expect(workerNode.tasksQueue.maxSize).toBe(0)
834 pool = new DynamicThreadPool(
835 Math.floor(numberOfWorkers / 2),
837 './tests/worker-files/thread/testWorker.mjs'
839 for (const workerNode of pool.workerNodes) {
840 expect(workerNode).toBeInstanceOf(WorkerNode)
841 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
842 expect(workerNode.tasksQueue.size).toBe(0)
843 expect(workerNode.tasksQueue.maxSize).toBe(0)
848 it('Verify that pool worker info are initialized', async () => {
849 let pool = new FixedClusterPool(
851 './tests/worker-files/cluster/testWorker.js'
853 for (const workerNode of pool.workerNodes) {
854 expect(workerNode).toBeInstanceOf(WorkerNode)
855 expect(workerNode.info).toStrictEqual({
856 id: expect.any(Number),
857 type: WorkerTypes.cluster,
863 pool = new DynamicThreadPool(
864 Math.floor(numberOfWorkers / 2),
866 './tests/worker-files/thread/testWorker.mjs'
868 for (const workerNode of pool.workerNodes) {
869 expect(workerNode).toBeInstanceOf(WorkerNode)
870 expect(workerNode.info).toStrictEqual({
871 id: expect.any(Number),
872 type: WorkerTypes.thread,
880 it('Verify that pool can be started after initialization', async () => {
881 const pool = new FixedClusterPool(
883 './tests/worker-files/cluster/testWorker.js',
888 expect(pool.info.started).toBe(false)
889 expect(pool.info.ready).toBe(false)
890 expect(pool.workerNodes).toStrictEqual([])
891 await expect(pool.execute()).rejects.toThrow(
892 new Error('Cannot execute a task on not started pool')
895 expect(pool.info.started).toBe(true)
896 expect(pool.info.ready).toBe(true)
897 expect(pool.workerNodes.length).toBe(numberOfWorkers)
898 for (const workerNode of pool.workerNodes) {
899 expect(workerNode).toBeInstanceOf(WorkerNode)
904 it('Verify that pool execute() arguments are checked', async () => {
905 const pool = new FixedClusterPool(
907 './tests/worker-files/cluster/testWorker.js'
909 await expect(pool.execute(undefined, 0)).rejects.toThrow(
910 new TypeError('name argument must be a string')
912 await expect(pool.execute(undefined, '')).rejects.toThrow(
913 new TypeError('name argument must not be an empty string')
915 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
916 new TypeError('transferList argument must be an array')
918 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
919 "Task function 'unknown' not found"
922 await expect(pool.execute()).rejects.toThrow(
923 new Error('Cannot execute a task on not started pool')
927 it('Verify that pool worker tasks usage are computed', async () => {
928 const pool = new FixedClusterPool(
930 './tests/worker-files/cluster/testWorker.js'
932 const promises = new Set()
933 const maxMultiplier = 2
934 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
935 promises.add(pool.execute())
937 for (const workerNode of pool.workerNodes) {
938 expect(workerNode.usage).toStrictEqual({
941 executing: maxMultiplier,
948 history: expect.any(CircularArray)
951 history: expect.any(CircularArray)
955 history: expect.any(CircularArray)
958 history: expect.any(CircularArray)
963 await Promise.all(promises)
964 for (const workerNode of pool.workerNodes) {
965 expect(workerNode.usage).toStrictEqual({
967 executed: maxMultiplier,
975 history: expect.any(CircularArray)
978 history: expect.any(CircularArray)
982 history: expect.any(CircularArray)
985 history: expect.any(CircularArray)
993 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
994 const pool = new DynamicThreadPool(
995 Math.floor(numberOfWorkers / 2),
997 './tests/worker-files/thread/testWorker.mjs'
999 const promises = new Set()
1000 const maxMultiplier = 2
1001 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1002 promises.add(pool.execute())
1004 await Promise.all(promises)
1005 for (const workerNode of pool.workerNodes) {
1006 expect(workerNode.usage).toStrictEqual({
1008 executed: expect.any(Number),
1016 history: expect.any(CircularArray)
1019 history: expect.any(CircularArray)
1023 history: expect.any(CircularArray)
1026 history: expect.any(CircularArray)
1030 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1031 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1032 numberOfWorkers * maxMultiplier
1034 expect(workerNode.usage.runTime.history.length).toBe(0)
1035 expect(workerNode.usage.waitTime.history.length).toBe(0)
1036 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1037 expect(workerNode.usage.elu.active.history.length).toBe(0)
1039 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1040 for (const workerNode of pool.workerNodes) {
1041 expect(workerNode.usage).toStrictEqual({
1051 history: expect.any(CircularArray)
1054 history: expect.any(CircularArray)
1058 history: expect.any(CircularArray)
1061 history: expect.any(CircularArray)
1065 expect(workerNode.usage.runTime.history.length).toBe(0)
1066 expect(workerNode.usage.waitTime.history.length).toBe(0)
1067 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1068 expect(workerNode.usage.elu.active.history.length).toBe(0)
1070 await pool.destroy()
1073 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1074 const pool = new DynamicClusterPool(
1075 Math.floor(numberOfWorkers / 2),
1077 './tests/worker-files/cluster/testWorker.js'
1079 expect(pool.emitter.eventNames()).toStrictEqual([])
1082 pool.emitter.on(PoolEvents.ready, info => {
1086 await waitPoolEvents(pool, PoolEvents.ready, 1)
1087 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1088 expect(poolReady).toBe(1)
1089 expect(poolInfo).toStrictEqual({
1091 type: PoolTypes.dynamic,
1092 worker: WorkerTypes.cluster,
1095 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1096 minSize: expect.any(Number),
1097 maxSize: expect.any(Number),
1098 workerNodes: expect.any(Number),
1099 idleWorkerNodes: expect.any(Number),
1100 busyWorkerNodes: expect.any(Number),
1101 executedTasks: expect.any(Number),
1102 executingTasks: expect.any(Number),
1103 failedTasks: expect.any(Number)
1105 await pool.destroy()
1108 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1109 const pool = new FixedThreadPool(
1111 './tests/worker-files/thread/testWorker.mjs'
1113 expect(pool.emitter.eventNames()).toStrictEqual([])
1114 const promises = new Set()
1117 pool.emitter.on(PoolEvents.busy, info => {
1121 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1122 for (let i = 0; i < numberOfWorkers * 2; i++) {
1123 promises.add(pool.execute())
1125 await Promise.all(promises)
1126 // The `busy` event is triggered each time the number of tasks submitted at once reaches the number of fixed pool workers.
1127 // So it is emitted numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1128 expect(poolBusy).toBe(numberOfWorkers + 1)
1129 expect(poolInfo).toStrictEqual({
1131 type: PoolTypes.fixed,
1132 worker: WorkerTypes.thread,
1135 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1136 minSize: expect.any(Number),
1137 maxSize: expect.any(Number),
1138 workerNodes: expect.any(Number),
1139 idleWorkerNodes: expect.any(Number),
1140 busyWorkerNodes: expect.any(Number),
1141 executedTasks: expect.any(Number),
1142 executingTasks: expect.any(Number),
1143 failedTasks: expect.any(Number)
1145 await pool.destroy()
1148 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1149 const pool = new DynamicThreadPool(
1150 Math.floor(numberOfWorkers / 2),
1152 './tests/worker-files/thread/testWorker.mjs'
1154 expect(pool.emitter.eventNames()).toStrictEqual([])
1155 const promises = new Set()
1158 pool.emitter.on(PoolEvents.full, info => {
1162 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1163 for (let i = 0; i < numberOfWorkers * 2; i++) {
1164 promises.add(pool.execute())
1166 await Promise.all(promises)
1167 expect(poolFull).toBe(1)
1168 expect(poolInfo).toStrictEqual({
1170 type: PoolTypes.dynamic,
1171 worker: WorkerTypes.thread,
1174 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1175 minSize: expect.any(Number),
1176 maxSize: expect.any(Number),
1177 workerNodes: expect.any(Number),
1178 idleWorkerNodes: expect.any(Number),
1179 busyWorkerNodes: expect.any(Number),
1180 executedTasks: expect.any(Number),
1181 executingTasks: expect.any(Number),
1182 failedTasks: expect.any(Number)
1184 await pool.destroy()
1187 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1188 const pool = new FixedThreadPool(
1190 './tests/worker-files/thread/testWorker.mjs',
1192 enableTasksQueue: true
1195 stub(pool, 'hasBackPressure').returns(true)
1196 expect(pool.emitter.eventNames()).toStrictEqual([])
1197 const promises = new Set()
1198 let poolBackPressure = 0
1200 pool.emitter.on(PoolEvents.backPressure, info => {
1204 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1205 for (let i = 0; i < numberOfWorkers + 1; i++) {
1206 promises.add(pool.execute())
1208 await Promise.all(promises)
1209 expect(poolBackPressure).toBe(1)
1210 expect(poolInfo).toStrictEqual({
1212 type: PoolTypes.fixed,
1213 worker: WorkerTypes.thread,
1216 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1217 minSize: expect.any(Number),
1218 maxSize: expect.any(Number),
1219 workerNodes: expect.any(Number),
1220 idleWorkerNodes: expect.any(Number),
1221 busyWorkerNodes: expect.any(Number),
1222 executedTasks: expect.any(Number),
1223 executingTasks: expect.any(Number),
1224 maxQueuedTasks: expect.any(Number),
1225 queuedTasks: expect.any(Number),
1227 stolenTasks: expect.any(Number),
1228 failedTasks: expect.any(Number)
1230 expect(pool.hasBackPressure.called).toBe(true)
1231 await pool.destroy()
1234 it('Verify that hasTaskFunction() is working', async () => {
1235 const dynamicThreadPool = new DynamicThreadPool(
1236 Math.floor(numberOfWorkers / 2),
1238 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1240 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1241 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1242 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1245 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1246 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1247 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1248 await dynamicThreadPool.destroy()
1249 const fixedClusterPool = new FixedClusterPool(
1251 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1253 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1254 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1255 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1258 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1259 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1260 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1261 await fixedClusterPool.destroy()
1264 it('Verify that addTaskFunction() is working', async () => {
1265 const dynamicThreadPool = new DynamicThreadPool(
1266 Math.floor(numberOfWorkers / 2),
1268 './tests/worker-files/thread/testWorker.mjs'
1270 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1272 dynamicThreadPool.addTaskFunction(0, () => {})
1273 ).rejects.toThrow(new TypeError('name argument must be a string'))
1275 dynamicThreadPool.addTaskFunction('', () => {})
1277 new TypeError('name argument must not be an empty string')
1279 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1280 new TypeError('fn argument must be a function')
1282 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1283 new TypeError('fn argument must be a function')
1285 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1289 const echoTaskFunction = data => {
1293 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1294 ).resolves.toBe(true)
1295 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1296 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1299 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1304 const taskFunctionData = { test: 'test' }
1305 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1306 expect(echoResult).toStrictEqual(taskFunctionData)
1307 for (const workerNode of dynamicThreadPool.workerNodes) {
1308 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1310 executed: expect.any(Number),
1317 history: new CircularArray()
1320 history: new CircularArray()
1324 history: new CircularArray()
1327 history: new CircularArray()
1332 await dynamicThreadPool.destroy()
1335 it('Verify that removeTaskFunction() is working', async () => {
1336 const dynamicThreadPool = new DynamicThreadPool(
1337 Math.floor(numberOfWorkers / 2),
1339 './tests/worker-files/thread/testWorker.mjs'
1341 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1342 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1346 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1347 new Error('Cannot remove a task function not handled on the pool side')
1349 const echoTaskFunction = data => {
1352 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1353 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1354 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1357 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1362 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1365 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1366 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1367 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1371 await dynamicThreadPool.destroy()
1374 it('Verify that listTaskFunctionNames() is working', async () => {
1375 const dynamicThreadPool = new DynamicThreadPool(
1376 Math.floor(numberOfWorkers / 2),
1378 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1380 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1381 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1383 'jsonIntegerSerialization',
1387 await dynamicThreadPool.destroy()
1388 const fixedClusterPool = new FixedClusterPool(
1390 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1392 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1393 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1395 'jsonIntegerSerialization',
1399 await fixedClusterPool.destroy()
1402 it('Verify that setDefaultTaskFunction() is working', async () => {
1403 const dynamicThreadPool = new DynamicThreadPool(
1404 Math.floor(numberOfWorkers / 2),
1406 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1408 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1409 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1411 "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
1415 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1418 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
1422 dynamicThreadPool.setDefaultTaskFunction('unknown')
1425 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
1428 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1430 'jsonIntegerSerialization',
1435 dynamicThreadPool.setDefaultTaskFunction('factorial')
1436 ).resolves.toBe(true)
1437 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1440 'jsonIntegerSerialization',
1444 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1445 ).resolves.toBe(true)
1446 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1449 'jsonIntegerSerialization',
1452 await dynamicThreadPool.destroy()
1455 it('Verify that multiple task functions worker is working', async () => {
1456 const pool = new DynamicClusterPool(
1457 Math.floor(numberOfWorkers / 2),
1459 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1461 const data = { n: 10 }
1462 const result0 = await pool.execute(data)
1463 expect(result0).toStrictEqual({ ok: 1 })
1464 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1465 expect(result1).toStrictEqual({ ok: 1 })
1466 const result2 = await pool.execute(data, 'factorial')
1467 expect(result2).toBe(3628800)
1468 const result3 = await pool.execute(data, 'fibonacci')
1469 expect(result3).toBe(55)
1470 expect(pool.info.executingTasks).toBe(0)
1471 expect(pool.info.executedTasks).toBe(4)
1472 for (const workerNode of pool.workerNodes) {
1473 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1475 'jsonIntegerSerialization',
1479 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1480 for (const name of pool.listTaskFunctionNames()) {
1481 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1483 executed: expect.any(Number),
1490 history: expect.any(CircularArray)
1493 history: expect.any(CircularArray)
1497 history: expect.any(CircularArray)
1500 history: expect.any(CircularArray)
1505 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1506 ).toBeGreaterThan(0)
1509 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1511 workerNode.getTaskFunctionWorkerUsage(
1512 workerNode.info.taskFunctionNames[1]
1516 await pool.destroy()
1519 it('Verify sendKillMessageToWorker()', async () => {
1520 const pool = new DynamicClusterPool(
1521 Math.floor(numberOfWorkers / 2),
1523 './tests/worker-files/cluster/testWorker.js'
1525 const workerNodeKey = 0
1527 pool.sendKillMessageToWorker(workerNodeKey)
1528 ).resolves.toBeUndefined()
1529 await pool.destroy()
1532 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1533 const pool = new DynamicClusterPool(
1534 Math.floor(numberOfWorkers / 2),
1536 './tests/worker-files/cluster/testWorker.js'
1538 const workerNodeKey = 0
1540 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1541 taskFunctionOperation: 'add',
1542 taskFunctionName: 'empty',
1543 taskFunction: (() => {}).toString()
1545 ).resolves.toBe(true)
1547 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1548 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1549 await pool.destroy()
1552 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1553 const pool = new DynamicClusterPool(
1554 Math.floor(numberOfWorkers / 2),
1556 './tests/worker-files/cluster/testWorker.js'
1559 pool.sendTaskFunctionOperationToWorkers({
1560 taskFunctionOperation: 'add',
1561 taskFunctionName: 'empty',
1562 taskFunction: (() => {}).toString()
1564 ).resolves.toBe(true)
1565 for (const workerNode of pool.workerNodes) {
1566 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1572 await pool.destroy()