1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { expect } from 'expect'
6 import { restore, stub } from 'sinon'
14 WorkerChoiceStrategies,
16 } from '../../lib/index.js'
17 import { CircularArray } from '../../lib/circular-array.js'
18 import { Deque } from '../../lib/deque.js'
19 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20 import { waitPoolEvents } from '../test-utils.js'
21 import { WorkerNode } from '../../lib/pools/worker-node.js'
23 describe('Abstract pool test suite', () => {
24 const version = JSON.parse(
26 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
30 const numberOfWorkers = 2
31 class StubPoolWithIsMain extends FixedThreadPool {
41 it('Simulate pool creation from a non main thread/process', () => {
44 new StubPoolWithIsMain(
46 './tests/worker-files/thread/testWorker.mjs',
48 errorHandler: e => console.error(e)
53 'Cannot start a pool from a worker with the same type as the pool'
58 it('Verify that pool statuses properties are set', async () => {
59 const pool = new FixedThreadPool(
61 './tests/worker-files/thread/testWorker.mjs'
63 expect(pool.starting).toBe(false)
64 expect(pool.started).toBe(true)
68 it('Verify that filePath is checked', () => {
69 const expectedError = new Error(
70 'Please specify a file with a worker implementation'
72 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
75 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
78 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError(
81 expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError(
85 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
86 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
89 it('Verify that numberOfWorkers is checked', () => {
94 './tests/worker-files/thread/testWorker.mjs'
98 'Cannot instantiate a pool without specifying the number of workers'
103 it('Verify that a negative number of workers is checked', () => {
106 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
109 'Cannot instantiate a pool with a negative number of workers'
114 it('Verify that a non integer number of workers is checked', () => {
117 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
120 'Cannot instantiate a pool with a non safe integer number of workers'
125 it('Verify that dynamic pool sizing is checked', () => {
128 new DynamicClusterPool(
131 './tests/worker-files/cluster/testWorker.js'
135 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
140 new DynamicThreadPool(
143 './tests/worker-files/thread/testWorker.mjs'
147 'Cannot instantiate a pool with a non safe integer number of workers'
152 new DynamicClusterPool(
155 './tests/worker-files/cluster/testWorker.js'
159 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
164 new DynamicThreadPool(
167 './tests/worker-files/thread/testWorker.mjs'
171 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
176 new DynamicThreadPool(
179 './tests/worker-files/thread/testWorker.mjs'
183 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
188 new DynamicClusterPool(
191 './tests/worker-files/cluster/testWorker.js'
195 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
200 it('Verify that pool options are checked', async () => {
201 let pool = new FixedThreadPool(
203 './tests/worker-files/thread/testWorker.mjs'
205 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
206 expect(pool.opts).toStrictEqual({
209 restartWorkerOnError: true,
210 enableTasksQueue: false,
211 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
212 workerChoiceStrategyOptions: {
214 runTime: { median: false },
215 waitTime: { median: false },
216 elu: { median: false }
219 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
221 runTime: { median: false },
222 waitTime: { median: false },
223 elu: { median: false }
225 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
226 .workerChoiceStrategies) {
227 expect(workerChoiceStrategy.opts).toStrictEqual({
229 runTime: { median: false },
230 waitTime: { median: false },
231 elu: { median: false }
235 const testHandler = () => console.info('test handler executed')
236 pool = new FixedThreadPool(
238 './tests/worker-files/thread/testWorker.mjs',
240 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
241 workerChoiceStrategyOptions: {
242 runTime: { median: true },
243 weights: { 0: 300, 1: 200 }
246 restartWorkerOnError: false,
247 enableTasksQueue: true,
248 tasksQueueOptions: { concurrency: 2 },
249 messageHandler: testHandler,
250 errorHandler: testHandler,
251 onlineHandler: testHandler,
252 exitHandler: testHandler
255 expect(pool.emitter).toBeUndefined()
256 expect(pool.opts).toStrictEqual({
259 restartWorkerOnError: false,
260 enableTasksQueue: true,
263 size: Math.pow(numberOfWorkers, 2),
265 tasksStealingOnBackPressure: true
267 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
268 workerChoiceStrategyOptions: {
270 runTime: { median: true },
271 waitTime: { median: false },
272 elu: { median: false },
273 weights: { 0: 300, 1: 200 }
275 onlineHandler: testHandler,
276 messageHandler: testHandler,
277 errorHandler: testHandler,
278 exitHandler: testHandler
280 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
282 runTime: { median: true },
283 waitTime: { median: false },
284 elu: { median: false },
285 weights: { 0: 300, 1: 200 }
287 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
288 .workerChoiceStrategies) {
289 expect(workerChoiceStrategy.opts).toStrictEqual({
291 runTime: { median: true },
292 waitTime: { median: false },
293 elu: { median: false },
294 weights: { 0: 300, 1: 200 }
300 it('Verify that pool options are validated', async () => {
305 './tests/worker-files/thread/testWorker.mjs',
307 workerChoiceStrategy: 'invalidStrategy'
311 new Error("Invalid worker choice strategy 'invalidStrategy'")
317 './tests/worker-files/thread/testWorker.mjs',
319 workerChoiceStrategyOptions: {
320 retries: 'invalidChoiceRetries'
326 'Invalid worker choice strategy options: retries must be an integer'
333 './tests/worker-files/thread/testWorker.mjs',
335 workerChoiceStrategyOptions: {
342 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
349 './tests/worker-files/thread/testWorker.mjs',
351 workerChoiceStrategyOptions: { weights: {} }
356 'Invalid worker choice strategy options: must have a weight for each worker node'
363 './tests/worker-files/thread/testWorker.mjs',
365 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
370 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
377 './tests/worker-files/thread/testWorker.mjs',
379 enableTasksQueue: true,
380 tasksQueueOptions: 'invalidTasksQueueOptions'
384 new TypeError('Invalid tasks queue options: must be a plain object')
390 './tests/worker-files/thread/testWorker.mjs',
392 enableTasksQueue: true,
393 tasksQueueOptions: { concurrency: 0 }
398 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
405 './tests/worker-files/thread/testWorker.mjs',
407 enableTasksQueue: true,
408 tasksQueueOptions: { concurrency: -1 }
413 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
420 './tests/worker-files/thread/testWorker.mjs',
422 enableTasksQueue: true,
423 tasksQueueOptions: { concurrency: 0.2 }
427 new TypeError('Invalid worker node tasks concurrency: must be an integer')
433 './tests/worker-files/thread/testWorker.mjs',
435 enableTasksQueue: true,
436 tasksQueueOptions: { size: 0 }
441 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
448 './tests/worker-files/thread/testWorker.mjs',
450 enableTasksQueue: true,
451 tasksQueueOptions: { size: -1 }
456 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
463 './tests/worker-files/thread/testWorker.mjs',
465 enableTasksQueue: true,
466 tasksQueueOptions: { size: 0.2 }
470 new TypeError('Invalid worker node tasks queue size: must be an integer')
474 it('Verify that pool worker choice strategy options can be set', async () => {
475 const pool = new FixedThreadPool(
477 './tests/worker-files/thread/testWorker.mjs',
478 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
480 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
482 runTime: { median: false },
483 waitTime: { median: false },
484 elu: { median: false }
486 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
488 runTime: { median: false },
489 waitTime: { median: false },
490 elu: { median: false }
492 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
493 .workerChoiceStrategies) {
494 expect(workerChoiceStrategy.opts).toStrictEqual({
496 runTime: { median: false },
497 waitTime: { median: false },
498 elu: { median: false }
502 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
520 pool.setWorkerChoiceStrategyOptions({
521 runTime: { median: true },
522 elu: { median: true }
524 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
526 runTime: { median: true },
527 waitTime: { median: false },
528 elu: { median: true }
530 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
532 runTime: { median: true },
533 waitTime: { median: false },
534 elu: { median: true }
536 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
537 .workerChoiceStrategies) {
538 expect(workerChoiceStrategy.opts).toStrictEqual({
540 runTime: { median: true },
541 waitTime: { median: false },
542 elu: { median: true }
546 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
564 pool.setWorkerChoiceStrategyOptions({
565 runTime: { median: false },
566 elu: { median: false }
568 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
570 runTime: { median: false },
571 waitTime: { median: false },
572 elu: { median: false }
574 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
576 runTime: { median: false },
577 waitTime: { median: false },
578 elu: { median: false }
580 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
581 .workerChoiceStrategies) {
582 expect(workerChoiceStrategy.opts).toStrictEqual({
584 runTime: { median: false },
585 waitTime: { median: false },
586 elu: { median: false }
590 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
609 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
612 'Invalid worker choice strategy options: must be a plain object'
616 pool.setWorkerChoiceStrategyOptions({
617 retries: 'invalidChoiceRetries'
621 'Invalid worker choice strategy options: retries must be an integer'
625 pool.setWorkerChoiceStrategyOptions({ retries: -1 })
628 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
632 pool.setWorkerChoiceStrategyOptions({ weights: {} })
635 'Invalid worker choice strategy options: must have a weight for each worker node'
639 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
642 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
648 it('Verify that pool tasks queue can be enabled/disabled', async () => {
649 const pool = new FixedThreadPool(
651 './tests/worker-files/thread/testWorker.mjs'
653 expect(pool.opts.enableTasksQueue).toBe(false)
654 expect(pool.opts.tasksQueueOptions).toBeUndefined()
655 for (const workerNode of pool.workerNodes) {
656 expect(workerNode.onEmptyQueue).toBeUndefined()
657 expect(workerNode.onBackPressure).toBeUndefined()
659 pool.enableTasksQueue(true)
660 expect(pool.opts.enableTasksQueue).toBe(true)
661 expect(pool.opts.tasksQueueOptions).toStrictEqual({
663 size: Math.pow(numberOfWorkers, 2),
665 tasksStealingOnBackPressure: true
667 for (const workerNode of pool.workerNodes) {
668 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
669 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
671 pool.enableTasksQueue(true, { concurrency: 2 })
672 expect(pool.opts.enableTasksQueue).toBe(true)
673 expect(pool.opts.tasksQueueOptions).toStrictEqual({
675 size: Math.pow(numberOfWorkers, 2),
677 tasksStealingOnBackPressure: true
679 for (const workerNode of pool.workerNodes) {
680 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
681 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
683 pool.enableTasksQueue(false)
684 expect(pool.opts.enableTasksQueue).toBe(false)
685 expect(pool.opts.tasksQueueOptions).toBeUndefined()
686 for (const workerNode of pool.workerNodes) {
687 expect(workerNode.onEmptyQueue).toBeUndefined()
688 expect(workerNode.onBackPressure).toBeUndefined()
693 it('Verify that pool tasks queue options can be set', async () => {
694 const pool = new FixedThreadPool(
696 './tests/worker-files/thread/testWorker.mjs',
697 { enableTasksQueue: true }
699 expect(pool.opts.tasksQueueOptions).toStrictEqual({
701 size: Math.pow(numberOfWorkers, 2),
703 tasksStealingOnBackPressure: true
705 for (const workerNode of pool.workerNodes) {
706 expect(workerNode.tasksQueueBackPressureSize).toBe(
707 pool.opts.tasksQueueOptions.size
709 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
710 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
712 pool.setTasksQueueOptions({
716 tasksStealingOnBackPressure: false
718 expect(pool.opts.tasksQueueOptions).toStrictEqual({
722 tasksStealingOnBackPressure: false
724 for (const workerNode of pool.workerNodes) {
725 expect(workerNode.tasksQueueBackPressureSize).toBe(
726 pool.opts.tasksQueueOptions.size
728 expect(workerNode.onEmptyQueue).toBeUndefined()
729 expect(workerNode.onBackPressure).toBeUndefined()
731 pool.setTasksQueueOptions({
734 tasksStealingOnBackPressure: true
736 expect(pool.opts.tasksQueueOptions).toStrictEqual({
738 size: Math.pow(numberOfWorkers, 2),
740 tasksStealingOnBackPressure: true
742 for (const workerNode of pool.workerNodes) {
743 expect(workerNode.tasksQueueBackPressureSize).toBe(
744 pool.opts.tasksQueueOptions.size
746 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
747 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
750 pool.setTasksQueueOptions('invalidTasksQueueOptions')
752 new TypeError('Invalid tasks queue options: must be a plain object')
754 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
756 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
759 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
761 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
764 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
765 new TypeError('Invalid worker node tasks concurrency: must be an integer')
767 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
769 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
772 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError(
774 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
777 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError(
778 new TypeError('Invalid worker node tasks queue size: must be an integer')
783 it('Verify that pool info is set', async () => {
784 let pool = new FixedThreadPool(
786 './tests/worker-files/thread/testWorker.mjs'
788 expect(pool.info).toStrictEqual({
790 type: PoolTypes.fixed,
791 worker: WorkerTypes.thread,
794 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
795 minSize: numberOfWorkers,
796 maxSize: numberOfWorkers,
797 workerNodes: numberOfWorkers,
798 idleWorkerNodes: numberOfWorkers,
805 pool = new DynamicClusterPool(
806 Math.floor(numberOfWorkers / 2),
808 './tests/worker-files/cluster/testWorker.js'
810 expect(pool.info).toStrictEqual({
812 type: PoolTypes.dynamic,
813 worker: WorkerTypes.cluster,
816 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
817 minSize: Math.floor(numberOfWorkers / 2),
818 maxSize: numberOfWorkers,
819 workerNodes: Math.floor(numberOfWorkers / 2),
820 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
829 it('Verify that pool worker tasks usage are initialized', async () => {
830 const pool = new FixedClusterPool(
832 './tests/worker-files/cluster/testWorker.js'
834 for (const workerNode of pool.workerNodes) {
835 expect(workerNode).toBeInstanceOf(WorkerNode)
836 expect(workerNode.usage).toStrictEqual({
846 history: new CircularArray()
849 history: new CircularArray()
853 history: new CircularArray()
856 history: new CircularArray()
864 it('Verify that pool worker tasks queue are initialized', async () => {
865 let pool = new FixedClusterPool(
867 './tests/worker-files/cluster/testWorker.js'
869 for (const workerNode of pool.workerNodes) {
870 expect(workerNode).toBeInstanceOf(WorkerNode)
871 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
872 expect(workerNode.tasksQueue.size).toBe(0)
873 expect(workerNode.tasksQueue.maxSize).toBe(0)
876 pool = new DynamicThreadPool(
877 Math.floor(numberOfWorkers / 2),
879 './tests/worker-files/thread/testWorker.mjs'
881 for (const workerNode of pool.workerNodes) {
882 expect(workerNode).toBeInstanceOf(WorkerNode)
883 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
884 expect(workerNode.tasksQueue.size).toBe(0)
885 expect(workerNode.tasksQueue.maxSize).toBe(0)
890 it('Verify that pool worker info are initialized', async () => {
891 let pool = new FixedClusterPool(
893 './tests/worker-files/cluster/testWorker.js'
895 for (const workerNode of pool.workerNodes) {
896 expect(workerNode).toBeInstanceOf(WorkerNode)
897 expect(workerNode.info).toStrictEqual({
898 id: expect.any(Number),
899 type: WorkerTypes.cluster,
905 pool = new DynamicThreadPool(
906 Math.floor(numberOfWorkers / 2),
908 './tests/worker-files/thread/testWorker.mjs'
910 for (const workerNode of pool.workerNodes) {
911 expect(workerNode).toBeInstanceOf(WorkerNode)
912 expect(workerNode.info).toStrictEqual({
913 id: expect.any(Number),
914 type: WorkerTypes.thread,
922 it('Verify that pool can be started after initialization', async () => {
923 const pool = new FixedClusterPool(
925 './tests/worker-files/cluster/testWorker.js',
930 expect(pool.info.started).toBe(false)
931 expect(pool.info.ready).toBe(false)
932 expect(pool.workerNodes).toStrictEqual([])
933 await expect(pool.execute()).rejects.toThrowError(
934 new Error('Cannot execute a task on not started pool')
937 expect(pool.info.started).toBe(true)
938 expect(pool.info.ready).toBe(true)
939 expect(pool.workerNodes.length).toBe(numberOfWorkers)
940 for (const workerNode of pool.workerNodes) {
941 expect(workerNode).toBeInstanceOf(WorkerNode)
946 it('Verify that pool execute() arguments are checked', async () => {
947 const pool = new FixedClusterPool(
949 './tests/worker-files/cluster/testWorker.js'
951 await expect(pool.execute(undefined, 0)).rejects.toThrowError(
952 new TypeError('name argument must be a string')
954 await expect(pool.execute(undefined, '')).rejects.toThrowError(
955 new TypeError('name argument must not be an empty string')
957 await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
958 new TypeError('transferList argument must be an array')
960 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
961 "Task function 'unknown' not found"
964 await expect(pool.execute()).rejects.toThrowError(
965 new Error('Cannot execute a task on not started pool')
969 it('Verify that pool worker tasks usage are computed', async () => {
970 const pool = new FixedClusterPool(
972 './tests/worker-files/cluster/testWorker.js'
974 const promises = new Set()
975 const maxMultiplier = 2
976 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
977 promises.add(pool.execute())
979 for (const workerNode of pool.workerNodes) {
980 expect(workerNode.usage).toStrictEqual({
983 executing: maxMultiplier,
990 history: expect.any(CircularArray)
993 history: expect.any(CircularArray)
997 history: expect.any(CircularArray)
1000 history: expect.any(CircularArray)
1005 await Promise.all(promises)
1006 for (const workerNode of pool.workerNodes) {
1007 expect(workerNode.usage).toStrictEqual({
1009 executed: maxMultiplier,
1017 history: expect.any(CircularArray)
1020 history: expect.any(CircularArray)
1024 history: expect.any(CircularArray)
1027 history: expect.any(CircularArray)
1032 await pool.destroy()
1035 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1036 const pool = new DynamicThreadPool(
1037 Math.floor(numberOfWorkers / 2),
1039 './tests/worker-files/thread/testWorker.mjs'
1041 const promises = new Set()
1042 const maxMultiplier = 2
1043 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1044 promises.add(pool.execute())
1046 await Promise.all(promises)
1047 for (const workerNode of pool.workerNodes) {
1048 expect(workerNode.usage).toStrictEqual({
1050 executed: expect.any(Number),
1058 history: expect.any(CircularArray)
1061 history: expect.any(CircularArray)
1065 history: expect.any(CircularArray)
1068 history: expect.any(CircularArray)
1072 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1073 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1074 numberOfWorkers * maxMultiplier
1076 expect(workerNode.usage.runTime.history.length).toBe(0)
1077 expect(workerNode.usage.waitTime.history.length).toBe(0)
1078 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1079 expect(workerNode.usage.elu.active.history.length).toBe(0)
1081 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1082 for (const workerNode of pool.workerNodes) {
1083 expect(workerNode.usage).toStrictEqual({
1093 history: expect.any(CircularArray)
1096 history: expect.any(CircularArray)
1100 history: expect.any(CircularArray)
1103 history: expect.any(CircularArray)
1107 expect(workerNode.usage.runTime.history.length).toBe(0)
1108 expect(workerNode.usage.waitTime.history.length).toBe(0)
1109 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1110 expect(workerNode.usage.elu.active.history.length).toBe(0)
1112 await pool.destroy()
1115 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1116 const pool = new DynamicClusterPool(
1117 Math.floor(numberOfWorkers / 2),
1119 './tests/worker-files/cluster/testWorker.js'
1121 expect(pool.emitter.eventNames()).toStrictEqual([])
1124 pool.emitter.on(PoolEvents.ready, info => {
1128 await waitPoolEvents(pool, PoolEvents.ready, 1)
1129 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1130 expect(poolReady).toBe(1)
1131 expect(poolInfo).toStrictEqual({
1133 type: PoolTypes.dynamic,
1134 worker: WorkerTypes.cluster,
1137 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1138 minSize: expect.any(Number),
1139 maxSize: expect.any(Number),
1140 workerNodes: expect.any(Number),
1141 idleWorkerNodes: expect.any(Number),
1142 busyWorkerNodes: expect.any(Number),
1143 executedTasks: expect.any(Number),
1144 executingTasks: expect.any(Number),
1145 failedTasks: expect.any(Number)
1147 await pool.destroy()
1150 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1151 const pool = new FixedThreadPool(
1153 './tests/worker-files/thread/testWorker.mjs'
1155 expect(pool.emitter.eventNames()).toStrictEqual([])
1156 const promises = new Set()
1159 pool.emitter.on(PoolEvents.busy, info => {
1163 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1164 for (let i = 0; i < numberOfWorkers * 2; i++) {
1165 promises.add(pool.execute())
1167 await Promise.all(promises)
1168 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1169 // So it is triggered numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1170 expect(poolBusy).toBe(numberOfWorkers + 1)
1171 expect(poolInfo).toStrictEqual({
1173 type: PoolTypes.fixed,
1174 worker: WorkerTypes.thread,
1177 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1178 minSize: expect.any(Number),
1179 maxSize: expect.any(Number),
1180 workerNodes: expect.any(Number),
1181 idleWorkerNodes: expect.any(Number),
1182 busyWorkerNodes: expect.any(Number),
1183 executedTasks: expect.any(Number),
1184 executingTasks: expect.any(Number),
1185 failedTasks: expect.any(Number)
1187 await pool.destroy()
1190 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1191 const pool = new DynamicThreadPool(
1192 Math.floor(numberOfWorkers / 2),
1194 './tests/worker-files/thread/testWorker.mjs'
1196 expect(pool.emitter.eventNames()).toStrictEqual([])
1197 const promises = new Set()
1200 pool.emitter.on(PoolEvents.full, info => {
1204 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1205 for (let i = 0; i < numberOfWorkers * 2; i++) {
1206 promises.add(pool.execute())
1208 await Promise.all(promises)
1209 expect(poolFull).toBe(1)
1210 expect(poolInfo).toStrictEqual({
1212 type: PoolTypes.dynamic,
1213 worker: WorkerTypes.thread,
1216 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1217 minSize: expect.any(Number),
1218 maxSize: expect.any(Number),
1219 workerNodes: expect.any(Number),
1220 idleWorkerNodes: expect.any(Number),
1221 busyWorkerNodes: expect.any(Number),
1222 executedTasks: expect.any(Number),
1223 executingTasks: expect.any(Number),
1224 failedTasks: expect.any(Number)
1226 await pool.destroy()
1229 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1230 const pool = new FixedThreadPool(
1232 './tests/worker-files/thread/testWorker.mjs',
1234 enableTasksQueue: true
1237 stub(pool, 'hasBackPressure').returns(true)
1238 expect(pool.emitter.eventNames()).toStrictEqual([])
1239 const promises = new Set()
1240 let poolBackPressure = 0
1242 pool.emitter.on(PoolEvents.backPressure, info => {
1246 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1247 for (let i = 0; i < numberOfWorkers + 1; i++) {
1248 promises.add(pool.execute())
1250 await Promise.all(promises)
1251 expect(poolBackPressure).toBe(1)
1252 expect(poolInfo).toStrictEqual({
1254 type: PoolTypes.fixed,
1255 worker: WorkerTypes.thread,
1258 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1259 minSize: expect.any(Number),
1260 maxSize: expect.any(Number),
1261 workerNodes: expect.any(Number),
1262 idleWorkerNodes: expect.any(Number),
1263 busyWorkerNodes: expect.any(Number),
1264 executedTasks: expect.any(Number),
1265 executingTasks: expect.any(Number),
1266 maxQueuedTasks: expect.any(Number),
1267 queuedTasks: expect.any(Number),
1269 stolenTasks: expect.any(Number),
1270 failedTasks: expect.any(Number)
1272 expect(pool.hasBackPressure.called).toBe(true)
1273 await pool.destroy()
1276 it('Verify that hasTaskFunction() is working', async () => {
1277 const dynamicThreadPool = new DynamicThreadPool(
1278 Math.floor(numberOfWorkers / 2),
1280 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1282 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1283 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1284 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1287 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1288 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1289 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1290 await dynamicThreadPool.destroy()
1291 const fixedClusterPool = new FixedClusterPool(
1293 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1295 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1296 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1297 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1300 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1301 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1302 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1303 await fixedClusterPool.destroy()
1306 it('Verify that addTaskFunction() is working', async () => {
1307 const dynamicThreadPool = new DynamicThreadPool(
1308 Math.floor(numberOfWorkers / 2),
1310 './tests/worker-files/thread/testWorker.mjs'
1312 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1314 dynamicThreadPool.addTaskFunction(0, () => {})
1315 ).rejects.toThrowError(new TypeError('name argument must be a string'))
1317 dynamicThreadPool.addTaskFunction('', () => {})
1318 ).rejects.toThrowError(
1319 new TypeError('name argument must not be an empty string')
1322 dynamicThreadPool.addTaskFunction('test', 0)
1323 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1325 dynamicThreadPool.addTaskFunction('test', '')
1326 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1327 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1331 const echoTaskFunction = data => {
1335 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1336 ).resolves.toBe(true)
1337 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1338 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1341 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1346 const taskFunctionData = { test: 'test' }
1347 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1348 expect(echoResult).toStrictEqual(taskFunctionData)
1349 for (const workerNode of dynamicThreadPool.workerNodes) {
1350 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1352 executed: expect.any(Number),
1359 history: new CircularArray()
1362 history: new CircularArray()
1366 history: new CircularArray()
1369 history: new CircularArray()
1374 await dynamicThreadPool.destroy()
1377 it('Verify that removeTaskFunction() is working', async () => {
1378 const dynamicThreadPool = new DynamicThreadPool(
1379 Math.floor(numberOfWorkers / 2),
1381 './tests/worker-files/thread/testWorker.mjs'
1383 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1384 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1389 dynamicThreadPool.removeTaskFunction('test')
1390 ).rejects.toThrowError(
1391 new Error('Cannot remove a task function not handled on the pool side')
1393 const echoTaskFunction = data => {
1396 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1397 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1398 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1401 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1406 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1409 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1410 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1411 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1415 await dynamicThreadPool.destroy()
1418 it('Verify that listTaskFunctionNames() is working', async () => {
1419 const dynamicThreadPool = new DynamicThreadPool(
1420 Math.floor(numberOfWorkers / 2),
1422 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1424 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1425 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1427 'jsonIntegerSerialization',
1431 await dynamicThreadPool.destroy()
1432 const fixedClusterPool = new FixedClusterPool(
1434 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1436 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1437 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1439 'jsonIntegerSerialization',
1443 await fixedClusterPool.destroy()
// Test: exercises setDefaultTaskFunction() on a DynamicThreadPool whose worker file
// exposes several task functions. Three invalid names must reject, then
// 'factorial' and 'fibonacci' are each set as default and the name list is re-checked.
// NOTE(review): no destroy() call is visible in this test, unlike the neighbouring
// tests — confirm the pool is torn down somewhere.
1446 it('Verify that setDefaultTaskFunction() is working', async () => {
1447 const dynamicThreadPool = new DynamicThreadPool(
1448 Math.floor(numberOfWorkers / 2),
1450 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1452 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Non-string name: the expected message carries a TypeError raised for the name parameter.
// NOTE(review): the expected messages hard-code "worker 31"; presumably this id
// depends on how many workers earlier tests created — verify it is stable.
1454 dynamicThreadPool.setDefaultTaskFunction(0)
1455 ).rejects.toThrowError(
1457 "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
// The reserved default task name itself cannot be set as the default task function.
1461 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1462 ).rejects.toThrowError(
1464 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
// A name that does not match any existing task function must be rejected.
1468 dynamicThreadPool.setDefaultTaskFunction('unknown')
1469 ).rejects.toThrowError(
1471 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
// Name list as observed after the three rejected calls above.
1474 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1476 'jsonIntegerSerialization',
// Valid name: the call resolves to true and the name list is asserted again.
1481 dynamicThreadPool.setDefaultTaskFunction('factorial')
1482 ).resolves.toBe(true)
1483 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1486 'jsonIntegerSerialization',
// Switch the default a second time and assert the name list once more.
1490 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1491 ).resolves.toBe(true)
1492 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1495 'jsonIntegerSerialization',
// Test: runs tasks against a DynamicClusterPool whose worker file exposes several
// task functions, then checks pool-level counters and per-worker task function usage.
1500 it('Verify that multiple task functions worker is working', async () => {
1501 const pool = new DynamicClusterPool(
1502 Math.floor(numberOfWorkers / 2),
1504 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1506 const data = { n: 10 }
// No task name given: the result is expected to match the one obtained below by
// explicitly naming 'jsonIntegerSerialization'.
1507 const result0 = await pool.execute(data)
1508 expect(result0).toStrictEqual({ ok: 1 })
1509 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1510 expect(result1).toStrictEqual({ ok: 1 })
// 10! = 3628800
1511 const result2 = await pool.execute(data, 'factorial')
1512 expect(result2).toBe(3628800)
// fibonacci(10) = 55
1513 const result3 = await pool.execute(data, 'fibonacci')
1514 expect(result3).toBe(55)
// Four execute() calls were awaited above: nothing still executing, four executed.
1515 expect(pool.info.executingTasks).toBe(0)
1516 expect(pool.info.executedTasks).toBe(4)
// Per-worker checks: advertised task function names and usage statistics.
1517 for (const workerNode of pool.workerNodes) {
1518 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1520 'jsonIntegerSerialization',
1524 expect(workerNode.taskFunctionsUsage.size).toBe(3)
// Every task function name listed by the pool must have a usage record on this worker node.
1525 for (const name of pool.listTaskFunctionNames()) {
1526 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1528 executed: expect.any(Number),
1535 history: expect.any(CircularArray)
1538 history: expect.any(CircularArray)
1542 history: expect.any(CircularArray)
1545 history: expect.any(CircularArray)
1550 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1551 ).toBeGreaterThan(0)
// NOTE(review): presumably asserts that the usage for DEFAULT_TASK_NAME matches the
// usage of the second advertised task function name — the matcher is not visible here.
1554 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1556 workerNode.getTaskFunctionWorkerUsage(
1557 workerNode.info.taskFunctionNames[1]
1561 await pool.destroy()
// Test: sendKillMessageToWorker() on the first worker node of a DynamicClusterPool
// must resolve with undefined; the pool is destroyed afterwards.
1564 it('Verify sendKillMessageToWorker()', async () => {
1565 const pool = new DynamicClusterPool(
1566 Math.floor(numberOfWorkers / 2),
1568 './tests/worker-files/cluster/testWorker.js'
// Index of the targeted worker node in the pool.
1570 const workerNodeKey = 0
1572 pool.sendKillMessageToWorker(workerNodeKey)
1573 ).resolves.toBeUndefined()
1574 await pool.destroy()
// Test: sends an 'add' task function operation to a single worker node of a
// DynamicClusterPool and checks that the worker node then advertises the new name.
1577 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1578 const pool = new DynamicClusterPool(
1579 Math.floor(numberOfWorkers / 2),
1581 './tests/worker-files/cluster/testWorker.js'
// Index of the targeted worker node in the pool.
1583 const workerNodeKey = 0
1585 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1586 taskFunctionOperation: 'add',
1587 taskFunctionName: 'empty',
// The task function is passed as its source text, not as a function object.
1588 taskFunction: (() => {}).toString()
1590 ).resolves.toBe(true)
// After the operation, 'empty' is expected after the default name and 'test'.
1592 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1593 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1594 await pool.destroy()
// Test: sends an 'add' task function operation through
// sendTaskFunctionOperationToWorkers() on a DynamicClusterPool, then checks the
// advertised task function names on each worker node.
1597 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1598 const pool = new DynamicClusterPool(
1599 Math.floor(numberOfWorkers / 2),
1601 './tests/worker-files/cluster/testWorker.js'
1604 pool.sendTaskFunctionOperationToWorkers({
1605 taskFunctionOperation: 'add',
1606 taskFunctionName: 'empty',
// The task function is passed as its source text, not as a function object.
1607 taskFunction: (() => {}).toString()
1609 ).resolves.toBe(true)
// Each worker node's advertised names are asserted after the operation.
1610 for (const workerNode of pool.workerNodes) {
1611 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1617 await pool.destroy()