1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { expect } from 'expect'
6 import { restore, stub } from 'sinon'
14 WorkerChoiceStrategies,
16 } from '../../lib/index.js'
17 import { CircularArray } from '../../lib/circular-array.js'
18 import { Deque } from '../../lib/deque.js'
19 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20 import { waitPoolEvents } from '../test-utils.js'
21 import { WorkerNode } from '../../lib/pools/worker-node.js'
23 describe('Abstract pool test suite', () => {
24 const version = JSON.parse(
26 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
30 const numberOfWorkers = 2
31 class StubPoolWithIsMain extends FixedThreadPool {
41 it('Verify that pool can be created and destroyed', async () => {
42 const pool = new FixedThreadPool(
44 './tests/worker-files/thread/testWorker.mjs'
46 expect(pool).toBeInstanceOf(FixedThreadPool)
50 it('Verify that pool cannot be created from a non main thread/process', () => {
53 new StubPoolWithIsMain(
55 './tests/worker-files/thread/testWorker.mjs',
57 errorHandler: e => console.error(e)
62 'Cannot start a pool from a worker with the same type as the pool'
67 it('Verify that pool statuses properties are set', async () => {
68 const pool = new FixedThreadPool(
70 './tests/worker-files/thread/testWorker.mjs'
72 expect(pool.started).toBe(true)
73 expect(pool.starting).toBe(false)
74 expect(pool.destroying).toBe(false)
78 it('Verify that filePath is checked', () => {
79 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
80 new Error("Cannot find the worker file 'undefined'")
83 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
84 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
87 it('Verify that numberOfWorkers is checked', () => {
92 './tests/worker-files/thread/testWorker.mjs'
96 'Cannot instantiate a pool without specifying the number of workers'
101 it('Verify that a negative number of workers is checked', () => {
104 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
107 'Cannot instantiate a pool with a negative number of workers'
112 it('Verify that a non integer number of workers is checked', () => {
115 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
118 'Cannot instantiate a pool with a non safe integer number of workers'
123 it('Verify that dynamic pool sizing is checked', () => {
126 new DynamicClusterPool(
129 './tests/worker-files/cluster/testWorker.js'
133 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
138 new DynamicThreadPool(
141 './tests/worker-files/thread/testWorker.mjs'
145 'Cannot instantiate a pool with a non safe integer number of workers'
150 new DynamicClusterPool(
153 './tests/worker-files/cluster/testWorker.js'
157 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
162 new DynamicThreadPool(
165 './tests/worker-files/thread/testWorker.mjs'
169 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
174 new DynamicThreadPool(
177 './tests/worker-files/thread/testWorker.mjs'
181 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
186 new DynamicClusterPool(
189 './tests/worker-files/cluster/testWorker.js'
193 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
198 it('Verify that pool options are checked', async () => {
199 let pool = new FixedThreadPool(
201 './tests/worker-files/thread/testWorker.mjs'
203 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
204 expect(pool.opts).toStrictEqual({
207 restartWorkerOnError: true,
208 enableTasksQueue: false,
209 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
210 workerChoiceStrategyOptions: {
212 runTime: { median: false },
213 waitTime: { median: false },
214 elu: { median: false }
217 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
219 runTime: { median: false },
220 waitTime: { median: false },
221 elu: { median: false }
223 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
224 .workerChoiceStrategies) {
225 expect(workerChoiceStrategy.opts).toStrictEqual({
227 runTime: { median: false },
228 waitTime: { median: false },
229 elu: { median: false }
233 const testHandler = () => console.info('test handler executed')
234 pool = new FixedThreadPool(
236 './tests/worker-files/thread/testWorker.mjs',
238 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
239 workerChoiceStrategyOptions: {
240 runTime: { median: true },
241 weights: { 0: 300, 1: 200 }
244 restartWorkerOnError: false,
245 enableTasksQueue: true,
246 tasksQueueOptions: { concurrency: 2 },
247 messageHandler: testHandler,
248 errorHandler: testHandler,
249 onlineHandler: testHandler,
250 exitHandler: testHandler
253 expect(pool.emitter).toBeUndefined()
254 expect(pool.opts).toStrictEqual({
257 restartWorkerOnError: false,
258 enableTasksQueue: true,
261 size: Math.pow(numberOfWorkers, 2),
263 tasksStealingOnBackPressure: true
265 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
266 workerChoiceStrategyOptions: {
268 runTime: { median: true },
269 waitTime: { median: false },
270 elu: { median: false },
271 weights: { 0: 300, 1: 200 }
273 onlineHandler: testHandler,
274 messageHandler: testHandler,
275 errorHandler: testHandler,
276 exitHandler: testHandler
278 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
280 runTime: { median: true },
281 waitTime: { median: false },
282 elu: { median: false },
283 weights: { 0: 300, 1: 200 }
285 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
286 .workerChoiceStrategies) {
287 expect(workerChoiceStrategy.opts).toStrictEqual({
289 runTime: { median: true },
290 waitTime: { median: false },
291 elu: { median: false },
292 weights: { 0: 300, 1: 200 }
298 it('Verify that pool options are validated', () => {
303 './tests/worker-files/thread/testWorker.mjs',
305 workerChoiceStrategy: 'invalidStrategy'
308 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
313 './tests/worker-files/thread/testWorker.mjs',
315 workerChoiceStrategyOptions: {
316 retries: 'invalidChoiceRetries'
322 'Invalid worker choice strategy options: retries must be an integer'
329 './tests/worker-files/thread/testWorker.mjs',
331 workerChoiceStrategyOptions: {
338 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
345 './tests/worker-files/thread/testWorker.mjs',
347 workerChoiceStrategyOptions: { weights: {} }
352 'Invalid worker choice strategy options: must have a weight for each worker node'
359 './tests/worker-files/thread/testWorker.mjs',
361 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
366 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
373 './tests/worker-files/thread/testWorker.mjs',
375 enableTasksQueue: true,
376 tasksQueueOptions: 'invalidTasksQueueOptions'
380 new TypeError('Invalid tasks queue options: must be a plain object')
386 './tests/worker-files/thread/testWorker.mjs',
388 enableTasksQueue: true,
389 tasksQueueOptions: { concurrency: 0 }
394 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
401 './tests/worker-files/thread/testWorker.mjs',
403 enableTasksQueue: true,
404 tasksQueueOptions: { concurrency: -1 }
409 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
416 './tests/worker-files/thread/testWorker.mjs',
418 enableTasksQueue: true,
419 tasksQueueOptions: { concurrency: 0.2 }
423 new TypeError('Invalid worker node tasks concurrency: must be an integer')
429 './tests/worker-files/thread/testWorker.mjs',
431 enableTasksQueue: true,
432 tasksQueueOptions: { size: 0 }
437 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
444 './tests/worker-files/thread/testWorker.mjs',
446 enableTasksQueue: true,
447 tasksQueueOptions: { size: -1 }
452 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
459 './tests/worker-files/thread/testWorker.mjs',
461 enableTasksQueue: true,
462 tasksQueueOptions: { size: 0.2 }
466 new TypeError('Invalid worker node tasks queue size: must be an integer')
470 it('Verify that pool worker choice strategy options can be set', async () => {
471 const pool = new FixedThreadPool(
473 './tests/worker-files/thread/testWorker.mjs',
474 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
476 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
478 runTime: { median: false },
479 waitTime: { median: false },
480 elu: { median: false }
482 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
484 runTime: { median: false },
485 waitTime: { median: false },
486 elu: { median: false }
488 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
489 .workerChoiceStrategies) {
490 expect(workerChoiceStrategy.opts).toStrictEqual({
492 runTime: { median: false },
493 waitTime: { median: false },
494 elu: { median: false }
498 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
516 pool.setWorkerChoiceStrategyOptions({
517 runTime: { median: true },
518 elu: { median: true }
520 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
522 runTime: { median: true },
523 waitTime: { median: false },
524 elu: { median: true }
526 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
528 runTime: { median: true },
529 waitTime: { median: false },
530 elu: { median: true }
532 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
533 .workerChoiceStrategies) {
534 expect(workerChoiceStrategy.opts).toStrictEqual({
536 runTime: { median: true },
537 waitTime: { median: false },
538 elu: { median: true }
542 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
560 pool.setWorkerChoiceStrategyOptions({
561 runTime: { median: false },
562 elu: { median: false }
564 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
566 runTime: { median: false },
567 waitTime: { median: false },
568 elu: { median: false }
570 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
572 runTime: { median: false },
573 waitTime: { median: false },
574 elu: { median: false }
576 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
577 .workerChoiceStrategies) {
578 expect(workerChoiceStrategy.opts).toStrictEqual({
580 runTime: { median: false },
581 waitTime: { median: false },
582 elu: { median: false }
586 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
605 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
608 'Invalid worker choice strategy options: must be a plain object'
612 pool.setWorkerChoiceStrategyOptions({
613 retries: 'invalidChoiceRetries'
617 'Invalid worker choice strategy options: retries must be an integer'
620 expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
622 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
625 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
627 'Invalid worker choice strategy options: must have a weight for each worker node'
631 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
634 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
640 it('Verify that pool tasks queue can be enabled/disabled', async () => {
641 const pool = new FixedThreadPool(
643 './tests/worker-files/thread/testWorker.mjs'
645 expect(pool.opts.enableTasksQueue).toBe(false)
646 expect(pool.opts.tasksQueueOptions).toBeUndefined()
647 pool.enableTasksQueue(true)
648 expect(pool.opts.enableTasksQueue).toBe(true)
649 expect(pool.opts.tasksQueueOptions).toStrictEqual({
651 size: Math.pow(numberOfWorkers, 2),
653 tasksStealingOnBackPressure: true
655 pool.enableTasksQueue(true, { concurrency: 2 })
656 expect(pool.opts.enableTasksQueue).toBe(true)
657 expect(pool.opts.tasksQueueOptions).toStrictEqual({
659 size: Math.pow(numberOfWorkers, 2),
661 tasksStealingOnBackPressure: true
663 pool.enableTasksQueue(false)
664 expect(pool.opts.enableTasksQueue).toBe(false)
665 expect(pool.opts.tasksQueueOptions).toBeUndefined()
669 it('Verify that pool tasks queue options can be set', async () => {
670 const pool = new FixedThreadPool(
672 './tests/worker-files/thread/testWorker.mjs',
673 { enableTasksQueue: true }
675 expect(pool.opts.tasksQueueOptions).toStrictEqual({
677 size: Math.pow(numberOfWorkers, 2),
679 tasksStealingOnBackPressure: true
681 for (const workerNode of pool.workerNodes) {
682 expect(workerNode.tasksQueueBackPressureSize).toBe(
683 pool.opts.tasksQueueOptions.size
686 pool.setTasksQueueOptions({
690 tasksStealingOnBackPressure: false
692 expect(pool.opts.tasksQueueOptions).toStrictEqual({
696 tasksStealingOnBackPressure: false
698 for (const workerNode of pool.workerNodes) {
699 expect(workerNode.tasksQueueBackPressureSize).toBe(
700 pool.opts.tasksQueueOptions.size
703 pool.setTasksQueueOptions({
706 tasksStealingOnBackPressure: true
708 expect(pool.opts.tasksQueueOptions).toStrictEqual({
710 size: Math.pow(numberOfWorkers, 2),
712 tasksStealingOnBackPressure: true
714 for (const workerNode of pool.workerNodes) {
715 expect(workerNode.tasksQueueBackPressureSize).toBe(
716 pool.opts.tasksQueueOptions.size
719 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
720 new TypeError('Invalid tasks queue options: must be a plain object')
722 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
724 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
727 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
729 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
732 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
733 new TypeError('Invalid worker node tasks concurrency: must be an integer')
735 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
737 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
740 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
742 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
745 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
746 new TypeError('Invalid worker node tasks queue size: must be an integer')
751 it('Verify that pool info is set', async () => {
752 let pool = new FixedThreadPool(
754 './tests/worker-files/thread/testWorker.mjs'
756 expect(pool.info).toStrictEqual({
758 type: PoolTypes.fixed,
759 worker: WorkerTypes.thread,
762 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
763 minSize: numberOfWorkers,
764 maxSize: numberOfWorkers,
765 workerNodes: numberOfWorkers,
766 idleWorkerNodes: numberOfWorkers,
773 pool = new DynamicClusterPool(
774 Math.floor(numberOfWorkers / 2),
776 './tests/worker-files/cluster/testWorker.js'
778 expect(pool.info).toStrictEqual({
780 type: PoolTypes.dynamic,
781 worker: WorkerTypes.cluster,
784 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
785 minSize: Math.floor(numberOfWorkers / 2),
786 maxSize: numberOfWorkers,
787 workerNodes: Math.floor(numberOfWorkers / 2),
788 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
797 it('Verify that pool worker tasks usage are initialized', async () => {
798 const pool = new FixedClusterPool(
800 './tests/worker-files/cluster/testWorker.js'
802 for (const workerNode of pool.workerNodes) {
803 expect(workerNode).toBeInstanceOf(WorkerNode)
804 expect(workerNode.usage).toStrictEqual({
810 sequentiallyStolen: 0,
815 history: new CircularArray()
818 history: new CircularArray()
822 history: new CircularArray()
825 history: new CircularArray()
833 it('Verify that pool worker tasks queue are initialized', async () => {
834 let pool = new FixedClusterPool(
836 './tests/worker-files/cluster/testWorker.js'
838 for (const workerNode of pool.workerNodes) {
839 expect(workerNode).toBeInstanceOf(WorkerNode)
840 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
841 expect(workerNode.tasksQueue.size).toBe(0)
842 expect(workerNode.tasksQueue.maxSize).toBe(0)
845 pool = new DynamicThreadPool(
846 Math.floor(numberOfWorkers / 2),
848 './tests/worker-files/thread/testWorker.mjs'
850 for (const workerNode of pool.workerNodes) {
851 expect(workerNode).toBeInstanceOf(WorkerNode)
852 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
853 expect(workerNode.tasksQueue.size).toBe(0)
854 expect(workerNode.tasksQueue.maxSize).toBe(0)
859 it('Verify that pool worker info are initialized', async () => {
860 let pool = new FixedClusterPool(
862 './tests/worker-files/cluster/testWorker.js'
864 for (const workerNode of pool.workerNodes) {
865 expect(workerNode).toBeInstanceOf(WorkerNode)
866 expect(workerNode.info).toStrictEqual({
867 id: expect.any(Number),
868 type: WorkerTypes.cluster,
874 pool = new DynamicThreadPool(
875 Math.floor(numberOfWorkers / 2),
877 './tests/worker-files/thread/testWorker.mjs'
879 for (const workerNode of pool.workerNodes) {
880 expect(workerNode).toBeInstanceOf(WorkerNode)
881 expect(workerNode.info).toStrictEqual({
882 id: expect.any(Number),
883 type: WorkerTypes.thread,
891 it('Verify that pool statuses are checked at start or destroy', async () => {
892 const pool = new FixedThreadPool(
894 './tests/worker-files/thread/testWorker.mjs'
896 expect(pool.info.started).toBe(true)
897 expect(pool.info.ready).toBe(true)
898 expect(() => pool.start()).toThrow(
899 new Error('Cannot start an already started pool')
902 expect(pool.info.started).toBe(false)
903 expect(pool.info.ready).toBe(false)
904 await expect(pool.destroy()).rejects.toThrow(
905 new Error('Cannot destroy an already destroyed pool')
909 it('Verify that pool can be started after initialization', async () => {
910 const pool = new FixedClusterPool(
912 './tests/worker-files/cluster/testWorker.js',
917 expect(pool.info.started).toBe(false)
918 expect(pool.info.ready).toBe(false)
919 expect(pool.readyEventEmitted).toBe(false)
920 expect(pool.workerNodes).toStrictEqual([])
921 await expect(pool.execute()).rejects.toThrow(
922 new Error('Cannot execute a task on not started pool')
925 expect(pool.info.started).toBe(true)
926 expect(pool.info.ready).toBe(true)
927 await waitPoolEvents(pool, PoolEvents.ready, 1)
928 expect(pool.readyEventEmitted).toBe(true)
929 expect(pool.workerNodes.length).toBe(numberOfWorkers)
930 for (const workerNode of pool.workerNodes) {
931 expect(workerNode).toBeInstanceOf(WorkerNode)
936 it('Verify that pool execute() arguments are checked', async () => {
937 const pool = new FixedClusterPool(
939 './tests/worker-files/cluster/testWorker.js'
941 await expect(pool.execute(undefined, 0)).rejects.toThrow(
942 new TypeError('name argument must be a string')
944 await expect(pool.execute(undefined, '')).rejects.toThrow(
945 new TypeError('name argument must not be an empty string')
947 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
948 new TypeError('transferList argument must be an array')
950 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
951 "Task function 'unknown' not found"
954 await expect(pool.execute()).rejects.toThrow(
955 new Error('Cannot execute a task on not started pool')
959 it('Verify that pool worker tasks usage are computed', async () => {
960 const pool = new FixedClusterPool(
962 './tests/worker-files/cluster/testWorker.js'
964 const promises = new Set()
965 const maxMultiplier = 2
966 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
967 promises.add(pool.execute())
969 for (const workerNode of pool.workerNodes) {
970 expect(workerNode.usage).toStrictEqual({
973 executing: maxMultiplier,
976 sequentiallyStolen: 0,
981 history: expect.any(CircularArray)
984 history: expect.any(CircularArray)
988 history: expect.any(CircularArray)
991 history: expect.any(CircularArray)
996 await Promise.all(promises)
997 for (const workerNode of pool.workerNodes) {
998 expect(workerNode.usage).toStrictEqual({
1000 executed: maxMultiplier,
1004 sequentiallyStolen: 0,
1009 history: expect.any(CircularArray)
1012 history: expect.any(CircularArray)
1016 history: expect.any(CircularArray)
1019 history: expect.any(CircularArray)
1024 await pool.destroy()
1027 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1028 const pool = new DynamicThreadPool(
1029 Math.floor(numberOfWorkers / 2),
1031 './tests/worker-files/thread/testWorker.mjs'
1033 const promises = new Set()
1034 const maxMultiplier = 2
1035 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1036 promises.add(pool.execute())
1038 await Promise.all(promises)
1039 for (const workerNode of pool.workerNodes) {
1040 expect(workerNode.usage).toStrictEqual({
1042 executed: expect.any(Number),
1046 sequentiallyStolen: 0,
1051 history: expect.any(CircularArray)
1054 history: expect.any(CircularArray)
1058 history: expect.any(CircularArray)
1061 history: expect.any(CircularArray)
1065 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1066 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1067 numberOfWorkers * maxMultiplier
1069 expect(workerNode.usage.runTime.history.length).toBe(0)
1070 expect(workerNode.usage.waitTime.history.length).toBe(0)
1071 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1072 expect(workerNode.usage.elu.active.history.length).toBe(0)
1074 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1075 for (const workerNode of pool.workerNodes) {
1076 expect(workerNode.usage).toStrictEqual({
1082 sequentiallyStolen: 0,
1087 history: expect.any(CircularArray)
1090 history: expect.any(CircularArray)
1094 history: expect.any(CircularArray)
1097 history: expect.any(CircularArray)
1101 expect(workerNode.usage.runTime.history.length).toBe(0)
1102 expect(workerNode.usage.waitTime.history.length).toBe(0)
1103 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1104 expect(workerNode.usage.elu.active.history.length).toBe(0)
1106 await pool.destroy()
1109 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1110 const pool = new DynamicClusterPool(
1111 Math.floor(numberOfWorkers / 2),
1113 './tests/worker-files/cluster/testWorker.js'
1115 expect(pool.emitter.eventNames()).toStrictEqual([])
1118 pool.emitter.on(PoolEvents.ready, info => {
1122 await waitPoolEvents(pool, PoolEvents.ready, 1)
1123 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1124 expect(poolReady).toBe(1)
1125 expect(poolInfo).toStrictEqual({
1127 type: PoolTypes.dynamic,
1128 worker: WorkerTypes.cluster,
1131 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1132 minSize: expect.any(Number),
1133 maxSize: expect.any(Number),
1134 workerNodes: expect.any(Number),
1135 idleWorkerNodes: expect.any(Number),
1136 busyWorkerNodes: expect.any(Number),
1137 executedTasks: expect.any(Number),
1138 executingTasks: expect.any(Number),
1139 failedTasks: expect.any(Number)
1141 await pool.destroy()
1144 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1145 const pool = new FixedThreadPool(
1147 './tests/worker-files/thread/testWorker.mjs'
1149 expect(pool.emitter.eventNames()).toStrictEqual([])
1150 const promises = new Set()
1153 pool.emitter.on(PoolEvents.busy, info => {
1157 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1158 for (let i = 0; i < numberOfWorkers * 2; i++) {
1159 promises.add(pool.execute())
1161 await Promise.all(promises)
1162 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1163 // So it is emitted numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1164 expect(poolBusy).toBe(numberOfWorkers + 1)
1165 expect(poolInfo).toStrictEqual({
1167 type: PoolTypes.fixed,
1168 worker: WorkerTypes.thread,
1171 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1172 minSize: expect.any(Number),
1173 maxSize: expect.any(Number),
1174 workerNodes: expect.any(Number),
1175 idleWorkerNodes: expect.any(Number),
1176 busyWorkerNodes: expect.any(Number),
1177 executedTasks: expect.any(Number),
1178 executingTasks: expect.any(Number),
1179 failedTasks: expect.any(Number)
1181 await pool.destroy()
1184 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1185 const pool = new DynamicThreadPool(
1186 Math.floor(numberOfWorkers / 2),
1188 './tests/worker-files/thread/testWorker.mjs'
1190 expect(pool.emitter.eventNames()).toStrictEqual([])
1191 const promises = new Set()
1194 pool.emitter.on(PoolEvents.full, info => {
1198 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1199 for (let i = 0; i < numberOfWorkers * 2; i++) {
1200 promises.add(pool.execute())
1202 await Promise.all(promises)
1203 expect(poolFull).toBe(1)
1204 expect(poolInfo).toStrictEqual({
1206 type: PoolTypes.dynamic,
1207 worker: WorkerTypes.thread,
1210 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1211 minSize: expect.any(Number),
1212 maxSize: expect.any(Number),
1213 workerNodes: expect.any(Number),
1214 idleWorkerNodes: expect.any(Number),
1215 busyWorkerNodes: expect.any(Number),
1216 executedTasks: expect.any(Number),
1217 executingTasks: expect.any(Number),
1218 failedTasks: expect.any(Number)
1220 await pool.destroy()
1223 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1224 const pool = new FixedThreadPool(
1226 './tests/worker-files/thread/testWorker.mjs',
1228 enableTasksQueue: true
1231 stub(pool, 'hasBackPressure').returns(true)
1232 expect(pool.emitter.eventNames()).toStrictEqual([])
1233 const promises = new Set()
1234 let poolBackPressure = 0
1236 pool.emitter.on(PoolEvents.backPressure, info => {
1240 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1241 for (let i = 0; i < numberOfWorkers + 1; i++) {
1242 promises.add(pool.execute())
1244 await Promise.all(promises)
1245 expect(poolBackPressure).toBe(1)
1246 expect(poolInfo).toStrictEqual({
1248 type: PoolTypes.fixed,
1249 worker: WorkerTypes.thread,
1252 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1253 minSize: expect.any(Number),
1254 maxSize: expect.any(Number),
1255 workerNodes: expect.any(Number),
1256 idleWorkerNodes: expect.any(Number),
1257 busyWorkerNodes: expect.any(Number),
1258 executedTasks: expect.any(Number),
1259 executingTasks: expect.any(Number),
1260 maxQueuedTasks: expect.any(Number),
1261 queuedTasks: expect.any(Number),
1263 stolenTasks: expect.any(Number),
1264 failedTasks: expect.any(Number)
1266 expect(pool.hasBackPressure.called).toBe(true)
1267 await pool.destroy()
1270 it('Verify that hasTaskFunction() is working', async () => {
1271 const dynamicThreadPool = new DynamicThreadPool(
1272 Math.floor(numberOfWorkers / 2),
1274 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1276 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1277 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1278 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1281 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1282 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1283 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1284 await dynamicThreadPool.destroy()
1285 const fixedClusterPool = new FixedClusterPool(
1287 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1289 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1290 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1291 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1294 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1295 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1296 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1297 await fixedClusterPool.destroy()
1300 it('Verify that addTaskFunction() is working', async () => {
1301 const dynamicThreadPool = new DynamicThreadPool(
1302 Math.floor(numberOfWorkers / 2),
1304 './tests/worker-files/thread/testWorker.mjs'
1306 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1308 dynamicThreadPool.addTaskFunction(0, () => {})
1309 ).rejects.toThrow(new TypeError('name argument must be a string'))
1311 dynamicThreadPool.addTaskFunction('', () => {})
1313 new TypeError('name argument must not be an empty string')
1315 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1316 new TypeError('fn argument must be a function')
1318 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1319 new TypeError('fn argument must be a function')
1321 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1325 const echoTaskFunction = data => {
1329 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1330 ).resolves.toBe(true)
1331 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1332 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1335 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1340 const taskFunctionData = { test: 'test' }
1341 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1342 expect(echoResult).toStrictEqual(taskFunctionData)
1343 for (const workerNode of dynamicThreadPool.workerNodes) {
1344 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1346 executed: expect.any(Number),
1349 sequentiallyStolen: 0,
1354 history: new CircularArray()
1357 history: new CircularArray()
1361 history: new CircularArray()
1364 history: new CircularArray()
1369 await dynamicThreadPool.destroy()
1372 it('Verify that removeTaskFunction() is working', async () => {
1373 const dynamicThreadPool = new DynamicThreadPool(
1374 Math.floor(numberOfWorkers / 2),
1376 './tests/worker-files/thread/testWorker.mjs'
1378 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1379 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1383 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1384 new Error('Cannot remove a task function not handled on the pool side')
1386 const echoTaskFunction = data => {
1389 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1390 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1391 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1394 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1399 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1402 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1403 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1404 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1408 await dynamicThreadPool.destroy()
1411 it('Verify that listTaskFunctionNames() is working', async () => {
1412 const dynamicThreadPool = new DynamicThreadPool(
1413 Math.floor(numberOfWorkers / 2),
1415 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1417 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1418 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1420 'jsonIntegerSerialization',
1424 await dynamicThreadPool.destroy()
1425 const fixedClusterPool = new FixedClusterPool(
1427 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1429 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1430 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1432 'jsonIntegerSerialization',
1436 await fixedClusterPool.destroy()
// Test case: exercises setDefaultTaskFunction() on a DynamicThreadPool built
// from the thread testMultipleTaskFunctionsWorker file. It covers three invalid
// names and then two names expected to resolve to true, checking
// listTaskFunctionNames() in between.
// NOTE(review): this listing omits some lines (matcher calls around the error
// messages, most expected names, closing brackets) — confirm against the full file.
1439 it('Verify that setDefaultTaskFunction() is working', async () => {
1440 const dynamicThreadPool = new DynamicThreadPool(
1441 Math.floor(numberOfWorkers / 2),
1443 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1445 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// The id of the first worker node is interpolated into the error messages below.
1446 const workerId = dynamicThreadPool.workerNodes[0].info.id
// Invalid input 1: a non-string name (0) is expected to reject.
1447 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1449 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
// Invalid input 2: the reserved DEFAULT_TASK_NAME.
1453 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1456 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
// Invalid input 3: the name 'unknown'; the message below reports a non-existing task function.
1460 dynamicThreadPool.setDefaultTaskFunction('unknown')
1463 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1466 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1468 'jsonIntegerSerialization',
// Valid input: 'factorial' is expected to resolve to true; names are re-checked afterwards.
1473 dynamicThreadPool.setDefaultTaskFunction('factorial')
1474 ).resolves.toBe(true)
1475 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1478 'jsonIntegerSerialization',
// Valid input: 'fibonacci' is expected to resolve to true; names are re-checked afterwards.
1482 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1483 ).resolves.toBe(true)
1484 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1487 'jsonIntegerSerialization',
1490 await dynamicThreadPool.destroy()
// Test case: runs the same input through a DynamicClusterPool four times (once
// without a task name, then as 'jsonIntegerSerialization', 'factorial' and
// 'fibonacci'). It then checks the pool-level task counters and the per-worker
// task function usage records.
// NOTE(review): this listing omits some lines (parts of the expected usage
// object, matcher calls, closing brackets) — confirm against the full file.
1493 it('Verify that multiple task functions worker is working', async () => {
1494 const pool = new DynamicClusterPool(
1495 Math.floor(numberOfWorkers / 2),
1497 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1499 const data = { n: 10 }
// No task name given here; the result matches the 'jsonIntegerSerialization' result below.
1500 const result0 = await pool.execute(data)
1501 expect(result0).toStrictEqual({ ok: 1 })
1502 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1503 expect(result1).toStrictEqual({ ok: 1 })
// 3628800 equals 10!, consistent with n = 10 (worker implementation not visible here).
1504 const result2 = await pool.execute(data, 'factorial')
1505 expect(result2).toBe(3628800)
// 55 is the 10th Fibonacci number, consistent with n = 10 (worker implementation not visible here).
1506 const result3 = await pool.execute(data, 'fibonacci')
1507 expect(result3).toBe(55)
// All four execute() calls above were awaited, so none is executing and four are executed.
1508 expect(pool.info.executingTasks).toBe(0)
1509 expect(pool.info.executedTasks).toBe(4)
// Per-worker checks: advertised task function names and usage bookkeeping.
1510 for (const workerNode of pool.workerNodes) {
1511 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1513 'jsonIntegerSerialization',
1517 expect(workerNode.taskFunctionsUsage.size).toBe(3)
// Every name listed by the pool must have a usage record of the expected shape.
1518 for (const name of pool.listTaskFunctionNames()) {
1519 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1521 executed: expect.any(Number),
1525 sequentiallyStolen: 0,
1529 history: expect.any(CircularArray)
1532 history: expect.any(CircularArray)
1536 history: expect.any(CircularArray)
1539 history: expect.any(CircularArray)
1544 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1545 ).toBeGreaterThan(0)
// NOTE(review): the matcher relating these two usage lookups is not visible in
// this listing; presumably the default-name usage is compared with the usage of
// taskFunctionNames[1] — confirm against the full file.
1548 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1550 workerNode.getTaskFunctionWorkerUsage(
1551 workerNode.info.taskFunctionNames[1]
1555 await pool.destroy()
// Test case: calls sendKillMessageToWorker() for worker node key 0 of a
// DynamicClusterPool and expects the returned promise to resolve to undefined.
// NOTE(review): this listing omits some lines (remaining constructor arguments,
// the opening `await expect(`, closing brackets) — confirm against the full file.
1558 it('Verify sendKillMessageToWorker()', async () => {
1559 const pool = new DynamicClusterPool(
1560 Math.floor(numberOfWorkers / 2),
1562 './tests/worker-files/cluster/testWorker.js'
// Index into pool.workerNodes of the worker targeted by the kill message.
1564 const workerNodeKey = 0
1566 pool.sendKillMessageToWorker(workerNodeKey)
1567 ).resolves.toBeUndefined()
1568 await pool.destroy()
// Test case: sends an 'add' task function operation named 'empty' to worker
// node key 0 of a DynamicClusterPool. It expects the promise to resolve to true
// and the worker node's taskFunctionNames to then include 'empty'.
// NOTE(review): this listing omits some lines (remaining constructor arguments,
// the opening `await expect(` / `expect(`, closing brackets) — confirm against the full file.
1571 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1572 const pool = new DynamicClusterPool(
1573 Math.floor(numberOfWorkers / 2),
1575 './tests/worker-files/cluster/testWorker.js'
1577 const workerNodeKey = 0
1579 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1580 taskFunctionOperation: 'add',
1581 taskFunctionName: 'empty',
// The task function is passed as source text: an empty arrow function, stringified.
1582 taskFunction: (() => {}).toString()
1584 ).resolves.toBe(true)
// After the operation the targeted worker node lists the new name last.
1586 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1587 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1588 await pool.destroy()
// Test case: sends an 'add' task function operation named 'empty' through
// sendTaskFunctionOperationToWorkers() (no worker node key argument) on a
// DynamicClusterPool. It expects the promise to resolve to true and then checks
// taskFunctionNames on every worker node.
// NOTE(review): this listing omits some lines (remaining constructor arguments,
// the expected names array, closing brackets) — confirm against the full file.
1591 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1592 const pool = new DynamicClusterPool(
1593 Math.floor(numberOfWorkers / 2),
1595 './tests/worker-files/cluster/testWorker.js'
1598 pool.sendTaskFunctionOperationToWorkers({
1599 taskFunctionOperation: 'add',
1600 taskFunctionName: 'empty',
// The task function is passed as source text: an empty arrow function, stringified.
1601 taskFunction: (() => {}).toString()
1603 ).resolves.toBe(true)
// Each worker node's advertised names are compared against the expected list.
1604 for (const workerNode of pool.workerNodes) {
1605 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1611 await pool.destroy()