1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { expect } from 'expect'
6 import { restore, stub } from 'sinon'
14 WorkerChoiceStrategies,
16 } from '../../lib/index.js'
17 import { CircularArray } from '../../lib/circular-array.js'
18 import { Deque } from '../../lib/deque.js'
19 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20 import { waitPoolEvents } from '../test-utils.js'
21 import { WorkerNode } from '../../lib/pools/worker-node.js'
23 describe('Abstract pool test suite', () => {
24 const version = JSON.parse(
26 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
30 const numberOfWorkers = 2
31 class StubPoolWithIsMain extends FixedThreadPool {
41 it('Verify that pool can be created and destroyed', async () => {
42 const pool = new FixedThreadPool(
44 './tests/worker-files/thread/testWorker.mjs'
46 expect(pool).toBeInstanceOf(FixedThreadPool)
50 it('Verify that pool cannot be created from a non main thread/process', () => {
53 new StubPoolWithIsMain(
55 './tests/worker-files/thread/testWorker.mjs',
57 errorHandler: e => console.error(e)
62 'Cannot start a pool from a worker with the same type as the pool'
67 it('Verify that pool statuses properties are set', async () => {
68 const pool = new FixedThreadPool(
70 './tests/worker-files/thread/testWorker.mjs'
72 expect(pool.started).toBe(true)
73 expect(pool.starting).toBe(false)
74 expect(pool.destroying).toBe(false)
78 it('Verify that filePath is checked', () => {
79 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
80 new Error("Cannot find the worker file 'undefined'")
83 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
84 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
87 it('Verify that numberOfWorkers is checked', () => {
92 './tests/worker-files/thread/testWorker.mjs'
96 'Cannot instantiate a pool without specifying the number of workers'
101 it('Verify that a negative number of workers is checked', () => {
104 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
107 'Cannot instantiate a pool with a negative number of workers'
112 it('Verify that a non integer number of workers is checked', () => {
115 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
118 'Cannot instantiate a pool with a non safe integer number of workers'
123 it('Verify that dynamic pool sizing is checked', () => {
126 new DynamicClusterPool(
129 './tests/worker-files/cluster/testWorker.js'
133 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
138 new DynamicThreadPool(
141 './tests/worker-files/thread/testWorker.mjs'
145 'Cannot instantiate a pool with a non safe integer number of workers'
150 new DynamicClusterPool(
153 './tests/worker-files/cluster/testWorker.js'
157 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
162 new DynamicThreadPool(
165 './tests/worker-files/thread/testWorker.mjs'
169 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
174 new DynamicThreadPool(
177 './tests/worker-files/thread/testWorker.mjs'
181 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
186 new DynamicClusterPool(
189 './tests/worker-files/cluster/testWorker.js'
193 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
198 it('Verify that pool options are checked', async () => {
199 let pool = new FixedThreadPool(
201 './tests/worker-files/thread/testWorker.mjs'
203 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
204 expect(pool.opts).toStrictEqual({
207 restartWorkerOnError: true,
208 enableTasksQueue: false,
209 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
210 workerChoiceStrategyOptions: {
212 runTime: { median: false },
213 waitTime: { median: false },
214 elu: { median: false }
217 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
219 runTime: { median: false },
220 waitTime: { median: false },
221 elu: { median: false }
223 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
224 .workerChoiceStrategies) {
225 expect(workerChoiceStrategy.opts).toStrictEqual({
227 runTime: { median: false },
228 waitTime: { median: false },
229 elu: { median: false }
233 const testHandler = () => console.info('test handler executed')
234 pool = new FixedThreadPool(
236 './tests/worker-files/thread/testWorker.mjs',
238 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
239 workerChoiceStrategyOptions: {
240 runTime: { median: true },
241 weights: { 0: 300, 1: 200 }
244 restartWorkerOnError: false,
245 enableTasksQueue: true,
246 tasksQueueOptions: { concurrency: 2 },
247 messageHandler: testHandler,
248 errorHandler: testHandler,
249 onlineHandler: testHandler,
250 exitHandler: testHandler
253 expect(pool.emitter).toBeUndefined()
254 expect(pool.opts).toStrictEqual({
257 restartWorkerOnError: false,
258 enableTasksQueue: true,
261 size: Math.pow(numberOfWorkers, 2),
263 tasksStealingOnBackPressure: true
265 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
266 workerChoiceStrategyOptions: {
268 runTime: { median: true },
269 waitTime: { median: false },
270 elu: { median: false },
271 weights: { 0: 300, 1: 200 }
273 onlineHandler: testHandler,
274 messageHandler: testHandler,
275 errorHandler: testHandler,
276 exitHandler: testHandler
278 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
280 runTime: { median: true },
281 waitTime: { median: false },
282 elu: { median: false },
283 weights: { 0: 300, 1: 200 }
285 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
286 .workerChoiceStrategies) {
287 expect(workerChoiceStrategy.opts).toStrictEqual({
289 runTime: { median: true },
290 waitTime: { median: false },
291 elu: { median: false },
292 weights: { 0: 300, 1: 200 }
298 it('Verify that pool options are validated', () => {
303 './tests/worker-files/thread/testWorker.mjs',
305 workerChoiceStrategy: 'invalidStrategy'
308 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
313 './tests/worker-files/thread/testWorker.mjs',
315 workerChoiceStrategyOptions: {
316 retries: 'invalidChoiceRetries'
322 'Invalid worker choice strategy options: retries must be an integer'
329 './tests/worker-files/thread/testWorker.mjs',
331 workerChoiceStrategyOptions: {
338 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
345 './tests/worker-files/thread/testWorker.mjs',
347 workerChoiceStrategyOptions: { weights: {} }
352 'Invalid worker choice strategy options: must have a weight for each worker node'
359 './tests/worker-files/thread/testWorker.mjs',
361 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
366 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
373 './tests/worker-files/thread/testWorker.mjs',
375 enableTasksQueue: true,
376 tasksQueueOptions: 'invalidTasksQueueOptions'
380 new TypeError('Invalid tasks queue options: must be a plain object')
386 './tests/worker-files/thread/testWorker.mjs',
388 enableTasksQueue: true,
389 tasksQueueOptions: { concurrency: 0 }
394 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
401 './tests/worker-files/thread/testWorker.mjs',
403 enableTasksQueue: true,
404 tasksQueueOptions: { concurrency: -1 }
409 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
416 './tests/worker-files/thread/testWorker.mjs',
418 enableTasksQueue: true,
419 tasksQueueOptions: { concurrency: 0.2 }
423 new TypeError('Invalid worker node tasks concurrency: must be an integer')
429 './tests/worker-files/thread/testWorker.mjs',
431 enableTasksQueue: true,
432 tasksQueueOptions: { size: 0 }
437 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
444 './tests/worker-files/thread/testWorker.mjs',
446 enableTasksQueue: true,
447 tasksQueueOptions: { size: -1 }
452 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
459 './tests/worker-files/thread/testWorker.mjs',
461 enableTasksQueue: true,
462 tasksQueueOptions: { size: 0.2 }
466 new TypeError('Invalid worker node tasks queue size: must be an integer')
470 it('Verify that pool worker choice strategy options can be set', async () => {
471 const pool = new FixedThreadPool(
473 './tests/worker-files/thread/testWorker.mjs',
474 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
476 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
478 runTime: { median: false },
479 waitTime: { median: false },
480 elu: { median: false }
482 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
484 runTime: { median: false },
485 waitTime: { median: false },
486 elu: { median: false }
488 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
489 .workerChoiceStrategies) {
490 expect(workerChoiceStrategy.opts).toStrictEqual({
492 runTime: { median: false },
493 waitTime: { median: false },
494 elu: { median: false }
498 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
516 pool.setWorkerChoiceStrategyOptions({
517 runTime: { median: true },
518 elu: { median: true }
520 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
522 runTime: { median: true },
523 waitTime: { median: false },
524 elu: { median: true }
526 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
528 runTime: { median: true },
529 waitTime: { median: false },
530 elu: { median: true }
532 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
533 .workerChoiceStrategies) {
534 expect(workerChoiceStrategy.opts).toStrictEqual({
536 runTime: { median: true },
537 waitTime: { median: false },
538 elu: { median: true }
542 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
560 pool.setWorkerChoiceStrategyOptions({
561 runTime: { median: false },
562 elu: { median: false }
564 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
566 runTime: { median: false },
567 waitTime: { median: false },
568 elu: { median: false }
570 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
572 runTime: { median: false },
573 waitTime: { median: false },
574 elu: { median: false }
576 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
577 .workerChoiceStrategies) {
578 expect(workerChoiceStrategy.opts).toStrictEqual({
580 runTime: { median: false },
581 waitTime: { median: false },
582 elu: { median: false }
586 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
605 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
608 'Invalid worker choice strategy options: must be a plain object'
612 pool.setWorkerChoiceStrategyOptions({
613 retries: 'invalidChoiceRetries'
617 'Invalid worker choice strategy options: retries must be an integer'
620 expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
622 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
625 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
627 'Invalid worker choice strategy options: must have a weight for each worker node'
631 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
634 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
640 it('Verify that pool tasks queue can be enabled/disabled', async () => {
641 const pool = new FixedThreadPool(
643 './tests/worker-files/thread/testWorker.mjs'
645 expect(pool.opts.enableTasksQueue).toBe(false)
646 expect(pool.opts.tasksQueueOptions).toBeUndefined()
647 pool.enableTasksQueue(true)
648 expect(pool.opts.enableTasksQueue).toBe(true)
649 expect(pool.opts.tasksQueueOptions).toStrictEqual({
651 size: Math.pow(numberOfWorkers, 2),
653 tasksStealingOnBackPressure: true
655 pool.enableTasksQueue(true, { concurrency: 2 })
656 expect(pool.opts.enableTasksQueue).toBe(true)
657 expect(pool.opts.tasksQueueOptions).toStrictEqual({
659 size: Math.pow(numberOfWorkers, 2),
661 tasksStealingOnBackPressure: true
663 pool.enableTasksQueue(false)
664 expect(pool.opts.enableTasksQueue).toBe(false)
665 expect(pool.opts.tasksQueueOptions).toBeUndefined()
669 it('Verify that pool tasks queue options can be set', async () => {
670 const pool = new FixedThreadPool(
672 './tests/worker-files/thread/testWorker.mjs',
673 { enableTasksQueue: true }
675 expect(pool.opts.tasksQueueOptions).toStrictEqual({
677 size: Math.pow(numberOfWorkers, 2),
679 tasksStealingOnBackPressure: true
681 for (const workerNode of pool.workerNodes) {
682 expect(workerNode.tasksQueueBackPressureSize).toBe(
683 pool.opts.tasksQueueOptions.size
686 pool.setTasksQueueOptions({
690 tasksStealingOnBackPressure: false
692 expect(pool.opts.tasksQueueOptions).toStrictEqual({
696 tasksStealingOnBackPressure: false
698 for (const workerNode of pool.workerNodes) {
699 expect(workerNode.tasksQueueBackPressureSize).toBe(
700 pool.opts.tasksQueueOptions.size
703 pool.setTasksQueueOptions({
706 tasksStealingOnBackPressure: true
708 expect(pool.opts.tasksQueueOptions).toStrictEqual({
710 size: Math.pow(numberOfWorkers, 2),
712 tasksStealingOnBackPressure: true
714 for (const workerNode of pool.workerNodes) {
715 expect(workerNode.tasksQueueBackPressureSize).toBe(
716 pool.opts.tasksQueueOptions.size
719 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
720 new TypeError('Invalid tasks queue options: must be a plain object')
722 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
724 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
727 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
729 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
732 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
733 new TypeError('Invalid worker node tasks concurrency: must be an integer')
735 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
737 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
740 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
742 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
745 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
746 new TypeError('Invalid worker node tasks queue size: must be an integer')
751 it('Verify that pool info is set', async () => {
752 let pool = new FixedThreadPool(
754 './tests/worker-files/thread/testWorker.mjs'
756 expect(pool.info).toStrictEqual({
758 type: PoolTypes.fixed,
759 worker: WorkerTypes.thread,
762 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
763 minSize: numberOfWorkers,
764 maxSize: numberOfWorkers,
765 workerNodes: numberOfWorkers,
766 idleWorkerNodes: numberOfWorkers,
773 pool = new DynamicClusterPool(
774 Math.floor(numberOfWorkers / 2),
776 './tests/worker-files/cluster/testWorker.js'
778 expect(pool.info).toStrictEqual({
780 type: PoolTypes.dynamic,
781 worker: WorkerTypes.cluster,
784 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
785 minSize: Math.floor(numberOfWorkers / 2),
786 maxSize: numberOfWorkers,
787 workerNodes: Math.floor(numberOfWorkers / 2),
788 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
797 it('Verify that pool worker tasks usage are initialized', async () => {
798 const pool = new FixedClusterPool(
800 './tests/worker-files/cluster/testWorker.js'
802 for (const workerNode of pool.workerNodes) {
803 expect(workerNode).toBeInstanceOf(WorkerNode)
804 expect(workerNode.usage).toStrictEqual({
814 history: new CircularArray()
817 history: new CircularArray()
821 history: new CircularArray()
824 history: new CircularArray()
832 it('Verify that pool worker tasks queue are initialized', async () => {
833 let pool = new FixedClusterPool(
835 './tests/worker-files/cluster/testWorker.js'
837 for (const workerNode of pool.workerNodes) {
838 expect(workerNode).toBeInstanceOf(WorkerNode)
839 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
840 expect(workerNode.tasksQueue.size).toBe(0)
841 expect(workerNode.tasksQueue.maxSize).toBe(0)
844 pool = new DynamicThreadPool(
845 Math.floor(numberOfWorkers / 2),
847 './tests/worker-files/thread/testWorker.mjs'
849 for (const workerNode of pool.workerNodes) {
850 expect(workerNode).toBeInstanceOf(WorkerNode)
851 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
852 expect(workerNode.tasksQueue.size).toBe(0)
853 expect(workerNode.tasksQueue.maxSize).toBe(0)
858 it('Verify that pool worker info are initialized', async () => {
859 let pool = new FixedClusterPool(
861 './tests/worker-files/cluster/testWorker.js'
863 for (const workerNode of pool.workerNodes) {
864 expect(workerNode).toBeInstanceOf(WorkerNode)
865 expect(workerNode.info).toStrictEqual({
866 id: expect.any(Number),
867 type: WorkerTypes.cluster,
873 pool = new DynamicThreadPool(
874 Math.floor(numberOfWorkers / 2),
876 './tests/worker-files/thread/testWorker.mjs'
878 for (const workerNode of pool.workerNodes) {
879 expect(workerNode).toBeInstanceOf(WorkerNode)
880 expect(workerNode.info).toStrictEqual({
881 id: expect.any(Number),
882 type: WorkerTypes.thread,
890 it('Verify that pool statuses are checked at start or destroy', async () => {
891 const pool = new FixedThreadPool(
893 './tests/worker-files/thread/testWorker.mjs'
895 expect(pool.info.started).toBe(true)
896 expect(pool.info.ready).toBe(true)
897 expect(() => pool.start()).toThrow(
898 new Error('Cannot start an already started pool')
901 expect(pool.info.started).toBe(false)
902 expect(pool.info.ready).toBe(false)
903 await expect(pool.destroy()).rejects.toThrow(
904 new Error('Cannot destroy an already destroyed pool')
908 it('Verify that pool can be started after initialization', async () => {
909 const pool = new FixedClusterPool(
911 './tests/worker-files/cluster/testWorker.js',
916 expect(pool.info.started).toBe(false)
917 expect(pool.info.ready).toBe(false)
918 expect(pool.workerNodes).toStrictEqual([])
919 await expect(pool.execute()).rejects.toThrow(
920 new Error('Cannot execute a task on not started pool')
923 expect(pool.info.started).toBe(true)
924 expect(pool.info.ready).toBe(true)
925 expect(pool.workerNodes.length).toBe(numberOfWorkers)
926 for (const workerNode of pool.workerNodes) {
927 expect(workerNode).toBeInstanceOf(WorkerNode)
932 it('Verify that pool execute() arguments are checked', async () => {
933 const pool = new FixedClusterPool(
935 './tests/worker-files/cluster/testWorker.js'
937 await expect(pool.execute(undefined, 0)).rejects.toThrow(
938 new TypeError('name argument must be a string')
940 await expect(pool.execute(undefined, '')).rejects.toThrow(
941 new TypeError('name argument must not be an empty string')
943 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
944 new TypeError('transferList argument must be an array')
946 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
947 "Task function 'unknown' not found"
950 await expect(pool.execute()).rejects.toThrow(
951 new Error('Cannot execute a task on not started pool')
955 it('Verify that pool worker tasks usage are computed', async () => {
956 const pool = new FixedClusterPool(
958 './tests/worker-files/cluster/testWorker.js'
960 const promises = new Set()
961 const maxMultiplier = 2
962 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
963 promises.add(pool.execute())
965 for (const workerNode of pool.workerNodes) {
966 expect(workerNode.usage).toStrictEqual({
969 executing: maxMultiplier,
976 history: expect.any(CircularArray)
979 history: expect.any(CircularArray)
983 history: expect.any(CircularArray)
986 history: expect.any(CircularArray)
991 await Promise.all(promises)
992 for (const workerNode of pool.workerNodes) {
993 expect(workerNode.usage).toStrictEqual({
995 executed: maxMultiplier,
1003 history: expect.any(CircularArray)
1006 history: expect.any(CircularArray)
1010 history: expect.any(CircularArray)
1013 history: expect.any(CircularArray)
1018 await pool.destroy()
1021 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1022 const pool = new DynamicThreadPool(
1023 Math.floor(numberOfWorkers / 2),
1025 './tests/worker-files/thread/testWorker.mjs'
1027 const promises = new Set()
1028 const maxMultiplier = 2
1029 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1030 promises.add(pool.execute())
1032 await Promise.all(promises)
1033 for (const workerNode of pool.workerNodes) {
1034 expect(workerNode.usage).toStrictEqual({
1036 executed: expect.any(Number),
1044 history: expect.any(CircularArray)
1047 history: expect.any(CircularArray)
1051 history: expect.any(CircularArray)
1054 history: expect.any(CircularArray)
1058 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1059 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1060 numberOfWorkers * maxMultiplier
1062 expect(workerNode.usage.runTime.history.length).toBe(0)
1063 expect(workerNode.usage.waitTime.history.length).toBe(0)
1064 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1065 expect(workerNode.usage.elu.active.history.length).toBe(0)
1067 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1068 for (const workerNode of pool.workerNodes) {
1069 expect(workerNode.usage).toStrictEqual({
1079 history: expect.any(CircularArray)
1082 history: expect.any(CircularArray)
1086 history: expect.any(CircularArray)
1089 history: expect.any(CircularArray)
1093 expect(workerNode.usage.runTime.history.length).toBe(0)
1094 expect(workerNode.usage.waitTime.history.length).toBe(0)
1095 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1096 expect(workerNode.usage.elu.active.history.length).toBe(0)
1098 await pool.destroy()
1101 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1102 const pool = new DynamicClusterPool(
1103 Math.floor(numberOfWorkers / 2),
1105 './tests/worker-files/cluster/testWorker.js'
1107 expect(pool.emitter.eventNames()).toStrictEqual([])
1110 pool.emitter.on(PoolEvents.ready, info => {
1114 await waitPoolEvents(pool, PoolEvents.ready, 1)
1115 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1116 expect(poolReady).toBe(1)
1117 expect(poolInfo).toStrictEqual({
1119 type: PoolTypes.dynamic,
1120 worker: WorkerTypes.cluster,
1123 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1124 minSize: expect.any(Number),
1125 maxSize: expect.any(Number),
1126 workerNodes: expect.any(Number),
1127 idleWorkerNodes: expect.any(Number),
1128 busyWorkerNodes: expect.any(Number),
1129 executedTasks: expect.any(Number),
1130 executingTasks: expect.any(Number),
1131 failedTasks: expect.any(Number)
1133 await pool.destroy()
1136 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1137 const pool = new FixedThreadPool(
1139 './tests/worker-files/thread/testWorker.mjs'
1141 expect(pool.emitter.eventNames()).toStrictEqual([])
1142 const promises = new Set()
1145 pool.emitter.on(PoolEvents.busy, info => {
1149 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1150 for (let i = 0; i < numberOfWorkers * 2; i++) {
1151 promises.add(pool.execute())
1153 await Promise.all(promises)
1154 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1155 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1156 expect(poolBusy).toBe(numberOfWorkers + 1)
1157 expect(poolInfo).toStrictEqual({
1159 type: PoolTypes.fixed,
1160 worker: WorkerTypes.thread,
1163 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1164 minSize: expect.any(Number),
1165 maxSize: expect.any(Number),
1166 workerNodes: expect.any(Number),
1167 idleWorkerNodes: expect.any(Number),
1168 busyWorkerNodes: expect.any(Number),
1169 executedTasks: expect.any(Number),
1170 executingTasks: expect.any(Number),
1171 failedTasks: expect.any(Number)
1173 await pool.destroy()
1176 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1177 const pool = new DynamicThreadPool(
1178 Math.floor(numberOfWorkers / 2),
1180 './tests/worker-files/thread/testWorker.mjs'
1182 expect(pool.emitter.eventNames()).toStrictEqual([])
1183 const promises = new Set()
1186 pool.emitter.on(PoolEvents.full, info => {
1190 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1191 for (let i = 0; i < numberOfWorkers * 2; i++) {
1192 promises.add(pool.execute())
1194 await Promise.all(promises)
1195 expect(poolFull).toBe(1)
1196 expect(poolInfo).toStrictEqual({
1198 type: PoolTypes.dynamic,
1199 worker: WorkerTypes.thread,
1202 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1203 minSize: expect.any(Number),
1204 maxSize: expect.any(Number),
1205 workerNodes: expect.any(Number),
1206 idleWorkerNodes: expect.any(Number),
1207 busyWorkerNodes: expect.any(Number),
1208 executedTasks: expect.any(Number),
1209 executingTasks: expect.any(Number),
1210 failedTasks: expect.any(Number)
1212 await pool.destroy()
1215 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1216 const pool = new FixedThreadPool(
1218 './tests/worker-files/thread/testWorker.mjs',
1220 enableTasksQueue: true
1223 stub(pool, 'hasBackPressure').returns(true)
1224 expect(pool.emitter.eventNames()).toStrictEqual([])
1225 const promises = new Set()
1226 let poolBackPressure = 0
1228 pool.emitter.on(PoolEvents.backPressure, info => {
1232 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1233 for (let i = 0; i < numberOfWorkers + 1; i++) {
1234 promises.add(pool.execute())
1236 await Promise.all(promises)
1237 expect(poolBackPressure).toBe(1)
1238 expect(poolInfo).toStrictEqual({
1240 type: PoolTypes.fixed,
1241 worker: WorkerTypes.thread,
1244 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1245 minSize: expect.any(Number),
1246 maxSize: expect.any(Number),
1247 workerNodes: expect.any(Number),
1248 idleWorkerNodes: expect.any(Number),
1249 busyWorkerNodes: expect.any(Number),
1250 executedTasks: expect.any(Number),
1251 executingTasks: expect.any(Number),
1252 maxQueuedTasks: expect.any(Number),
1253 queuedTasks: expect.any(Number),
1255 stolenTasks: expect.any(Number),
1256 failedTasks: expect.any(Number)
1258 expect(pool.hasBackPressure.called).toBe(true)
1259 await pool.destroy()
1262 it('Verify that hasTaskFunction() is working', async () => {
1263 const dynamicThreadPool = new DynamicThreadPool(
1264 Math.floor(numberOfWorkers / 2),
1266 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1268 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1269 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1270 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1273 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1274 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1275 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1276 await dynamicThreadPool.destroy()
1277 const fixedClusterPool = new FixedClusterPool(
1279 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1281 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1282 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1283 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1286 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1287 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1288 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1289 await fixedClusterPool.destroy()
1292 it('Verify that addTaskFunction() is working', async () => {
1293 const dynamicThreadPool = new DynamicThreadPool(
1294 Math.floor(numberOfWorkers / 2),
1296 './tests/worker-files/thread/testWorker.mjs'
1298 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1300 dynamicThreadPool.addTaskFunction(0, () => {})
1301 ).rejects.toThrow(new TypeError('name argument must be a string'))
1303 dynamicThreadPool.addTaskFunction('', () => {})
1305 new TypeError('name argument must not be an empty string')
1307 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1308 new TypeError('fn argument must be a function')
1310 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1311 new TypeError('fn argument must be a function')
1313 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1317 const echoTaskFunction = data => {
1321 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1322 ).resolves.toBe(true)
1323 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1324 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1327 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1332 const taskFunctionData = { test: 'test' }
1333 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1334 expect(echoResult).toStrictEqual(taskFunctionData)
1335 for (const workerNode of dynamicThreadPool.workerNodes) {
1336 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1338 executed: expect.any(Number),
1345 history: new CircularArray()
1348 history: new CircularArray()
1352 history: new CircularArray()
1355 history: new CircularArray()
1360 await dynamicThreadPool.destroy()
1363 it('Verify that removeTaskFunction() is working', async () => {
1364 const dynamicThreadPool = new DynamicThreadPool(
1365 Math.floor(numberOfWorkers / 2),
1367 './tests/worker-files/thread/testWorker.mjs'
1369 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1370 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1374 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1375 new Error('Cannot remove a task function not handled on the pool side')
1377 const echoTaskFunction = data => {
1380 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1381 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1382 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1385 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1390 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1393 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1394 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1395 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1399 await dynamicThreadPool.destroy()
1402 it('Verify that listTaskFunctionNames() is working', async () => {
1403 const dynamicThreadPool = new DynamicThreadPool(
1404 Math.floor(numberOfWorkers / 2),
1406 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1408 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1409 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1411 'jsonIntegerSerialization',
1415 await dynamicThreadPool.destroy()
1416 const fixedClusterPool = new FixedClusterPool(
1418 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1420 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1421 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1423 'jsonIntegerSerialization',
1427 await fixedClusterPool.destroy()
1430 it('Verify that setDefaultTaskFunction() is working', async () => {
1431 const dynamicThreadPool = new DynamicThreadPool(
1432 Math.floor(numberOfWorkers / 2),
1434 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1436 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1437 const workerId = dynamicThreadPool.workerNodes[0].info.id
1438 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1440 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1444 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1447 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1451 dynamicThreadPool.setDefaultTaskFunction('unknown')
1454 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1457 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1459 'jsonIntegerSerialization',
1464 dynamicThreadPool.setDefaultTaskFunction('factorial')
1465 ).resolves.toBe(true)
1466 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1469 'jsonIntegerSerialization',
1473 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1474 ).resolves.toBe(true)
1475 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1478 'jsonIntegerSerialization',
1481 await dynamicThreadPool.destroy()
1484 it('Verify that multiple task functions worker is working', async () => {
1485 const pool = new DynamicClusterPool(
1486 Math.floor(numberOfWorkers / 2),
1488 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1490 const data = { n: 10 }
1491 const result0 = await pool.execute(data)
1492 expect(result0).toStrictEqual({ ok: 1 })
1493 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1494 expect(result1).toStrictEqual({ ok: 1 })
1495 const result2 = await pool.execute(data, 'factorial')
1496 expect(result2).toBe(3628800)
1497 const result3 = await pool.execute(data, 'fibonacci')
1498 expect(result3).toBe(55)
1499 expect(pool.info.executingTasks).toBe(0)
1500 expect(pool.info.executedTasks).toBe(4)
1501 for (const workerNode of pool.workerNodes) {
1502 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1504 'jsonIntegerSerialization',
1508 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1509 for (const name of pool.listTaskFunctionNames()) {
1510 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1512 executed: expect.any(Number),
1519 history: expect.any(CircularArray)
1522 history: expect.any(CircularArray)
1526 history: expect.any(CircularArray)
1529 history: expect.any(CircularArray)
1534 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1535 ).toBeGreaterThan(0)
1538 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1540 workerNode.getTaskFunctionWorkerUsage(
1541 workerNode.info.taskFunctionNames[1]
1545 await pool.destroy()
1548 it('Verify sendKillMessageToWorker()', async () => {
1549 const pool = new DynamicClusterPool(
1550 Math.floor(numberOfWorkers / 2),
1552 './tests/worker-files/cluster/testWorker.js'
1554 const workerNodeKey = 0
1556 pool.sendKillMessageToWorker(workerNodeKey)
1557 ).resolves.toBeUndefined()
1558 await pool.destroy()
1561 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1562 const pool = new DynamicClusterPool(
1563 Math.floor(numberOfWorkers / 2),
1565 './tests/worker-files/cluster/testWorker.js'
1567 const workerNodeKey = 0
1569 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1570 taskFunctionOperation: 'add',
1571 taskFunctionName: 'empty',
1572 taskFunction: (() => {}).toString()
1574 ).resolves.toBe(true)
1576 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1577 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1578 await pool.destroy()
1581 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1582 const pool = new DynamicClusterPool(
1583 Math.floor(numberOfWorkers / 2),
1585 './tests/worker-files/cluster/testWorker.js'
1588 pool.sendTaskFunctionOperationToWorkers({
1589 taskFunctionOperation: 'add',
1590 taskFunctionName: 'empty',
1591 taskFunction: (() => {}).toString()
1593 ).resolves.toBe(true)
1594 for (const workerNode of pool.workerNodes) {
1595 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1601 await pool.destroy()