1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { expect } from 'expect'
6 import { restore, stub } from 'sinon'
14 WorkerChoiceStrategies,
16 } from '../../lib/index.js'
17 import { CircularArray } from '../../lib/circular-array.js'
18 import { Deque } from '../../lib/deque.js'
19 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20 import { waitPoolEvents } from '../test-utils.js'
21 import { WorkerNode } from '../../lib/pools/worker-node.js'
// Test suite exercising behavior shared by the fixed/dynamic thread and cluster pools.
23 describe('Abstract pool test suite', () => {
// Parsed content of the package.json located two directories above this test file.
// NOTE(review): the file read call is not visible in this excerpt - presumably readFileSync; confirm.
24 const version = JSON.parse(
26 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
// Pool size shared by the tests of this suite.
30 const numberOfWorkers = 2
// FixedThreadPool subclass used by the non main thread/process creation test.
// NOTE(review): the class body is outside this excerpt - presumably it stubs the main thread check; confirm.
31 class StubPoolWithIsMain extends FixedThreadPool {
// Smoke test: a FixedThreadPool built from the thread test worker file is a FixedThreadPool instance.
// NOTE(review): the destroy step announced by the title is not visible in this excerpt.
41 it('Verify that pool can be created and destroyed', async () => {
42 const pool = new FixedThreadPool(
44 './tests/worker-files/thread/testWorker.mjs'
46 expect(pool).toBeInstanceOf(FixedThreadPool)
// Builds the StubPoolWithIsMain subclass and pins the error message listed at the end of the block.
// NOTE(review): the expect/toThrow wrapper lines are not visible in this excerpt.
50 it('Verify that pool cannot be created from a non main thread/process', () => {
53 new StubPoolWithIsMain(
55 './tests/worker-files/thread/testWorker.mjs',
// Worker errors are forwarded to the console.
57 errorHandler: e => console.error(e)
62 'Cannot start a pool from a worker with the same type as the pool'
// Right after construction the pool must report starting as false and started as true.
67 it('Verify that pool statuses properties are set', async () => {
68 const pool = new FixedThreadPool(
70 './tests/worker-files/thread/testWorker.mjs'
72 expect(pool.starting).toBe(false)
73 expect(pool.started).toBe(true)
// The constructor must throw when the worker file path is omitted or names a file that cannot be found.
77 it('Verify that filePath is checked', () => {
// Case 1: no file path argument at all.
78 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
79 new Error("Cannot find the worker file 'undefined'")
// Case 2: a path given as ./dummyWorker.ts; the expected message reports it as not found.
82 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
83 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
// Pins the error message expected when the number of workers is not specified.
// NOTE(review): the constructor call and the expect/toThrow wrapper are not visible in this excerpt.
86 it('Verify that numberOfWorkers is checked', () => {
91 './tests/worker-files/thread/testWorker.mjs'
95 'Cannot instantiate a pool without specifying the number of workers'
// A FixedClusterPool requested with -1 workers is paired with the negative number of workers message.
// NOTE(review): the expect/toThrow wrapper lines are not visible in this excerpt.
100 it('Verify that a negative number of workers is checked', () => {
103 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
106 'Cannot instantiate a pool with a negative number of workers'
// A FixedThreadPool requested with 0.25 workers is paired with the non safe integer message.
// NOTE(review): the expect/toThrow wrapper lines are not visible in this excerpt.
111 it('Verify that a non integer number of workers is checked', () => {
114 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
117 'Cannot instantiate a pool with a non safe integer number of workers'
// Each dynamic pool construction below is paired with the error message expected for its sizing arguments.
// NOTE(review): the min/max size arguments and the expect/toThrow wrappers are not visible in this excerpt,
// so the exact input that triggers each message must be checked against the full file.
122 it('Verify that dynamic pool sizing is checked', () => {
125 new DynamicClusterPool(
128 './tests/worker-files/cluster/testWorker.js'
132 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
137 new DynamicThreadPool(
140 './tests/worker-files/thread/testWorker.mjs'
144 'Cannot instantiate a pool with a non safe integer number of workers'
149 new DynamicClusterPool(
152 './tests/worker-files/cluster/testWorker.js'
156 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
161 new DynamicThreadPool(
164 './tests/worker-files/thread/testWorker.mjs'
168 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
173 new DynamicThreadPool(
176 './tests/worker-files/thread/testWorker.mjs'
180 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
185 new DynamicClusterPool(
188 './tests/worker-files/cluster/testWorker.js'
192 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
// Checks the resolved pool options twice: once for a pool built without an options argument,
// once for a pool built with explicit options. In both cases the same worker choice strategy
// options must be visible on pool.opts, on the strategy context and on every registered strategy.
197 it('Verify that pool options are checked', async () => {
// First pool: the worker file path is the last constructor argument, so no options are passed.
198 let pool = new FixedThreadPool(
200 './tests/worker-files/thread/testWorker.mjs'
202 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
203 expect(pool.opts).toStrictEqual({
206 restartWorkerOnError: true,
207 enableTasksQueue: false,
208 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
209 workerChoiceStrategyOptions: {
211 runTime: { median: false },
212 waitTime: { median: false },
213 elu: { median: false }
216 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
218 runTime: { median: false },
219 waitTime: { median: false },
220 elu: { median: false }
222 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
223 .workerChoiceStrategies) {
224 expect(workerChoiceStrategy.opts).toStrictEqual({
226 runTime: { median: false },
227 waitTime: { median: false },
228 elu: { median: false }
// Second pool: options are set explicitly; one handler is reused for all four worker event hooks.
232 const testHandler = () => console.info('test handler executed')
233 pool = new FixedThreadPool(
235 './tests/worker-files/thread/testWorker.mjs',
237 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
238 workerChoiceStrategyOptions: {
239 runTime: { median: true },
240 weights: { 0: 300, 1: 200 }
243 restartWorkerOnError: false,
244 enableTasksQueue: true,
245 tasksQueueOptions: { concurrency: 2 },
246 messageHandler: testHandler,
247 errorHandler: testHandler,
248 onlineHandler: testHandler,
249 exitHandler: testHandler
// NOTE(review): the emitter is expected to be undefined here - presumably events are disabled
// by an option on a line outside this excerpt; confirm.
252 expect(pool.emitter).toBeUndefined()
253 expect(pool.opts).toStrictEqual({
256 restartWorkerOnError: false,
257 enableTasksQueue: true,
// No size was passed in tasksQueueOptions above; the expected size is numberOfWorkers squared.
260 size: Math.pow(numberOfWorkers, 2),
262 tasksStealingOnBackPressure: true
264 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
265 workerChoiceStrategyOptions: {
// Only runTime.median and weights were passed; waitTime and elu are expected with median false.
267 runTime: { median: true },
268 waitTime: { median: false },
269 elu: { median: false },
270 weights: { 0: 300, 1: 200 }
272 onlineHandler: testHandler,
273 messageHandler: testHandler,
274 errorHandler: testHandler,
275 exitHandler: testHandler
277 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
279 runTime: { median: true },
280 waitTime: { median: false },
281 elu: { median: false },
282 weights: { 0: 300, 1: 200 }
284 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
285 .workerChoiceStrategies) {
286 expect(workerChoiceStrategy.opts).toStrictEqual({
288 runTime: { median: true },
289 waitTime: { median: false },
290 elu: { median: false },
291 weights: { 0: 300, 1: 200 }
// Each case below passes one invalid option and pins the error message (and, where visible,
// the error type) that is expected in return.
// NOTE(review): the constructor calls and the expect/toThrow wrappers are not visible in this excerpt.
297 it('Verify that pool options are validated', () => {
// Unknown worker choice strategy name.
302 './tests/worker-files/thread/testWorker.mjs',
304 workerChoiceStrategy: 'invalidStrategy'
307 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
// Worker choice strategy options: non integer retries.
312 './tests/worker-files/thread/testWorker.mjs',
314 workerChoiceStrategyOptions: {
315 retries: 'invalidChoiceRetries'
321 'Invalid worker choice strategy options: retries must be an integer'
// Worker choice strategy options: negative retries (the value itself is on a line outside this excerpt).
328 './tests/worker-files/thread/testWorker.mjs',
330 workerChoiceStrategyOptions: {
337 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
// Worker choice strategy options: empty weights object.
344 './tests/worker-files/thread/testWorker.mjs',
346 workerChoiceStrategyOptions: { weights: {} }
351 'Invalid worker choice strategy options: must have a weight for each worker node'
// Worker choice strategy options: unknown measurement.
358 './tests/worker-files/thread/testWorker.mjs',
360 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
365 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
// Tasks queue options: not a plain object.
372 './tests/worker-files/thread/testWorker.mjs',
374 enableTasksQueue: true,
375 tasksQueueOptions: 'invalidTasksQueueOptions'
379 new TypeError('Invalid tasks queue options: must be a plain object')
// Tasks queue options: concurrency of 0, -1 and 0.2.
385 './tests/worker-files/thread/testWorker.mjs',
387 enableTasksQueue: true,
388 tasksQueueOptions: { concurrency: 0 }
393 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
400 './tests/worker-files/thread/testWorker.mjs',
402 enableTasksQueue: true,
403 tasksQueueOptions: { concurrency: -1 }
408 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
415 './tests/worker-files/thread/testWorker.mjs',
417 enableTasksQueue: true,
418 tasksQueueOptions: { concurrency: 0.2 }
422 new TypeError('Invalid worker node tasks concurrency: must be an integer')
// Tasks queue options: size of 0, -1 and 0.2.
428 './tests/worker-files/thread/testWorker.mjs',
430 enableTasksQueue: true,
431 tasksQueueOptions: { size: 0 }
436 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
443 './tests/worker-files/thread/testWorker.mjs',
445 enableTasksQueue: true,
446 tasksQueueOptions: { size: -1 }
451 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
458 './tests/worker-files/thread/testWorker.mjs',
460 enableTasksQueue: true,
461 tasksQueueOptions: { size: 0.2 }
465 new TypeError('Invalid worker node tasks queue size: must be an integer')
// Starts from a FAIR_SHARE pool and checks that each setWorkerChoiceStrategyOptions() call is
// reflected in pool.opts, in the strategy context and in every registered strategy.
// The end of the block pins the errors raised for invalid options.
// NOTE(review): the assertions on getTaskStatisticsRequirements() are not visible in this excerpt.
469 it('Verify that pool worker choice strategy options can be set', async () => {
470 const pool = new FixedThreadPool(
472 './tests/worker-files/thread/testWorker.mjs',
473 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
// Initial state: every median flag is false.
475 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
477 runTime: { median: false },
478 waitTime: { median: false },
479 elu: { median: false }
481 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
483 runTime: { median: false },
484 waitTime: { median: false },
485 elu: { median: false }
487 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
488 .workerChoiceStrategies) {
489 expect(workerChoiceStrategy.opts).toStrictEqual({
491 runTime: { median: false },
492 waitTime: { median: false },
493 elu: { median: false }
497 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Switch runTime and elu to median; waitTime is not passed and is expected to stay at median false.
515 pool.setWorkerChoiceStrategyOptions({
516 runTime: { median: true },
517 elu: { median: true }
519 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
521 runTime: { median: true },
522 waitTime: { median: false },
523 elu: { median: true }
525 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
527 runTime: { median: true },
528 waitTime: { median: false },
529 elu: { median: true }
531 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
532 .workerChoiceStrategies) {
533 expect(workerChoiceStrategy.opts).toStrictEqual({
535 runTime: { median: true },
536 waitTime: { median: false },
537 elu: { median: true }
541 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Switch runTime and elu back to median false.
559 pool.setWorkerChoiceStrategyOptions({
560 runTime: { median: false },
561 elu: { median: false }
563 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
565 runTime: { median: false },
566 waitTime: { median: false },
567 elu: { median: false }
569 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
571 runTime: { median: false },
572 waitTime: { median: false },
573 elu: { median: false }
575 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
576 .workerChoiceStrategies) {
577 expect(workerChoiceStrategy.opts).toStrictEqual({
579 runTime: { median: false },
580 waitTime: { median: false },
581 elu: { median: false }
585 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Invalid arguments: non object, non integer retries, negative retries, empty weights, unknown measurement.
604 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
607 'Invalid worker choice strategy options: must be a plain object'
611 pool.setWorkerChoiceStrategyOptions({
612 retries: 'invalidChoiceRetries'
616 'Invalid worker choice strategy options: retries must be an integer'
619 expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
621 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
624 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
626 'Invalid worker choice strategy options: must have a weight for each worker node'
630 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
633 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
// Toggles the tasks queue at runtime: tasksQueueOptions is undefined while the queue is disabled
// and populated once it is enabled.
639 it('Verify that pool tasks queue can be enabled/disabled', async () => {
640 const pool = new FixedThreadPool(
642 './tests/worker-files/thread/testWorker.mjs'
// A pool built without options has the queue disabled.
644 expect(pool.opts.enableTasksQueue).toBe(false)
645 expect(pool.opts.tasksQueueOptions).toBeUndefined()
// Enable without options: the expected size is numberOfWorkers squared.
646 pool.enableTasksQueue(true)
647 expect(pool.opts.enableTasksQueue).toBe(true)
648 expect(pool.opts.tasksQueueOptions).toStrictEqual({
650 size: Math.pow(numberOfWorkers, 2),
652 tasksStealingOnBackPressure: true
// Enable again with an explicit concurrency of 2.
654 pool.enableTasksQueue(true, { concurrency: 2 })
655 expect(pool.opts.enableTasksQueue).toBe(true)
656 expect(pool.opts.tasksQueueOptions).toStrictEqual({
658 size: Math.pow(numberOfWorkers, 2),
660 tasksStealingOnBackPressure: true
// Disabling the queue must clear its options.
662 pool.enableTasksQueue(false)
663 expect(pool.opts.enableTasksQueue).toBe(false)
664 expect(pool.opts.tasksQueueOptions).toBeUndefined()
// Checks setTasksQueueOptions(): after every change each worker node back pressure size must
// equal pool.opts.tasksQueueOptions.size, and invalid options must throw.
// NOTE(review): several option values passed and expected are on lines outside this excerpt.
668 it('Verify that pool tasks queue options can be set', async () => {
669 const pool = new FixedThreadPool(
671 './tests/worker-files/thread/testWorker.mjs',
672 { enableTasksQueue: true }
// Queue enabled at construction without queue options: expected size is numberOfWorkers squared.
674 expect(pool.opts.tasksQueueOptions).toStrictEqual({
676 size: Math.pow(numberOfWorkers, 2),
678 tasksStealingOnBackPressure: true
680 for (const workerNode of pool.workerNodes) {
681 expect(workerNode.tasksQueueBackPressureSize).toBe(
682 pool.opts.tasksQueueOptions.size
// First update: turns tasks stealing on back pressure off.
685 pool.setTasksQueueOptions({
689 tasksStealingOnBackPressure: false
691 expect(pool.opts.tasksQueueOptions).toStrictEqual({
695 tasksStealingOnBackPressure: false
697 for (const workerNode of pool.workerNodes) {
698 expect(workerNode.tasksQueueBackPressureSize).toBe(
699 pool.opts.tasksQueueOptions.size
// Second update: turns tasks stealing on back pressure on again; size is expected back at numberOfWorkers squared.
702 pool.setTasksQueueOptions({
705 tasksStealingOnBackPressure: true
707 expect(pool.opts.tasksQueueOptions).toStrictEqual({
709 size: Math.pow(numberOfWorkers, 2),
711 tasksStealingOnBackPressure: true
713 for (const workerNode of pool.workerNodes) {
714 expect(workerNode.tasksQueueBackPressureSize).toBe(
715 pool.opts.tasksQueueOptions.size
// Invalid arguments: non object, then concurrency and size of 0, -1 and 0.2.
718 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
719 new TypeError('Invalid tasks queue options: must be a plain object')
721 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
723 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
726 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
728 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
731 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
732 new TypeError('Invalid worker node tasks concurrency: must be an integer')
734 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
736 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
739 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
741 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
744 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
745 new TypeError('Invalid worker node tasks queue size: must be an integer')
// Checks the pool.info snapshot for a fixed thread pool and then for a dynamic cluster pool.
// NOTE(review): some expected info fields are on lines outside this excerpt.
750 it('Verify that pool info is set', async () => {
751 let pool = new FixedThreadPool(
753 './tests/worker-files/thread/testWorker.mjs'
// Fixed pool: min size, max size, worker node count and idle worker node count all equal numberOfWorkers.
755 expect(pool.info).toStrictEqual({
757 type: PoolTypes.fixed,
758 worker: WorkerTypes.thread,
761 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
762 minSize: numberOfWorkers,
763 maxSize: numberOfWorkers,
764 workerNodes: numberOfWorkers,
765 idleWorkerNodes: numberOfWorkers,
// Dynamic pool: the minimum size is half of numberOfWorkers, rounded down.
772 pool = new DynamicClusterPool(
773 Math.floor(numberOfWorkers / 2),
775 './tests/worker-files/cluster/testWorker.js'
// Worker node and idle worker node counts are expected at the minimum size; max size is numberOfWorkers.
777 expect(pool.info).toStrictEqual({
779 type: PoolTypes.dynamic,
780 worker: WorkerTypes.cluster,
783 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
784 minSize: Math.floor(numberOfWorkers / 2),
785 maxSize: numberOfWorkers,
786 workerNodes: Math.floor(numberOfWorkers / 2),
787 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
// Every worker node of a new fixed cluster pool must be a WorkerNode whose usage matches the
// expected initial structure; each history field is compared against a new CircularArray().
// NOTE(review): the remaining expected usage fields are on lines outside this excerpt.
796 it('Verify that pool worker tasks usage are initialized', async () => {
797 const pool = new FixedClusterPool(
799 './tests/worker-files/cluster/testWorker.js'
801 for (const workerNode of pool.workerNodes) {
802 expect(workerNode).toBeInstanceOf(WorkerNode)
803 expect(workerNode.usage).toStrictEqual({
813 history: new CircularArray()
816 history: new CircularArray()
820 history: new CircularArray()
823 history: new CircularArray()
// For a fixed cluster pool and then a dynamic thread pool, every worker node must expose a
// Deque tasks queue whose size and maxSize are both 0.
831 it('Verify that pool worker tasks queue are initialized', async () => {
832 let pool = new FixedClusterPool(
834 './tests/worker-files/cluster/testWorker.js'
836 for (const workerNode of pool.workerNodes) {
837 expect(workerNode).toBeInstanceOf(WorkerNode)
838 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
839 expect(workerNode.tasksQueue.size).toBe(0)
840 expect(workerNode.tasksQueue.maxSize).toBe(0)
// Same checks on a dynamic thread pool.
843 pool = new DynamicThreadPool(
844 Math.floor(numberOfWorkers / 2),
846 './tests/worker-files/thread/testWorker.mjs'
848 for (const workerNode of pool.workerNodes) {
849 expect(workerNode).toBeInstanceOf(WorkerNode)
850 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
851 expect(workerNode.tasksQueue.size).toBe(0)
852 expect(workerNode.tasksQueue.maxSize).toBe(0)
// For a fixed cluster pool and then a dynamic thread pool, every worker node info must carry
// a numeric id and the worker type matching the pool.
// NOTE(review): the remaining expected info fields are on lines outside this excerpt.
857 it('Verify that pool worker info are initialized', async () => {
858 let pool = new FixedClusterPool(
860 './tests/worker-files/cluster/testWorker.js'
862 for (const workerNode of pool.workerNodes) {
863 expect(workerNode).toBeInstanceOf(WorkerNode)
864 expect(workerNode.info).toStrictEqual({
865 id: expect.any(Number),
866 type: WorkerTypes.cluster,
// Same checks on a dynamic thread pool.
872 pool = new DynamicThreadPool(
873 Math.floor(numberOfWorkers / 2),
875 './tests/worker-files/thread/testWorker.mjs'
877 for (const workerNode of pool.workerNodes) {
878 expect(workerNode).toBeInstanceOf(WorkerNode)
879 expect(workerNode.info).toStrictEqual({
880 id: expect.any(Number),
881 type: WorkerTypes.thread,
// Checks the pool state before and after an explicit start.
// NOTE(review): the constructor options and the start call are on lines outside this excerpt -
// presumably the pool is built with worker startup disabled and started between the two groups of
// assertions; confirm.
889 it('Verify that pool can be started after initialization', async () => {
890 const pool = new FixedClusterPool(
892 './tests/worker-files/cluster/testWorker.js',
// Before start: not started, not ready, no worker nodes, and execute() must reject.
897 expect(pool.info.started).toBe(false)
898 expect(pool.info.ready).toBe(false)
899 expect(pool.workerNodes).toStrictEqual([])
900 await expect(pool.execute()).rejects.toThrow(
901 new Error('Cannot execute a task on not started pool')
// After start: started, ready, and numberOfWorkers worker nodes that are WorkerNode instances.
904 expect(pool.info.started).toBe(true)
905 expect(pool.info.ready).toBe(true)
906 expect(pool.workerNodes.length).toBe(numberOfWorkers)
907 for (const workerNode of pool.workerNodes) {
908 expect(workerNode).toBeInstanceOf(WorkerNode)
// Pins the rejections of execute() for invalid name and transferList arguments.
913 it('Verify that pool execute() arguments are checked', async () => {
914 const pool = new FixedClusterPool(
916 './tests/worker-files/cluster/testWorker.js'
// Name that is not a string.
918 await expect(pool.execute(undefined, 0)).rejects.toThrow(
919 new TypeError('name argument must be a string')
// Name that is an empty string.
921 await expect(pool.execute(undefined, '')).rejects.toThrow(
922 new TypeError('name argument must not be an empty string')
// transferList that is not an array.
924 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
925 new TypeError('transferList argument must be an array')
// Unknown task function name: the rejection value is compared with toBe against a plain string, not an Error.
927 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
928 "Task function 'unknown' not found"
// NOTE(review): this rejection expects a not started pool - presumably the pool is destroyed
// on a line outside this excerpt; confirm.
931 await expect(pool.execute()).rejects.toThrow(
932 new Error('Cannot execute a task on not started pool')
// Submits numberOfWorkers * maxMultiplier tasks and checks each worker node usage twice:
// while the tasks are pending and after they have all settled.
// NOTE(review): most expected usage fields are on lines outside this excerpt.
936 it('Verify that pool worker tasks usage are computed', async () => {
937 const pool = new FixedClusterPool(
939 './tests/worker-files/cluster/testWorker.js'
941 const promises = new Set()
942 const maxMultiplier = 2
943 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
944 promises.add(pool.execute())
// Before awaiting: each worker node is expected to report maxMultiplier executing tasks.
946 for (const workerNode of pool.workerNodes) {
947 expect(workerNode.usage).toStrictEqual({
950 executing: maxMultiplier,
957 history: expect.any(CircularArray)
960 history: expect.any(CircularArray)
964 history: expect.any(CircularArray)
967 history: expect.any(CircularArray)
// After all tasks settled: each worker node is expected to report maxMultiplier executed tasks.
972 await Promise.all(promises)
973 for (const workerNode of pool.workerNodes) {
974 expect(workerNode.usage).toStrictEqual({
976 executed: maxMultiplier,
984 history: expect.any(CircularArray)
987 history: expect.any(CircularArray)
991 history: expect.any(CircularArray)
994 history: expect.any(CircularArray)
// Runs tasks on a dynamic thread pool, checks the worker usage, switches the worker choice
// strategy to FAIR_SHARE and checks the worker usage again.
// NOTE(review): the usage values expected after the switch are on lines outside this excerpt.
1002 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1003 const pool = new DynamicThreadPool(
1004 Math.floor(numberOfWorkers / 2),
1006 './tests/worker-files/thread/testWorker.mjs'
1008 const promises = new Set()
1009 const maxMultiplier = 2
1010 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1011 promises.add(pool.execute())
1013 await Promise.all(promises)
1014 for (const workerNode of pool.workerNodes) {
1015 expect(workerNode.usage).toStrictEqual({
1017 executed: expect.any(Number),
1025 history: expect.any(CircularArray)
1028 history: expect.any(CircularArray)
1032 history: expect.any(CircularArray)
1035 history: expect.any(CircularArray)
// The per worker executed count is only bounded: above 0 and at most the number of submitted tasks.
1039 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1040 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1041 numberOfWorkers * maxMultiplier
// All measurement histories are expected to be empty at this point.
1043 expect(workerNode.usage.runTime.history.length).toBe(0)
1044 expect(workerNode.usage.waitTime.history.length).toBe(0)
1045 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1046 expect(workerNode.usage.elu.active.history.length).toBe(0)
// Strategy change under test.
1048 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1049 for (const workerNode of pool.workerNodes) {
1050 expect(workerNode.usage).toStrictEqual({
1060 history: expect.any(CircularArray)
1063 history: expect.any(CircularArray)
1067 history: expect.any(CircularArray)
1070 history: expect.any(CircularArray)
1074 expect(workerNode.usage.runTime.history.length).toBe(0)
1075 expect(workerNode.usage.waitTime.history.length).toBe(0)
1076 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1077 expect(workerNode.usage.elu.active.history.length).toBe(0)
1079 await pool.destroy()
// Registers a listener for the ready pool event, waits for one occurrence and checks the
// listener call count and the shape of the info received.
// NOTE(review): poolReady and poolInfo are declared and updated on lines outside this excerpt.
1082 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1083 const pool = new DynamicClusterPool(
1084 Math.floor(numberOfWorkers / 2),
1086 './tests/worker-files/cluster/testWorker.js'
// No listener is registered on a new pool emitter.
1088 expect(pool.emitter.eventNames()).toStrictEqual([])
1091 pool.emitter.on(PoolEvents.ready, info => {
1095 await waitPoolEvents(pool, PoolEvents.ready, 1)
1096 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1097 expect(poolReady).toBe(1)
// Only the types of the numeric fields are pinned, not their values.
1098 expect(poolInfo).toStrictEqual({
1100 type: PoolTypes.dynamic,
1101 worker: WorkerTypes.cluster,
1104 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1105 minSize: expect.any(Number),
1106 maxSize: expect.any(Number),
1107 workerNodes: expect.any(Number),
1108 idleWorkerNodes: expect.any(Number),
1109 busyWorkerNodes: expect.any(Number),
1110 executedTasks: expect.any(Number),
1111 executingTasks: expect.any(Number),
1112 failedTasks: expect.any(Number)
1114 await pool.destroy()
// Registers a listener for the busy pool event, submits numberOfWorkers * 2 tasks and checks
// how many times the listener ran and the shape of the info received.
// NOTE(review): poolBusy and poolInfo are declared and updated on lines outside this excerpt.
1117 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1118 const pool = new FixedThreadPool(
1120 './tests/worker-files/thread/testWorker.mjs'
1122 expect(pool.emitter.eventNames()).toStrictEqual([])
1123 const promises = new Set()
1126 pool.emitter.on(PoolEvents.busy, info => {
1130 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1131 for (let i = 0; i < numberOfWorkers * 2; i++) {
1132 promises.add(pool.execute())
1134 await Promise.all(promises)
1135 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1136 // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1137 expect(poolBusy).toBe(numberOfWorkers + 1)
// Only the types of the numeric fields are pinned, not their values.
1138 expect(poolInfo).toStrictEqual({
1140 type: PoolTypes.fixed,
1141 worker: WorkerTypes.thread,
1144 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1145 minSize: expect.any(Number),
1146 maxSize: expect.any(Number),
1147 workerNodes: expect.any(Number),
1148 idleWorkerNodes: expect.any(Number),
1149 busyWorkerNodes: expect.any(Number),
1150 executedTasks: expect.any(Number),
1151 executingTasks: expect.any(Number),
1152 failedTasks: expect.any(Number)
1154 await pool.destroy()
// Registers a listener for the full pool event on a dynamic thread pool, submits
// numberOfWorkers * 2 tasks and expects the listener to have run exactly once.
// NOTE(review): poolFull and poolInfo are declared and updated on lines outside this excerpt.
1157 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1158 const pool = new DynamicThreadPool(
1159 Math.floor(numberOfWorkers / 2),
1161 './tests/worker-files/thread/testWorker.mjs'
1163 expect(pool.emitter.eventNames()).toStrictEqual([])
1164 const promises = new Set()
1167 pool.emitter.on(PoolEvents.full, info => {
1171 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1172 for (let i = 0; i < numberOfWorkers * 2; i++) {
1173 promises.add(pool.execute())
1175 await Promise.all(promises)
1176 expect(poolFull).toBe(1)
// Only the types of the numeric fields are pinned, not their values.
1177 expect(poolInfo).toStrictEqual({
1179 type: PoolTypes.dynamic,
1180 worker: WorkerTypes.thread,
1183 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1184 minSize: expect.any(Number),
1185 maxSize: expect.any(Number),
1186 workerNodes: expect.any(Number),
1187 idleWorkerNodes: expect.any(Number),
1188 busyWorkerNodes: expect.any(Number),
1189 executedTasks: expect.any(Number),
1190 executingTasks: expect.any(Number),
1191 failedTasks: expect.any(Number)
1193 await pool.destroy()
// Registers a listener for the backPressure pool event on a pool with the tasks queue enabled,
// submits numberOfWorkers + 1 tasks and expects the listener to have run exactly once.
// NOTE(review): poolInfo and the listener body are on lines outside this excerpt.
1196 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1197 const pool = new FixedThreadPool(
1199 './tests/worker-files/thread/testWorker.mjs',
1201 enableTasksQueue: true
// Replace hasBackPressure with a sinon stub that always returns true.
1204 stub(pool, 'hasBackPressure').returns(true)
1205 expect(pool.emitter.eventNames()).toStrictEqual([])
1206 const promises = new Set()
1207 let poolBackPressure = 0
1209 pool.emitter.on(PoolEvents.backPressure, info => {
1213 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1214 for (let i = 0; i < numberOfWorkers + 1; i++) {
1215 promises.add(pool.execute())
1217 await Promise.all(promises)
1218 expect(poolBackPressure).toBe(1)
// With the tasks queue enabled the expected info also carries queue related counters.
1219 expect(poolInfo).toStrictEqual({
1221 type: PoolTypes.fixed,
1222 worker: WorkerTypes.thread,
1225 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1226 minSize: expect.any(Number),
1227 maxSize: expect.any(Number),
1228 workerNodes: expect.any(Number),
1229 idleWorkerNodes: expect.any(Number),
1230 busyWorkerNodes: expect.any(Number),
1231 executedTasks: expect.any(Number),
1232 executingTasks: expect.any(Number),
1233 maxQueuedTasks: expect.any(Number),
1234 queuedTasks: expect.any(Number),
1236 stolenTasks: expect.any(Number),
1237 failedTasks: expect.any(Number)
// The stub must have been consulted at least once.
1239 expect(pool.hasBackPressure.called).toBe(true)
1240 await pool.destroy()
// Checks hasTaskFunction() on a dynamic thread pool and a fixed cluster pool, both built from
// the multiple task functions test worker: known names return true, an unknown name returns false.
// NOTE(review): the expected value for jsonIntegerSerialization is on a line outside this excerpt.
1243 it('Verify that hasTaskFunction() is working', async () => {
1244 const dynamicThreadPool = new DynamicThreadPool(
1245 Math.floor(numberOfWorkers / 2),
1247 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// Wait for one ready event before querying task functions.
1249 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1250 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1251 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1254 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1255 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1256 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1257 await dynamicThreadPool.destroy()
// Same checks on a fixed cluster pool.
1258 const fixedClusterPool = new FixedClusterPool(
1260 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1262 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1263 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1264 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1267 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1268 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1269 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1270 await fixedClusterPool.destroy()
// Checks addTaskFunction(): argument validation first, then registration of an echo task
// function, its execution and the per task function worker usage.
// NOTE(review): several expected values and wrapper lines are outside this excerpt.
1273 it('Verify that addTaskFunction() is working', async () => {
1274 const dynamicThreadPool = new DynamicThreadPool(
1275 Math.floor(numberOfWorkers / 2),
1277 './tests/worker-files/thread/testWorker.mjs'
1279 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Invalid name: not a string, then an empty string.
1281 dynamicThreadPool.addTaskFunction(0, () => {})
1282 ).rejects.toThrow(new TypeError('name argument must be a string'))
1284 dynamicThreadPool.addTaskFunction('', () => {})
1286 new TypeError('name argument must not be an empty string')
// Invalid fn: a number, then a string.
1288 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1289 new TypeError('fn argument must be a function')
1291 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1292 new TypeError('fn argument must be a function')
1294 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1298 const echoTaskFunction = data => {
// Valid registration resolves to true and is tracked in the pool side taskFunctions map.
1302 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1303 ).resolves.toBe(true)
1304 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1305 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1308 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// Executing the new task function must return a value equal to the input data.
1313 const taskFunctionData = { test: 'test' }
1314 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1315 expect(echoResult).toStrictEqual(taskFunctionData)
1316 for (const workerNode of dynamicThreadPool.workerNodes) {
1317 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1319 executed: expect.any(Number),
1326 history: new CircularArray()
1329 history: new CircularArray()
1333 history: new CircularArray()
1336 history: new CircularArray()
1341 await dynamicThreadPool.destroy()
// Checks removeTaskFunction(): removing a name that was never added on the pool side rejects,
// while removing a previously added echo task function empties the taskFunctions map.
// NOTE(review): the expected name lists and the resolved value are on lines outside this excerpt.
1344 it('Verify that removeTaskFunction() is working', async () => {
1345 const dynamicThreadPool = new DynamicThreadPool(
1346 Math.floor(numberOfWorkers / 2),
1348 './tests/worker-files/thread/testWorker.mjs'
1350 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1351 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1355 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1356 new Error('Cannot remove a task function not handled on the pool side')
1358 const echoTaskFunction = data => {
1361 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1362 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1363 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1366 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1371 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
// After removal the pool side map no longer holds the echo entry.
1374 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1375 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1376 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1380 await dynamicThreadPool.destroy()
// Checks the names returned by listTaskFunctionNames() for a dynamic thread pool and a fixed
// cluster pool, both built from the multiple task functions test worker.
// NOTE(review): only one entry of each expected list is visible in this excerpt.
1383 it('Verify that listTaskFunctionNames() is working', async () => {
1384 const dynamicThreadPool = new DynamicThreadPool(
1385 Math.floor(numberOfWorkers / 2),
1387 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1389 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1390 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1392 'jsonIntegerSerialization',
1396 await dynamicThreadPool.destroy()
1397 const fixedClusterPool = new FixedClusterPool(
1399 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1401 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1402 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1404 'jsonIntegerSerialization',
1408 await fixedClusterPool.destroy()
// Checks setDefaultTaskFunction(): invalid names reject with a worker side error message,
// valid names resolve to true, and the task function name list is checked after each change.
// NOTE(review): the expected messages below hard-code worker id 33 - confirm that this id is
// deterministic for this suite, otherwise these assertions are brittle.
1411 it('Verify that setDefaultTaskFunction() is working', async () => {
1412 const dynamicThreadPool = new DynamicThreadPool(
1413 Math.floor(numberOfWorkers / 2),
1415 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1417 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Name that is not a string.
1418 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1420 "Task function operation 'default' failed on worker 33 with error: 'TypeError: name parameter is not a string'"
// The reserved default task name cannot itself be set as the default task function.
1424 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1427 "Task function operation 'default' failed on worker 33 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
// Name of a task function that does not exist.
1431 dynamicThreadPool.setDefaultTaskFunction('unknown')
1434 "Task function operation 'default' failed on worker 33 with error: 'Error: Cannot set the default task function to a non-existing task function'"
1437 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1439 'jsonIntegerSerialization',
// Valid changes: factorial, then fibonacci.
1444 dynamicThreadPool.setDefaultTaskFunction('factorial')
1445 ).resolves.toBe(true)
1446 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1449 'jsonIntegerSerialization',
1453 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1454 ).resolves.toBe(true)
1455 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1458 'jsonIntegerSerialization',
1461 await dynamicThreadPool.destroy()
// Executes the default task function and three named ones on a dynamic cluster pool, then
// checks the pool counters and the per task function usage of every worker node.
// NOTE(review): several expected values and wrapper lines are outside this excerpt.
1464 it('Verify that multiple task functions worker is working', async () => {
1465 const pool = new DynamicClusterPool(
1466 Math.floor(numberOfWorkers / 2),
1468 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1470 const data = { n: 10 }
// The default task function and jsonIntegerSerialization are expected to return the same result.
1471 const result0 = await pool.execute(data)
1472 expect(result0).toStrictEqual({ ok: 1 })
1473 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1474 expect(result1).toStrictEqual({ ok: 1 })
// With n set to 10: factorial gives 3628800 and fibonacci gives 55.
1475 const result2 = await pool.execute(data, 'factorial')
1476 expect(result2).toBe(3628800)
1477 const result3 = await pool.execute(data, 'fibonacci')
1478 expect(result3).toBe(55)
// Four tasks were awaited above, so none is executing and four are executed.
1479 expect(pool.info.executingTasks).toBe(0)
1480 expect(pool.info.executedTasks).toBe(4)
1481 for (const workerNode of pool.workerNodes) {
1482 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1484 'jsonIntegerSerialization',
1488 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1489 for (const name of pool.listTaskFunctionNames()) {
1490 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1492 executed: expect.any(Number),
1499 history: expect.any(CircularArray)
1502 history: expect.any(CircularArray)
1506 history: expect.any(CircularArray)
1509 history: expect.any(CircularArray)
1514 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1515 ).toBeGreaterThan(0)
// NOTE(review): the usage of the default task name is related to the usage of the second listed
// task function name; the matcher joining the two is not visible in this excerpt.
1518 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1520 workerNode.getTaskFunctionWorkerUsage(
1521 workerNode.info.taskFunctionNames[1]
1525 await pool.destroy()
// sendKillMessageToWorker() on the worker node at key 0 must resolve to undefined.
1528 it('Verify sendKillMessageToWorker()', async () => {
1529 const pool = new DynamicClusterPool(
1530 Math.floor(numberOfWorkers / 2),
1532 './tests/worker-files/cluster/testWorker.js'
1534 const workerNodeKey = 0
1536 pool.sendKillMessageToWorker(workerNodeKey)
1537 ).resolves.toBeUndefined()
1538 await pool.destroy()
// Sends an add operation for a task function named empty to the worker node at key 0 and
// checks that the operation resolves to true and that the name shows up in that worker info.
1541 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1542 const pool = new DynamicClusterPool(
1543 Math.floor(numberOfWorkers / 2),
1545 './tests/worker-files/cluster/testWorker.js'
1547 const workerNodeKey = 0
1549 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1550 taskFunctionOperation: 'add',
1551 taskFunctionName: 'empty',
// The function is sent as its source text.
1552 taskFunction: (() => {}).toString()
1554 ).resolves.toBe(true)
// The new name is expected after the default task name and test.
1556 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1557 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1558 await pool.destroy()
// Sends an add operation for a task function named empty to the pool workers and checks
// that it resolves to true and that every worker node info lists the expected names.
// NOTE(review): the expected name list is on lines outside this excerpt.
1561 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1562 const pool = new DynamicClusterPool(
1563 Math.floor(numberOfWorkers / 2),
1565 './tests/worker-files/cluster/testWorker.js'
1568 pool.sendTaskFunctionOperationToWorkers({
1569 taskFunctionOperation: 'add',
1570 taskFunctionName: 'empty',
// The function is sent as its source text.
1571 taskFunction: (() => {}).toString()
1573 ).resolves.toBe(true)
1574 for (const workerNode of pool.workerNodes) {
1575 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1581 await pool.destroy()