1 import { EventEmitterAsyncResource } from 'node:events'
2 import { readFileSync } from 'node:fs'
3 import { expect } from 'expect'
4 import { restore, stub } from 'sinon'
12 WorkerChoiceStrategies,
14 } from '../../lib/index.js'
15 import { CircularArray } from '../../lib/circular-array.js'
16 import { Deque } from '../../lib/deque.js'
17 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
18 import { waitPoolEvents } from '../test-utils.js'
19 import { WorkerNode } from '../../lib/pools/worker-node.js'
21 describe('Abstract pool test suite', () => {
22 const version = JSON.parse(readFileSync('./package.json', 'utf8')).version
23 const numberOfWorkers = 2
24 class StubPoolWithIsMain extends FixedThreadPool {
34 it('Simulate pool creation from a non main thread/process', () => {
37 new StubPoolWithIsMain(
39 './tests/worker-files/thread/testWorker.mjs',
41 errorHandler: e => console.error(e)
46 'Cannot start a pool from a worker with the same type as the pool'
51 it('Verify that pool statuses properties are set', async () => {
52 const pool = new FixedThreadPool(
54 './tests/worker-files/thread/testWorker.mjs'
56 expect(pool.starting).toBe(false)
57 expect(pool.started).toBe(true)
61 it('Verify that filePath is checked', () => {
62 const expectedError = new Error(
63 'Please specify a file with a worker implementation'
65 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
68 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
71 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError(
74 expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError(
78 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
79 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
82 it('Verify that numberOfWorkers is checked', () => {
87 './tests/worker-files/thread/testWorker.mjs'
91 'Cannot instantiate a pool without specifying the number of workers'
96 it('Verify that a negative number of workers is checked', () => {
99 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
102 'Cannot instantiate a pool with a negative number of workers'
107 it('Verify that a non integer number of workers is checked', () => {
110 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
113 'Cannot instantiate a pool with a non safe integer number of workers'
118 it('Verify that dynamic pool sizing is checked', () => {
121 new DynamicClusterPool(
124 './tests/worker-files/cluster/testWorker.js'
128 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
133 new DynamicThreadPool(
136 './tests/worker-files/thread/testWorker.mjs'
140 'Cannot instantiate a pool with a non safe integer number of workers'
145 new DynamicClusterPool(
148 './tests/worker-files/cluster/testWorker.js'
152 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
157 new DynamicThreadPool(
160 './tests/worker-files/thread/testWorker.mjs'
164 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
169 new DynamicThreadPool(
172 './tests/worker-files/thread/testWorker.mjs'
176 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
181 new DynamicClusterPool(
184 './tests/worker-files/cluster/testWorker.js'
188 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
193 it('Verify that pool options are checked', async () => {
194 let pool = new FixedThreadPool(
196 './tests/worker-files/thread/testWorker.mjs'
198 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
199 expect(pool.opts).toStrictEqual({
202 restartWorkerOnError: true,
203 enableTasksQueue: false,
204 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
205 workerChoiceStrategyOptions: {
207 runTime: { median: false },
208 waitTime: { median: false },
209 elu: { median: false }
212 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
214 runTime: { median: false },
215 waitTime: { median: false },
216 elu: { median: false }
218 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
219 .workerChoiceStrategies) {
220 expect(workerChoiceStrategy.opts).toStrictEqual({
222 runTime: { median: false },
223 waitTime: { median: false },
224 elu: { median: false }
228 const testHandler = () => console.info('test handler executed')
229 pool = new FixedThreadPool(
231 './tests/worker-files/thread/testWorker.mjs',
233 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
234 workerChoiceStrategyOptions: {
235 runTime: { median: true },
236 weights: { 0: 300, 1: 200 }
239 restartWorkerOnError: false,
240 enableTasksQueue: true,
241 tasksQueueOptions: { concurrency: 2 },
242 messageHandler: testHandler,
243 errorHandler: testHandler,
244 onlineHandler: testHandler,
245 exitHandler: testHandler
248 expect(pool.emitter).toBeUndefined()
249 expect(pool.opts).toStrictEqual({
252 restartWorkerOnError: false,
253 enableTasksQueue: true,
256 size: Math.pow(numberOfWorkers, 2),
258 tasksStealingOnBackPressure: true
260 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
261 workerChoiceStrategyOptions: {
263 runTime: { median: true },
264 waitTime: { median: false },
265 elu: { median: false },
266 weights: { 0: 300, 1: 200 }
268 onlineHandler: testHandler,
269 messageHandler: testHandler,
270 errorHandler: testHandler,
271 exitHandler: testHandler
273 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
275 runTime: { median: true },
276 waitTime: { median: false },
277 elu: { median: false },
278 weights: { 0: 300, 1: 200 }
280 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
281 .workerChoiceStrategies) {
282 expect(workerChoiceStrategy.opts).toStrictEqual({
284 runTime: { median: true },
285 waitTime: { median: false },
286 elu: { median: false },
287 weights: { 0: 300, 1: 200 }
293 it('Verify that pool options are validated', async () => {
298 './tests/worker-files/thread/testWorker.mjs',
300 workerChoiceStrategy: 'invalidStrategy'
304 new Error("Invalid worker choice strategy 'invalidStrategy'")
310 './tests/worker-files/thread/testWorker.mjs',
312 workerChoiceStrategyOptions: {
313 retries: 'invalidChoiceRetries'
319 'Invalid worker choice strategy options: retries must be an integer'
326 './tests/worker-files/thread/testWorker.mjs',
328 workerChoiceStrategyOptions: {
335 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
342 './tests/worker-files/thread/testWorker.mjs',
344 workerChoiceStrategyOptions: { weights: {} }
349 'Invalid worker choice strategy options: must have a weight for each worker node'
356 './tests/worker-files/thread/testWorker.mjs',
358 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
363 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
370 './tests/worker-files/thread/testWorker.mjs',
372 enableTasksQueue: true,
373 tasksQueueOptions: 'invalidTasksQueueOptions'
377 new TypeError('Invalid tasks queue options: must be a plain object')
383 './tests/worker-files/thread/testWorker.mjs',
385 enableTasksQueue: true,
386 tasksQueueOptions: { concurrency: 0 }
391 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
398 './tests/worker-files/thread/testWorker.mjs',
400 enableTasksQueue: true,
401 tasksQueueOptions: { concurrency: -1 }
406 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
413 './tests/worker-files/thread/testWorker.mjs',
415 enableTasksQueue: true,
416 tasksQueueOptions: { concurrency: 0.2 }
420 new TypeError('Invalid worker node tasks concurrency: must be an integer')
426 './tests/worker-files/thread/testWorker.mjs',
428 enableTasksQueue: true,
429 tasksQueueOptions: { size: 0 }
434 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
441 './tests/worker-files/thread/testWorker.mjs',
443 enableTasksQueue: true,
444 tasksQueueOptions: { size: -1 }
449 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
456 './tests/worker-files/thread/testWorker.mjs',
458 enableTasksQueue: true,
459 tasksQueueOptions: { size: 0.2 }
463 new TypeError('Invalid worker node tasks queue size: must be an integer')
467 it('Verify that pool worker choice strategy options can be set', async () => {
468 const pool = new FixedThreadPool(
470 './tests/worker-files/thread/testWorker.mjs',
471 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
473 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
475 runTime: { median: false },
476 waitTime: { median: false },
477 elu: { median: false }
479 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
481 runTime: { median: false },
482 waitTime: { median: false },
483 elu: { median: false }
485 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
486 .workerChoiceStrategies) {
487 expect(workerChoiceStrategy.opts).toStrictEqual({
489 runTime: { median: false },
490 waitTime: { median: false },
491 elu: { median: false }
495 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
513 pool.setWorkerChoiceStrategyOptions({
514 runTime: { median: true },
515 elu: { median: true }
517 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
519 runTime: { median: true },
520 waitTime: { median: false },
521 elu: { median: true }
523 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
525 runTime: { median: true },
526 waitTime: { median: false },
527 elu: { median: true }
529 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
530 .workerChoiceStrategies) {
531 expect(workerChoiceStrategy.opts).toStrictEqual({
533 runTime: { median: true },
534 waitTime: { median: false },
535 elu: { median: true }
539 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
557 pool.setWorkerChoiceStrategyOptions({
558 runTime: { median: false },
559 elu: { median: false }
561 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
563 runTime: { median: false },
564 waitTime: { median: false },
565 elu: { median: false }
567 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
569 runTime: { median: false },
570 waitTime: { median: false },
571 elu: { median: false }
573 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
574 .workerChoiceStrategies) {
575 expect(workerChoiceStrategy.opts).toStrictEqual({
577 runTime: { median: false },
578 waitTime: { median: false },
579 elu: { median: false }
583 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
602 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
605 'Invalid worker choice strategy options: must be a plain object'
609 pool.setWorkerChoiceStrategyOptions({
610 retries: 'invalidChoiceRetries'
614 'Invalid worker choice strategy options: retries must be an integer'
618 pool.setWorkerChoiceStrategyOptions({ retries: -1 })
621 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
625 pool.setWorkerChoiceStrategyOptions({ weights: {} })
628 'Invalid worker choice strategy options: must have a weight for each worker node'
632 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
635 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
641 it('Verify that pool tasks queue can be enabled/disabled', async () => {
642 const pool = new FixedThreadPool(
644 './tests/worker-files/thread/testWorker.mjs'
646 expect(pool.opts.enableTasksQueue).toBe(false)
647 expect(pool.opts.tasksQueueOptions).toBeUndefined()
648 for (const workerNode of pool.workerNodes) {
649 expect(workerNode.onEmptyQueue).toBeUndefined()
650 expect(workerNode.onBackPressure).toBeUndefined()
652 pool.enableTasksQueue(true)
653 expect(pool.opts.enableTasksQueue).toBe(true)
654 expect(pool.opts.tasksQueueOptions).toStrictEqual({
656 size: Math.pow(numberOfWorkers, 2),
658 tasksStealingOnBackPressure: true
660 for (const workerNode of pool.workerNodes) {
661 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
662 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
664 pool.enableTasksQueue(true, { concurrency: 2 })
665 expect(pool.opts.enableTasksQueue).toBe(true)
666 expect(pool.opts.tasksQueueOptions).toStrictEqual({
668 size: Math.pow(numberOfWorkers, 2),
670 tasksStealingOnBackPressure: true
672 for (const workerNode of pool.workerNodes) {
673 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
674 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
676 pool.enableTasksQueue(false)
677 expect(pool.opts.enableTasksQueue).toBe(false)
678 expect(pool.opts.tasksQueueOptions).toBeUndefined()
679 for (const workerNode of pool.workerNodes) {
680 expect(workerNode.onEmptyQueue).toBeUndefined()
681 expect(workerNode.onBackPressure).toBeUndefined()
686 it('Verify that pool tasks queue options can be set', async () => {
687 const pool = new FixedThreadPool(
689 './tests/worker-files/thread/testWorker.mjs',
690 { enableTasksQueue: true }
692 expect(pool.opts.tasksQueueOptions).toStrictEqual({
694 size: Math.pow(numberOfWorkers, 2),
696 tasksStealingOnBackPressure: true
698 for (const workerNode of pool.workerNodes) {
699 expect(workerNode.tasksQueueBackPressureSize).toBe(
700 pool.opts.tasksQueueOptions.size
702 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
703 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
705 pool.setTasksQueueOptions({
709 tasksStealingOnBackPressure: false
711 expect(pool.opts.tasksQueueOptions).toStrictEqual({
715 tasksStealingOnBackPressure: false
717 for (const workerNode of pool.workerNodes) {
718 expect(workerNode.tasksQueueBackPressureSize).toBe(
719 pool.opts.tasksQueueOptions.size
721 expect(workerNode.onEmptyQueue).toBeUndefined()
722 expect(workerNode.onBackPressure).toBeUndefined()
724 pool.setTasksQueueOptions({
727 tasksStealingOnBackPressure: true
729 expect(pool.opts.tasksQueueOptions).toStrictEqual({
731 size: Math.pow(numberOfWorkers, 2),
733 tasksStealingOnBackPressure: true
735 for (const workerNode of pool.workerNodes) {
736 expect(workerNode.tasksQueueBackPressureSize).toBe(
737 pool.opts.tasksQueueOptions.size
739 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
740 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
743 pool.setTasksQueueOptions('invalidTasksQueueOptions')
745 new TypeError('Invalid tasks queue options: must be a plain object')
747 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
749 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
752 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
754 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
757 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
758 new TypeError('Invalid worker node tasks concurrency: must be an integer')
760 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
762 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
765 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError(
767 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
770 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError(
771 new TypeError('Invalid worker node tasks queue size: must be an integer')
776 it('Verify that pool info is set', async () => {
777 let pool = new FixedThreadPool(
779 './tests/worker-files/thread/testWorker.mjs'
781 expect(pool.info).toStrictEqual({
783 type: PoolTypes.fixed,
784 worker: WorkerTypes.thread,
787 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
788 minSize: numberOfWorkers,
789 maxSize: numberOfWorkers,
790 workerNodes: numberOfWorkers,
791 idleWorkerNodes: numberOfWorkers,
798 pool = new DynamicClusterPool(
799 Math.floor(numberOfWorkers / 2),
801 './tests/worker-files/cluster/testWorker.js'
803 expect(pool.info).toStrictEqual({
805 type: PoolTypes.dynamic,
806 worker: WorkerTypes.cluster,
809 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
810 minSize: Math.floor(numberOfWorkers / 2),
811 maxSize: numberOfWorkers,
812 workerNodes: Math.floor(numberOfWorkers / 2),
813 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
822 it('Verify that pool worker tasks usage are initialized', async () => {
823 const pool = new FixedClusterPool(
825 './tests/worker-files/cluster/testWorker.js'
827 for (const workerNode of pool.workerNodes) {
828 expect(workerNode).toBeInstanceOf(WorkerNode)
829 expect(workerNode.usage).toStrictEqual({
839 history: new CircularArray()
842 history: new CircularArray()
846 history: new CircularArray()
849 history: new CircularArray()
857 it('Verify that pool worker tasks queue are initialized', async () => {
858 let pool = new FixedClusterPool(
860 './tests/worker-files/cluster/testWorker.js'
862 for (const workerNode of pool.workerNodes) {
863 expect(workerNode).toBeInstanceOf(WorkerNode)
864 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
865 expect(workerNode.tasksQueue.size).toBe(0)
866 expect(workerNode.tasksQueue.maxSize).toBe(0)
869 pool = new DynamicThreadPool(
870 Math.floor(numberOfWorkers / 2),
872 './tests/worker-files/thread/testWorker.mjs'
874 for (const workerNode of pool.workerNodes) {
875 expect(workerNode).toBeInstanceOf(WorkerNode)
876 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
877 expect(workerNode.tasksQueue.size).toBe(0)
878 expect(workerNode.tasksQueue.maxSize).toBe(0)
883 it('Verify that pool worker info are initialized', async () => {
884 let pool = new FixedClusterPool(
886 './tests/worker-files/cluster/testWorker.js'
888 for (const workerNode of pool.workerNodes) {
889 expect(workerNode).toBeInstanceOf(WorkerNode)
890 expect(workerNode.info).toStrictEqual({
891 id: expect.any(Number),
892 type: WorkerTypes.cluster,
898 pool = new DynamicThreadPool(
899 Math.floor(numberOfWorkers / 2),
901 './tests/worker-files/thread/testWorker.mjs'
903 for (const workerNode of pool.workerNodes) {
904 expect(workerNode).toBeInstanceOf(WorkerNode)
905 expect(workerNode.info).toStrictEqual({
906 id: expect.any(Number),
907 type: WorkerTypes.thread,
915 it('Verify that pool can be started after initialization', async () => {
916 const pool = new FixedClusterPool(
918 './tests/worker-files/cluster/testWorker.js',
923 expect(pool.info.started).toBe(false)
924 expect(pool.info.ready).toBe(false)
925 expect(pool.workerNodes).toStrictEqual([])
926 await expect(pool.execute()).rejects.toThrowError(
927 new Error('Cannot execute a task on not started pool')
930 expect(pool.info.started).toBe(true)
931 expect(pool.info.ready).toBe(true)
932 expect(pool.workerNodes.length).toBe(numberOfWorkers)
933 for (const workerNode of pool.workerNodes) {
934 expect(workerNode).toBeInstanceOf(WorkerNode)
939 it('Verify that pool execute() arguments are checked', async () => {
940 const pool = new FixedClusterPool(
942 './tests/worker-files/cluster/testWorker.js'
944 await expect(pool.execute(undefined, 0)).rejects.toThrowError(
945 new TypeError('name argument must be a string')
947 await expect(pool.execute(undefined, '')).rejects.toThrowError(
948 new TypeError('name argument must not be an empty string')
950 await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
951 new TypeError('transferList argument must be an array')
953 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
954 "Task function 'unknown' not found"
957 await expect(pool.execute()).rejects.toThrowError(
958 new Error('Cannot execute a task on not started pool')
962 it('Verify that pool worker tasks usage are computed', async () => {
963 const pool = new FixedClusterPool(
965 './tests/worker-files/cluster/testWorker.js'
967 const promises = new Set()
968 const maxMultiplier = 2
969 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
970 promises.add(pool.execute())
972 for (const workerNode of pool.workerNodes) {
973 expect(workerNode.usage).toStrictEqual({
976 executing: maxMultiplier,
983 history: expect.any(CircularArray)
986 history: expect.any(CircularArray)
990 history: expect.any(CircularArray)
993 history: expect.any(CircularArray)
998 await Promise.all(promises)
999 for (const workerNode of pool.workerNodes) {
1000 expect(workerNode.usage).toStrictEqual({
1002 executed: maxMultiplier,
1010 history: expect.any(CircularArray)
1013 history: expect.any(CircularArray)
1017 history: expect.any(CircularArray)
1020 history: expect.any(CircularArray)
1025 await pool.destroy()
1028 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1029 const pool = new DynamicThreadPool(
1030 Math.floor(numberOfWorkers / 2),
1032 './tests/worker-files/thread/testWorker.mjs'
1034 const promises = new Set()
1035 const maxMultiplier = 2
1036 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1037 promises.add(pool.execute())
1039 await Promise.all(promises)
1040 for (const workerNode of pool.workerNodes) {
1041 expect(workerNode.usage).toStrictEqual({
1043 executed: expect.any(Number),
1051 history: expect.any(CircularArray)
1054 history: expect.any(CircularArray)
1058 history: expect.any(CircularArray)
1061 history: expect.any(CircularArray)
1065 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1066 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1067 numberOfWorkers * maxMultiplier
1069 expect(workerNode.usage.runTime.history.length).toBe(0)
1070 expect(workerNode.usage.waitTime.history.length).toBe(0)
1071 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1072 expect(workerNode.usage.elu.active.history.length).toBe(0)
1074 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1075 for (const workerNode of pool.workerNodes) {
1076 expect(workerNode.usage).toStrictEqual({
1086 history: expect.any(CircularArray)
1089 history: expect.any(CircularArray)
1093 history: expect.any(CircularArray)
1096 history: expect.any(CircularArray)
1100 expect(workerNode.usage.runTime.history.length).toBe(0)
1101 expect(workerNode.usage.waitTime.history.length).toBe(0)
1102 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1103 expect(workerNode.usage.elu.active.history.length).toBe(0)
1105 await pool.destroy()
1108 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1109 const pool = new DynamicClusterPool(
1110 Math.floor(numberOfWorkers / 2),
1112 './tests/worker-files/cluster/testWorker.js'
1114 expect(pool.emitter.eventNames()).toStrictEqual([])
1117 pool.emitter.on(PoolEvents.ready, info => {
1121 await waitPoolEvents(pool, PoolEvents.ready, 1)
1122 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1123 expect(poolReady).toBe(1)
1124 expect(poolInfo).toStrictEqual({
1126 type: PoolTypes.dynamic,
1127 worker: WorkerTypes.cluster,
1130 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1131 minSize: expect.any(Number),
1132 maxSize: expect.any(Number),
1133 workerNodes: expect.any(Number),
1134 idleWorkerNodes: expect.any(Number),
1135 busyWorkerNodes: expect.any(Number),
1136 executedTasks: expect.any(Number),
1137 executingTasks: expect.any(Number),
1138 failedTasks: expect.any(Number)
1140 await pool.destroy()
1143 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1144 const pool = new FixedThreadPool(
1146 './tests/worker-files/thread/testWorker.mjs'
1148 expect(pool.emitter.eventNames()).toStrictEqual([])
1149 const promises = new Set()
1152 pool.emitter.on(PoolEvents.busy, info => {
1156 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1157 for (let i = 0; i < numberOfWorkers * 2; i++) {
1158 promises.add(pool.execute())
1160 await Promise.all(promises)
1161 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1162 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1163 expect(poolBusy).toBe(numberOfWorkers + 1)
1164 expect(poolInfo).toStrictEqual({
1166 type: PoolTypes.fixed,
1167 worker: WorkerTypes.thread,
1170 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1171 minSize: expect.any(Number),
1172 maxSize: expect.any(Number),
1173 workerNodes: expect.any(Number),
1174 idleWorkerNodes: expect.any(Number),
1175 busyWorkerNodes: expect.any(Number),
1176 executedTasks: expect.any(Number),
1177 executingTasks: expect.any(Number),
1178 failedTasks: expect.any(Number)
1180 await pool.destroy()
1183 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1184 const pool = new DynamicThreadPool(
1185 Math.floor(numberOfWorkers / 2),
1187 './tests/worker-files/thread/testWorker.mjs'
1189 expect(pool.emitter.eventNames()).toStrictEqual([])
1190 const promises = new Set()
1193 pool.emitter.on(PoolEvents.full, info => {
1197 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1198 for (let i = 0; i < numberOfWorkers * 2; i++) {
1199 promises.add(pool.execute())
1201 await Promise.all(promises)
1202 expect(poolFull).toBe(1)
1203 expect(poolInfo).toStrictEqual({
1205 type: PoolTypes.dynamic,
1206 worker: WorkerTypes.thread,
1209 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1210 minSize: expect.any(Number),
1211 maxSize: expect.any(Number),
1212 workerNodes: expect.any(Number),
1213 idleWorkerNodes: expect.any(Number),
1214 busyWorkerNodes: expect.any(Number),
1215 executedTasks: expect.any(Number),
1216 executingTasks: expect.any(Number),
1217 failedTasks: expect.any(Number)
1219 await pool.destroy()
1222 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1223 const pool = new FixedThreadPool(
1225 './tests/worker-files/thread/testWorker.mjs',
1227 enableTasksQueue: true
1230 stub(pool, 'hasBackPressure').returns(true)
1231 expect(pool.emitter.eventNames()).toStrictEqual([])
1232 const promises = new Set()
1233 let poolBackPressure = 0
1235 pool.emitter.on(PoolEvents.backPressure, info => {
1239 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1240 for (let i = 0; i < numberOfWorkers + 1; i++) {
1241 promises.add(pool.execute())
1243 await Promise.all(promises)
1244 expect(poolBackPressure).toBe(1)
1245 expect(poolInfo).toStrictEqual({
1247 type: PoolTypes.fixed,
1248 worker: WorkerTypes.thread,
1251 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1252 minSize: expect.any(Number),
1253 maxSize: expect.any(Number),
1254 workerNodes: expect.any(Number),
1255 idleWorkerNodes: expect.any(Number),
1256 busyWorkerNodes: expect.any(Number),
1257 executedTasks: expect.any(Number),
1258 executingTasks: expect.any(Number),
1259 maxQueuedTasks: expect.any(Number),
1260 queuedTasks: expect.any(Number),
1262 stolenTasks: expect.any(Number),
1263 failedTasks: expect.any(Number)
1265 expect(pool.hasBackPressure.called).toBe(true)
1266 await pool.destroy()
1269 it('Verify that hasTaskFunction() is working', async () => {
1270 const dynamicThreadPool = new DynamicThreadPool(
1271 Math.floor(numberOfWorkers / 2),
1273 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1275 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1276 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1277 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1280 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1281 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1282 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1283 await dynamicThreadPool.destroy()
1284 const fixedClusterPool = new FixedClusterPool(
1286 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1288 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1289 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1290 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1293 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1294 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1295 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1296 await fixedClusterPool.destroy()
1299 it('Verify that addTaskFunction() is working', async () => {
1300 const dynamicThreadPool = new DynamicThreadPool(
1301 Math.floor(numberOfWorkers / 2),
1303 './tests/worker-files/thread/testWorker.mjs'
1305 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1307 dynamicThreadPool.addTaskFunction(0, () => {})
1308 ).rejects.toThrowError(new TypeError('name argument must be a string'))
1310 dynamicThreadPool.addTaskFunction('', () => {})
1311 ).rejects.toThrowError(
1312 new TypeError('name argument must not be an empty string')
1315 dynamicThreadPool.addTaskFunction('test', 0)
1316 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1318 dynamicThreadPool.addTaskFunction('test', '')
1319 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1320 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1324 const echoTaskFunction = data => {
1328 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1329 ).resolves.toBe(true)
1330 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1331 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1334 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1339 const taskFunctionData = { test: 'test' }
1340 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1341 expect(echoResult).toStrictEqual(taskFunctionData)
1342 for (const workerNode of dynamicThreadPool.workerNodes) {
1343 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1345 executed: expect.any(Number),
1352 history: new CircularArray()
1355 history: new CircularArray()
1359 history: new CircularArray()
1362 history: new CircularArray()
1367 await dynamicThreadPool.destroy()
1370 it('Verify that removeTaskFunction() is working', async () => {
1371 const dynamicThreadPool = new DynamicThreadPool(
1372 Math.floor(numberOfWorkers / 2),
1374 './tests/worker-files/thread/testWorker.mjs'
1376 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1377 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1382 dynamicThreadPool.removeTaskFunction('test')
1383 ).rejects.toThrowError(
1384 new Error('Cannot remove a task function not handled on the pool side')
1386 const echoTaskFunction = data => {
1389 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1390 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1391 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1394 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1399 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1402 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1403 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1404 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1408 await dynamicThreadPool.destroy()
1411 it('Verify that listTaskFunctionNames() is working', async () => {
1412 const dynamicThreadPool = new DynamicThreadPool(
1413 Math.floor(numberOfWorkers / 2),
1415 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1417 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1418 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1420 'jsonIntegerSerialization',
1424 await dynamicThreadPool.destroy()
1425 const fixedClusterPool = new FixedClusterPool(
1427 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1429 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1430 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1432 'jsonIntegerSerialization',
1436 await fixedClusterPool.destroy()
1439 it('Verify that setDefaultTaskFunction() is working', async () => {
1440 const dynamicThreadPool = new DynamicThreadPool(
1441 Math.floor(numberOfWorkers / 2),
1443 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1445 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1447 dynamicThreadPool.setDefaultTaskFunction(0)
1448 ).rejects.toThrowError(
1450 "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
1454 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1455 ).rejects.toThrowError(
1457 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
1461 dynamicThreadPool.setDefaultTaskFunction('unknown')
1462 ).rejects.toThrowError(
1464 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
1467 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1469 'jsonIntegerSerialization',
1474 dynamicThreadPool.setDefaultTaskFunction('factorial')
1475 ).resolves.toBe(true)
1476 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1479 'jsonIntegerSerialization',
1483 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1484 ).resolves.toBe(true)
1485 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1488 'jsonIntegerSerialization',
1493 it('Verify that multiple task functions worker is working', async () => {
1494 const pool = new DynamicClusterPool(
1495 Math.floor(numberOfWorkers / 2),
1497 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1499 const data = { n: 10 }
1500 const result0 = await pool.execute(data)
1501 expect(result0).toStrictEqual({ ok: 1 })
1502 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1503 expect(result1).toStrictEqual({ ok: 1 })
1504 const result2 = await pool.execute(data, 'factorial')
1505 expect(result2).toBe(3628800)
1506 const result3 = await pool.execute(data, 'fibonacci')
1507 expect(result3).toBe(55)
1508 expect(pool.info.executingTasks).toBe(0)
1509 expect(pool.info.executedTasks).toBe(4)
1510 for (const workerNode of pool.workerNodes) {
1511 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1513 'jsonIntegerSerialization',
1517 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1518 for (const name of pool.listTaskFunctionNames()) {
1519 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1521 executed: expect.any(Number),
1528 history: expect.any(CircularArray)
1531 history: expect.any(CircularArray)
1535 history: expect.any(CircularArray)
1538 history: expect.any(CircularArray)
1543 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1544 ).toBeGreaterThan(0)
1547 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1549 workerNode.getTaskFunctionWorkerUsage(
1550 workerNode.info.taskFunctionNames[1]
1554 await pool.destroy()
1557 it('Verify sendKillMessageToWorker()', async () => {
1558 const pool = new DynamicClusterPool(
1559 Math.floor(numberOfWorkers / 2),
1561 './tests/worker-files/cluster/testWorker.js'
1563 const workerNodeKey = 0
1565 pool.sendKillMessageToWorker(workerNodeKey)
1566 ).resolves.toBeUndefined()
1567 await pool.destroy()
1570 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1571 const pool = new DynamicClusterPool(
1572 Math.floor(numberOfWorkers / 2),
1574 './tests/worker-files/cluster/testWorker.js'
1576 const workerNodeKey = 0
1578 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1579 taskFunctionOperation: 'add',
1580 taskFunctionName: 'empty',
1581 taskFunction: (() => {}).toString()
1583 ).resolves.toBe(true)
1585 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1586 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1587 await pool.destroy()
1590 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1591 const pool = new DynamicClusterPool(
1592 Math.floor(numberOfWorkers / 2),
1594 './tests/worker-files/cluster/testWorker.js'
1597 pool.sendTaskFunctionOperationToWorkers({
1598 taskFunctionOperation: 'add',
1599 taskFunctionName: 'empty',
1600 taskFunction: (() => {}).toString()
1602 ).resolves.toBe(true)
1603 for (const workerNode of pool.workerNodes) {
1604 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1610 await pool.destroy()