1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
// Test: a FixedThreadPool built from the thread test worker file is an
// instance of FixedThreadPool.
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
// Test: instantiates the StubPoolWithIsMain stub with the thread test worker
// file and an errorHandler that forwards the error to console.error.
// NOTE(review): the trailing string is presumably the error message the
// constructor is expected to throw — confirm against the full assertion.
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
// Test: right after construction a FixedThreadPool reports
// started === true, starting === false and destroying === false.
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
// Test: the worker file path argument of the FixedThreadPool constructor is
// validated. Three cases are exercised: path omitted, path of a non-string
// type (0), and a path pointing to a file that cannot be found.
79 it('Verify that filePath is checked', () => {
// Missing path -> TypeError.
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
// Non-string path -> TypeError.
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
// Unresolvable path -> plain Error quoting the offending path.
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
// Test: the number-of-workers argument is validated.
// NOTE(review): the trailing string is presumably the expected error message
// when the number of workers is not given — confirm against the full assertion.
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
// Test: a FixedClusterPool is constructed with -1 workers.
// NOTE(review): the trailing string is presumably the expected error message —
// confirm against the full assertion.
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
// Test: a FixedThreadPool is constructed with a fractional worker count (0.25).
// NOTE(review): the trailing string is presumably the expected error message —
// confirm against the full assertion.
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
// Test: min/max sizing arguments of dynamic pools are validated. Six
// constructor calls (cluster and thread variants) are each paired with a
// message string.
// NOTE(review): the size arguments of each call are not shown here; each
// string presumably is the expected error message for the call above it —
// confirm against the full assertions.
127 it('Verify that dynamic pool sizing is checked', () => {
// Case 1: maximum pool size message.
130 new DynamicClusterPool(
133 './tests/worker-files/cluster/testWorker.js'
137 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
// Case 2: non safe integer number of workers message.
142 new DynamicThreadPool(
145 './tests/worker-files/thread/testWorker.mjs'
149 'Cannot instantiate a pool with a non safe integer number of workers'
// Case 3: non safe integer maximum pool size message.
154 new DynamicClusterPool(
157 './tests/worker-files/cluster/testWorker.js'
161 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
// Case 4: maximum below minimum message.
166 new DynamicThreadPool(
169 './tests/worker-files/thread/testWorker.mjs'
173 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
// Case 5: maximum equal to zero message.
178 new DynamicThreadPool(
181 './tests/worker-files/thread/testWorker.mjs'
185 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
// Case 6: minimum equal to maximum message (a fixed pool is suggested instead).
190 new DynamicClusterPool(
193 './tests/worker-files/cluster/testWorker.js'
197 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
// Test: pool options as exposed on pool.opts, on
// pool.workerChoiceStrategyContext.opts and on every entry of
// pool.workerChoiceStrategyContext.workerChoiceStrategies.
// Part 1 builds a pool without an options object and checks the defaults;
// part 2 rebuilds the pool with explicit options and checks they are reflected.
202 it('Verify that pool options are checked', async () => {
// Part 1: no options given.
203 let pool = new FixedThreadPool(
205 './tests/worker-files/thread/testWorker.mjs'
// With defaults the emitter is an EventEmitterAsyncResource.
207 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
208 expect(pool.opts).toStrictEqual({
211 restartWorkerOnError: true,
212 enableTasksQueue: false,
213 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
214 workerChoiceStrategyOptions: {
216 runTime: { median: false },
217 waitTime: { median: false },
218 elu: { median: false }
// The same median flags are expected on the strategy context ...
221 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
223 runTime: { median: false },
224 waitTime: { median: false },
225 elu: { median: false }
// ... and on each individual strategy (map keys are ignored).
227 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
228 .workerChoiceStrategies) {
229 expect(workerChoiceStrategy.opts).toStrictEqual({
231 runTime: { median: false },
232 waitTime: { median: false },
233 elu: { median: false }
// Part 2: explicit options. One handler is reused for all four worker events.
237 const testHandler = () => console.info('test handler executed')
238 pool = new FixedThreadPool(
240 './tests/worker-files/thread/testWorker.mjs',
242 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
243 workerChoiceStrategyOptions: {
244 runTime: { median: true },
245 weights: { 0: 300, 1: 200 }
248 restartWorkerOnError: false,
249 enableTasksQueue: true,
250 tasksQueueOptions: { concurrency: 2 },
251 messageHandler: testHandler,
252 errorHandler: testHandler,
253 onlineHandler: testHandler,
254 exitHandler: testHandler
// NOTE(review): the emitter is expected to be undefined here, presumably
// because events are disabled by an option not shown in this view — confirm.
257 expect(pool.emitter).toBeUndefined()
258 expect(pool.opts).toStrictEqual({
261 restartWorkerOnError: false,
262 enableTasksQueue: true,
// Expected queue size is the square of the number of workers.
265 size: Math.pow(numberOfWorkers, 2),
267 tasksStealingOnBackPressure: true
269 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
270 workerChoiceStrategyOptions: {
// Only runTime.median was overridden; waitTime and elu stay false.
272 runTime: { median: true },
273 waitTime: { median: false },
274 elu: { median: false },
275 weights: { 0: 300, 1: 200 }
277 onlineHandler: testHandler,
278 messageHandler: testHandler,
279 errorHandler: testHandler,
280 exitHandler: testHandler
282 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
284 runTime: { median: true },
285 waitTime: { median: false },
286 elu: { median: false },
287 weights: { 0: 300, 1: 200 }
289 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
290 .workerChoiceStrategies) {
291 expect(workerChoiceStrategy.opts).toStrictEqual({
293 runTime: { median: true },
294 waitTime: { median: false },
295 elu: { median: false },
296 weights: { 0: 300, 1: 200 }
// Test: invalid constructor options are rejected. Each case passes the thread
// test worker file plus one invalid option and is paired with an error or a
// message string.
// NOTE(review): the constructor calls and most toThrow wrappers are not shown
// here; the bare strings presumably are the expected error messages — confirm.
302 it('Verify that pool options are validated', () => {
// Unknown worker choice strategy name.
307 './tests/worker-files/thread/testWorker.mjs',
309 workerChoiceStrategy: 'invalidStrategy'
312 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
// Non-integer retries.
317 './tests/worker-files/thread/testWorker.mjs',
319 workerChoiceStrategyOptions: {
320 retries: 'invalidChoiceRetries'
326 'Invalid worker choice strategy options: retries must be an integer'
// Negative retries (the message quotes '-1').
333 './tests/worker-files/thread/testWorker.mjs',
335 workerChoiceStrategyOptions: {
342 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
// Empty weights object.
349 './tests/worker-files/thread/testWorker.mjs',
351 workerChoiceStrategyOptions: { weights: {} }
356 'Invalid worker choice strategy options: must have a weight for each worker node'
// Unknown measurement name.
363 './tests/worker-files/thread/testWorker.mjs',
365 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
370 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
// tasksQueueOptions given as a string instead of a plain object.
377 './tests/worker-files/thread/testWorker.mjs',
379 enableTasksQueue: true,
380 tasksQueueOptions: 'invalidTasksQueueOptions'
384 new TypeError('Invalid tasks queue options: must be a plain object')
// concurrency: zero, negative, then fractional.
390 './tests/worker-files/thread/testWorker.mjs',
392 enableTasksQueue: true,
393 tasksQueueOptions: { concurrency: 0 }
398 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
405 './tests/worker-files/thread/testWorker.mjs',
407 enableTasksQueue: true,
408 tasksQueueOptions: { concurrency: -1 }
413 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
420 './tests/worker-files/thread/testWorker.mjs',
422 enableTasksQueue: true,
423 tasksQueueOptions: { concurrency: 0.2 }
427 new TypeError('Invalid worker node tasks concurrency: must be an integer')
// size: zero, negative, then fractional.
433 './tests/worker-files/thread/testWorker.mjs',
435 enableTasksQueue: true,
436 tasksQueueOptions: { size: 0 }
441 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
448 './tests/worker-files/thread/testWorker.mjs',
450 enableTasksQueue: true,
451 tasksQueueOptions: { size: -1 }
456 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
463 './tests/worker-files/thread/testWorker.mjs',
465 enableTasksQueue: true,
466 tasksQueueOptions: { size: 0.2 }
470 new TypeError('Invalid worker node tasks queue size: must be an integer')
// Test: pool.setWorkerChoiceStrategyOptions() on a FAIR_SHARE FixedThreadPool.
// After each change the options are compared at three levels: pool.opts,
// the strategy context opts, and the opts of each registered strategy.
// The tail of the test feeds invalid option values to the setter.
474 it('Verify that pool worker choice strategy options can be set', async () => {
475 const pool = new FixedThreadPool(
477 './tests/worker-files/thread/testWorker.mjs',
478 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
// Initial state: every median flag is false.
480 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
482 runTime: { median: false },
483 waitTime: { median: false },
484 elu: { median: false }
486 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
488 runTime: { median: false },
489 waitTime: { median: false },
490 elu: { median: false }
492 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
493 .workerChoiceStrategies) {
494 expect(workerChoiceStrategy.opts).toStrictEqual({
496 runTime: { median: false },
497 waitTime: { median: false },
498 elu: { median: false }
502 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Switch runTime and elu to median; waitTime is left untouched.
520 pool.setWorkerChoiceStrategyOptions({
521 runTime: { median: true },
522 elu: { median: true }
524 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
526 runTime: { median: true },
527 waitTime: { median: false },
528 elu: { median: true }
530 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
532 runTime: { median: true },
533 waitTime: { median: false },
534 elu: { median: true }
536 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
537 .workerChoiceStrategies) {
538 expect(workerChoiceStrategy.opts).toStrictEqual({
540 runTime: { median: true },
541 waitTime: { median: false },
542 elu: { median: true }
546 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Switch runTime and elu back to non-median.
564 pool.setWorkerChoiceStrategyOptions({
565 runTime: { median: false },
566 elu: { median: false }
568 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
570 runTime: { median: false },
571 waitTime: { median: false },
572 elu: { median: false }
574 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
576 runTime: { median: false },
577 waitTime: { median: false },
578 elu: { median: false }
580 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
581 .workerChoiceStrategies) {
582 expect(workerChoiceStrategy.opts).toStrictEqual({
584 runTime: { median: false },
585 waitTime: { median: false },
586 elu: { median: false }
590 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Invalid inputs: a non-object value, non-integer retries, negative retries,
// empty weights and an unknown measurement.
// NOTE(review): the bare strings presumably are the expected error messages — confirm.
609 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
612 'Invalid worker choice strategy options: must be a plain object'
616 pool.setWorkerChoiceStrategyOptions({
617 retries: 'invalidChoiceRetries'
621 'Invalid worker choice strategy options: retries must be an integer'
624 expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
626 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
629 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
631 'Invalid worker choice strategy options: must have a weight for each worker node'
635 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
638 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
// Test: pool.enableTasksQueue() toggles the tasks queue at runtime and keeps
// pool.opts.enableTasksQueue / pool.opts.tasksQueueOptions in sync.
644 it('Verify that pool tasks queue can be enabled/disabled', async () => {
645 const pool = new FixedThreadPool(
647 './tests/worker-files/thread/testWorker.mjs'
// Initially the queue is off and has no options.
649 expect(pool.opts.enableTasksQueue).toBe(false)
650 expect(pool.opts.tasksQueueOptions).toBeUndefined()
// Enabling without options: size is numberOfWorkers squared.
651 pool.enableTasksQueue(true)
652 expect(pool.opts.enableTasksQueue).toBe(true)
653 expect(pool.opts.tasksQueueOptions).toStrictEqual({
655 size: Math.pow(numberOfWorkers, 2),
657 tasksStealingOnBackPressure: true
// Enabling again with an explicit concurrency of 2.
659 pool.enableTasksQueue(true, { concurrency: 2 })
660 expect(pool.opts.enableTasksQueue).toBe(true)
661 expect(pool.opts.tasksQueueOptions).toStrictEqual({
663 size: Math.pow(numberOfWorkers, 2),
665 tasksStealingOnBackPressure: true
// Disabling clears the queue options.
667 pool.enableTasksQueue(false)
668 expect(pool.opts.enableTasksQueue).toBe(false)
669 expect(pool.opts.tasksQueueOptions).toBeUndefined()
// Test: pool.setTasksQueueOptions() on a pool created with the tasks queue
// enabled. After each change, every worker node's tasksQueueBackPressureSize
// is compared with pool.opts.tasksQueueOptions.size. The tail of the test
// feeds invalid values to the setter.
673 it('Verify that pool tasks queue options can be set', async () => {
674 const pool = new FixedThreadPool(
676 './tests/worker-files/thread/testWorker.mjs',
677 { enableTasksQueue: true }
// Options in effect right after construction.
679 expect(pool.opts.tasksQueueOptions).toStrictEqual({
681 size: Math.pow(numberOfWorkers, 2),
683 tasksStealingOnBackPressure: true
685 for (const workerNode of pool.workerNodes) {
686 expect(workerNode.tasksQueueBackPressureSize).toBe(
687 pool.opts.tasksQueueOptions.size
// First update: tasks stealing on back pressure turned off.
690 pool.setTasksQueueOptions({
694 tasksStealingOnBackPressure: false
696 expect(pool.opts.tasksQueueOptions).toStrictEqual({
700 tasksStealingOnBackPressure: false
702 for (const workerNode of pool.workerNodes) {
703 expect(workerNode.tasksQueueBackPressureSize).toBe(
704 pool.opts.tasksQueueOptions.size
// Second update: tasks stealing on back pressure turned back on.
707 pool.setTasksQueueOptions({
710 tasksStealingOnBackPressure: true
712 expect(pool.opts.tasksQueueOptions).toStrictEqual({
714 size: Math.pow(numberOfWorkers, 2),
716 tasksStealingOnBackPressure: true
718 for (const workerNode of pool.workerNodes) {
719 expect(workerNode.tasksQueueBackPressureSize).toBe(
720 pool.opts.tasksQueueOptions.size
// Invalid inputs: non-object, then zero / negative / fractional concurrency
// and size.
723 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
724 new TypeError('Invalid tasks queue options: must be a plain object')
726 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
728 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
731 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
733 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
736 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
737 new TypeError('Invalid worker node tasks concurrency: must be an integer')
739 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
741 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
744 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
746 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
749 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
750 new TypeError('Invalid worker node tasks queue size: must be an integer')
// Test: the pool.info snapshot for a fixed thread pool and then for a dynamic
// cluster pool (the `pool` variable is reassigned between the two checks).
755 it('Verify that pool info is set', async () => {
756 let pool = new FixedThreadPool(
758 './tests/worker-files/thread/testWorker.mjs'
// Fixed pool: min size, max size, worker node count and idle worker node
// count are all expected to equal numberOfWorkers.
760 expect(pool.info).toStrictEqual({
762 type: PoolTypes.fixed,
763 worker: WorkerTypes.thread,
766 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
767 minSize: numberOfWorkers,
768 maxSize: numberOfWorkers,
769 workerNodes: numberOfWorkers,
770 idleWorkerNodes: numberOfWorkers,
// Dynamic pool: first argument (minimum) is half of numberOfWorkers, rounded down.
777 pool = new DynamicClusterPool(
778 Math.floor(numberOfWorkers / 2),
780 './tests/worker-files/cluster/testWorker.js'
// Worker node counts are expected to match the minimum size, not the maximum.
782 expect(pool.info).toStrictEqual({
784 type: PoolTypes.dynamic,
785 worker: WorkerTypes.cluster,
788 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
789 minSize: Math.floor(numberOfWorkers / 2),
790 maxSize: numberOfWorkers,
791 workerNodes: Math.floor(numberOfWorkers / 2),
792 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
// Test: every worker node of a fresh FixedClusterPool is a WorkerNode whose
// usage object starts with sequentiallyStolen: 0 and whose history fields
// strictly equal a newly constructed CircularArray.
801 it('Verify that pool worker tasks usage are initialized', async () => {
802 const pool = new FixedClusterPool(
804 './tests/worker-files/cluster/testWorker.js'
806 for (const workerNode of pool.workerNodes) {
807 expect(workerNode).toBeInstanceOf(WorkerNode)
808 expect(workerNode.usage).toStrictEqual({
814 sequentiallyStolen: 0,
819 history: new CircularArray()
822 history: new CircularArray()
826 history: new CircularArray()
829 history: new CircularArray()
// Test: for a fixed cluster pool and then a dynamic thread pool, each worker
// node is a WorkerNode whose tasksQueue is a Deque with size 0 and maxSize 0.
837 it('Verify that pool worker tasks queue are initialized', async () => {
838 let pool = new FixedClusterPool(
840 './tests/worker-files/cluster/testWorker.js'
842 for (const workerNode of pool.workerNodes) {
843 expect(workerNode).toBeInstanceOf(WorkerNode)
844 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
845 expect(workerNode.tasksQueue.size).toBe(0)
846 expect(workerNode.tasksQueue.maxSize).toBe(0)
// Same checks repeated on a dynamic thread pool.
849 pool = new DynamicThreadPool(
850 Math.floor(numberOfWorkers / 2),
852 './tests/worker-files/thread/testWorker.mjs'
854 for (const workerNode of pool.workerNodes) {
855 expect(workerNode).toBeInstanceOf(WorkerNode)
856 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
857 expect(workerNode.tasksQueue.size).toBe(0)
858 expect(workerNode.tasksQueue.maxSize).toBe(0)
// Test: for a fixed cluster pool and then a dynamic thread pool, each worker
// node is a WorkerNode whose info carries a numeric id and the worker type
// matching the pool flavour (cluster, then thread).
863 it('Verify that pool worker info are initialized', async () => {
864 let pool = new FixedClusterPool(
866 './tests/worker-files/cluster/testWorker.js'
868 for (const workerNode of pool.workerNodes) {
869 expect(workerNode).toBeInstanceOf(WorkerNode)
870 expect(workerNode.info).toStrictEqual({
// Any number is accepted as the id.
871 id: expect.any(Number),
872 type: WorkerTypes.cluster,
// Same checks repeated on a dynamic thread pool.
878 pool = new DynamicThreadPool(
879 Math.floor(numberOfWorkers / 2),
881 './tests/worker-files/thread/testWorker.mjs'
883 for (const workerNode of pool.workerNodes) {
884 expect(workerNode).toBeInstanceOf(WorkerNode)
885 expect(workerNode.info).toStrictEqual({
886 id: expect.any(Number),
887 type: WorkerTypes.thread,
// Test: start()/destroy() guard against being called in the wrong state.
895 it('Verify that pool statuses are checked at start or destroy', async () => {
896 const pool = new FixedThreadPool(
898 './tests/worker-files/thread/testWorker.mjs'
// A freshly constructed pool is started and ready, so start() must throw.
900 expect(pool.info.started).toBe(true)
901 expect(pool.info.ready).toBe(true)
902 expect(() => pool.start()).toThrow(
903 new Error('Cannot start an already started pool')
// NOTE(review): presumably the pool has been destroyed before this point (the
// call is not shown in this view) — confirm.
906 expect(pool.info.started).toBe(false)
907 expect(pool.info.ready).toBe(false)
// destroy() returns a promise that rejects when the pool is already destroyed.
908 await expect(pool.destroy()).rejects.toThrow(
909 new Error('Cannot destroy an already destroyed pool')
// Test: a FixedClusterPool that is not started at construction time can be
// started afterwards.
// NOTE(review): the constructor options and the explicit start call are not
// shown in this view — confirm how the deferred start is configured.
913 it('Verify that pool can be started after initialization', async () => {
914 const pool = new FixedClusterPool(
916 './tests/worker-files/cluster/testWorker.js',
// Before start: not started, not ready, no ready event, no worker nodes.
921 expect(pool.info.started).toBe(false)
922 expect(pool.info.ready).toBe(false)
923 expect(pool.readyEventEmitted).toBe(false)
924 expect(pool.workerNodes).toStrictEqual([])
// Executing a task on a non-started pool rejects.
925 await expect(pool.execute()).rejects.toThrow(
926 new Error('Cannot execute a task on not started pool')
// After start: started and ready, one ready event, numberOfWorkers nodes.
929 expect(pool.info.started).toBe(true)
930 expect(pool.info.ready).toBe(true)
931 await waitPoolEvents(pool, PoolEvents.ready, 1)
932 expect(pool.readyEventEmitted).toBe(true)
933 expect(pool.workerNodes.length).toBe(numberOfWorkers)
934 for (const workerNode of pool.workerNodes) {
935 expect(workerNode).toBeInstanceOf(WorkerNode)
// Test: pool.execute(data, name, transferList) validates its arguments; every
// failure surfaces as a rejected promise.
940 it('Verify that pool execute() arguments are checked', async () => {
941 const pool = new FixedClusterPool(
943 './tests/worker-files/cluster/testWorker.js'
// name must be a string ...
945 await expect(pool.execute(undefined, 0)).rejects.toThrow(
946 new TypeError('name argument must be a string')
// ... and must not be empty.
948 await expect(pool.execute(undefined, '')).rejects.toThrow(
949 new TypeError('name argument must not be an empty string')
// transferList must be an array.
951 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
952 new TypeError('transferList argument must be an array')
// An unknown task function name rejects with a plain string, hence toBe
// rather than toThrow.
954 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
955 "Task function 'unknown' not found"
// NOTE(review): presumably the pool has been destroyed before this call (not
// shown in this view) — confirm.
958 await expect(pool.execute()).rejects.toThrow(
959 new Error('Cannot execute a task on not started pool')
// Test: worker usage counters while tasks are in flight and after they settle.
// numberOfWorkers * maxMultiplier tasks are submitted to a FixedClusterPool.
963 it('Verify that pool worker tasks usage are computed', async () => {
964 const pool = new FixedClusterPool(
966 './tests/worker-files/cluster/testWorker.js'
968 const promises = new Set()
969 const maxMultiplier = 2
970 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
971 promises.add(pool.execute())
// Checked before awaiting: each worker node reports maxMultiplier tasks executing.
973 for (const workerNode of pool.workerNodes) {
974 expect(workerNode.usage).toStrictEqual({
977 executing: maxMultiplier,
980 sequentiallyStolen: 0,
985 history: expect.any(CircularArray)
988 history: expect.any(CircularArray)
992 history: expect.any(CircularArray)
995 history: expect.any(CircularArray)
// Once all tasks settle, each worker node reports maxMultiplier tasks executed.
1000 await Promise.all(promises)
1001 for (const workerNode of pool.workerNodes) {
1002 expect(workerNode.usage).toStrictEqual({
1004 executed: maxMultiplier,
1008 sequentiallyStolen: 0,
1013 history: expect.any(CircularArray)
1016 history: expect.any(CircularArray)
1020 history: expect.any(CircularArray)
1023 history: expect.any(CircularArray)
1028 await pool.destroy()
// Test: worker usage on a DynamicThreadPool before and after switching the
// worker choice strategy to FAIR_SHARE with pool.setWorkerChoiceStrategy().
1031 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1032 const pool = new DynamicThreadPool(
1033 Math.floor(numberOfWorkers / 2),
1035 './tests/worker-files/thread/testWorker.mjs'
1037 const promises = new Set()
1038 const maxMultiplier = 2
1039 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1040 promises.add(pool.execute())
1042 await Promise.all(promises)
// After the tasks settle: the per-node executed count is only bounded
// (> 0 and <= total submitted) and all histories are still empty.
1043 for (const workerNode of pool.workerNodes) {
1044 expect(workerNode.usage).toStrictEqual({
1046 executed: expect.any(Number),
1050 sequentiallyStolen: 0,
1055 history: expect.any(CircularArray)
1058 history: expect.any(CircularArray)
1062 history: expect.any(CircularArray)
1065 history: expect.any(CircularArray)
1069 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1070 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1071 numberOfWorkers * maxMultiplier
1073 expect(workerNode.usage.runTime.history.length).toBe(0)
1074 expect(workerNode.usage.waitTime.history.length).toBe(0)
1075 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1076 expect(workerNode.usage.elu.active.history.length).toBe(0)
// Changing the strategy; usage is re-checked on every worker node afterwards.
1078 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1079 for (const workerNode of pool.workerNodes) {
1080 expect(workerNode.usage).toStrictEqual({
1086 sequentiallyStolen: 0,
1091 history: expect.any(CircularArray)
1094 history: expect.any(CircularArray)
1098 history: expect.any(CircularArray)
1101 history: expect.any(CircularArray)
1105 expect(workerNode.usage.runTime.history.length).toBe(0)
1106 expect(workerNode.usage.waitTime.history.length).toBe(0)
1107 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1108 expect(workerNode.usage.elu.active.history.length).toBe(0)
1110 await pool.destroy()
// Test: a listener registered on pool.emitter for PoolEvents.ready is invoked
// once and receives a pool info object.
1113 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1114 const pool = new DynamicClusterPool(
1115 Math.floor(numberOfWorkers / 2),
1117 './tests/worker-files/cluster/testWorker.js'
// No listeners are registered on a new pool's emitter.
1119 expect(pool.emitter.eventNames()).toStrictEqual([])
1122 pool.emitter.on(PoolEvents.ready, info => {
1126 await waitPoolEvents(pool, PoolEvents.ready, 1)
1127 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1128 expect(poolReady).toBe(1)
// Numeric fields of the received info are only type-checked.
1129 expect(poolInfo).toStrictEqual({
1131 type: PoolTypes.dynamic,
1132 worker: WorkerTypes.cluster,
1135 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1136 minSize: expect.any(Number),
1137 maxSize: expect.any(Number),
1138 workerNodes: expect.any(Number),
1139 idleWorkerNodes: expect.any(Number),
1140 busyWorkerNodes: expect.any(Number),
1141 executedTasks: expect.any(Number),
1142 executingTasks: expect.any(Number),
1143 failedTasks: expect.any(Number)
1145 await pool.destroy()
// Test: a listener registered on pool.emitter for PoolEvents.busy is invoked
// the expected number of times while numberOfWorkers * 2 tasks are submitted,
// and receives a pool info object.
1148 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1149 const pool = new FixedThreadPool(
1151 './tests/worker-files/thread/testWorker.mjs'
// No listeners are registered on a new pool's emitter.
1153 expect(pool.emitter.eventNames()).toStrictEqual([])
1154 const promises = new Set()
1157 pool.emitter.on(PoolEvents.busy, info => {
1161 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1162 for (let i = 0; i < numberOfWorkers * 2; i++) {
1163 promises.add(pool.execute())
1165 await Promise.all(promises)
1166 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1167 // So in total it is emitted numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1168 expect(poolBusy).toBe(numberOfWorkers + 1)
// Numeric fields of the received info are only type-checked.
1169 expect(poolInfo).toStrictEqual({
1171 type: PoolTypes.fixed,
1172 worker: WorkerTypes.thread,
1175 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1176 minSize: expect.any(Number),
1177 maxSize: expect.any(Number),
1178 workerNodes: expect.any(Number),
1179 idleWorkerNodes: expect.any(Number),
1180 busyWorkerNodes: expect.any(Number),
1181 executedTasks: expect.any(Number),
1182 executingTasks: expect.any(Number),
1183 failedTasks: expect.any(Number)
1185 await pool.destroy()
// Test: a listener registered on pool.emitter for PoolEvents.full is invoked
// exactly once while numberOfWorkers * 2 tasks are submitted to a dynamic
// thread pool, and receives a pool info object.
1188 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1189 const pool = new DynamicThreadPool(
1190 Math.floor(numberOfWorkers / 2),
1192 './tests/worker-files/thread/testWorker.mjs'
// No listeners are registered on a new pool's emitter.
1194 expect(pool.emitter.eventNames()).toStrictEqual([])
1195 const promises = new Set()
1198 pool.emitter.on(PoolEvents.full, info => {
1202 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1203 for (let i = 0; i < numberOfWorkers * 2; i++) {
1204 promises.add(pool.execute())
1206 await Promise.all(promises)
1207 expect(poolFull).toBe(1)
// Numeric fields of the received info are only type-checked.
1208 expect(poolInfo).toStrictEqual({
1210 type: PoolTypes.dynamic,
1211 worker: WorkerTypes.thread,
1214 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1215 minSize: expect.any(Number),
1216 maxSize: expect.any(Number),
1217 workerNodes: expect.any(Number),
1218 idleWorkerNodes: expect.any(Number),
1219 busyWorkerNodes: expect.any(Number),
1220 executedTasks: expect.any(Number),
1221 executingTasks: expect.any(Number),
1222 failedTasks: expect.any(Number)
1224 await pool.destroy()
// Test: a listener registered on pool.emitter for PoolEvents.backPressure is
// invoked once while numberOfWorkers + 1 tasks are submitted to a pool with
// the tasks queue enabled, and receives a pool info object.
1227 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1228 const pool = new FixedThreadPool(
1230 './tests/worker-files/thread/testWorker.mjs',
1232 enableTasksQueue: true
// Back pressure is forced: hasBackPressure is replaced by a sinon stub that
// always returns true.
1235 stub(pool, 'hasBackPressure').returns(true)
1236 expect(pool.emitter.eventNames()).toStrictEqual([])
1237 const promises = new Set()
1238 let poolBackPressure = 0
1240 pool.emitter.on(PoolEvents.backPressure, info => {
1244 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1245 for (let i = 0; i < numberOfWorkers + 1; i++) {
1246 promises.add(pool.execute())
1248 await Promise.all(promises)
1249 expect(poolBackPressure).toBe(1)
// With the queue enabled the info object also carries queue-related counters
// (maxQueuedTasks, queuedTasks, stolenTasks).
1250 expect(poolInfo).toStrictEqual({
1252 type: PoolTypes.fixed,
1253 worker: WorkerTypes.thread,
1256 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1257 minSize: expect.any(Number),
1258 maxSize: expect.any(Number),
1259 workerNodes: expect.any(Number),
1260 idleWorkerNodes: expect.any(Number),
1261 busyWorkerNodes: expect.any(Number),
1262 executedTasks: expect.any(Number),
1263 executingTasks: expect.any(Number),
1264 maxQueuedTasks: expect.any(Number),
1265 queuedTasks: expect.any(Number),
1267 stolenTasks: expect.any(Number),
1268 failedTasks: expect.any(Number)
// The stub must have been consulted at least once.
1270 expect(pool.hasBackPressure.called).toBe(true)
1271 await pool.destroy()
// Test: executing one task creates an async resource of type 'poolifier:task'
// observable through node:async_hooks. The hook remembers that resource's
// asyncId and counts the callbacks seen for it; each counter is expected to
// be exactly 1 after a single pool.execute().
// NOTE(review): the names of the hook callbacks owning the three counting
// lines and the hook enable/disable calls are not shown in this view — confirm.
1274 it('Verify that pool asynchronous resource track tasks execution', async () => {
1279 let resolveCalls = 0
1280 const hook = createHook({
1281 init (asyncId, type) {
// Only resources created for pool tasks are tracked.
1282 if (type === 'poolifier:task') {
1284 taskAsyncId = asyncId
1288 if (asyncId === taskAsyncId) beforeCalls++
1291 if (asyncId === taskAsyncId) afterCalls++
// Counted when the current execution context is the tracked task resource.
1294 if (executionAsyncId() === taskAsyncId) resolveCalls++
1297 const pool = new FixedThreadPool(
1299 './tests/worker-files/thread/testWorker.mjs'
1302 await pool.execute()
1304 expect(initCalls).toBe(1)
1305 expect(beforeCalls).toBe(1)
1306 expect(afterCalls).toBe(1)
1307 expect(resolveCalls).toBe(1)
1308 await pool.destroy()
// Test: pool.hasTaskFunction(name) on pools backed by the multiple-task-
// functions worker files, first for a dynamic thread pool and then for a
// fixed cluster pool. Each pool is queried only after its ready event.
1311 it('Verify that hasTaskFunction() is working', async () => {
1312 const dynamicThreadPool = new DynamicThreadPool(
1313 Math.floor(numberOfWorkers / 2),
1315 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1317 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// The default task name, 'factorial' and 'fibonacci' are known; 'unknown' is not.
1318 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1319 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1322 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1323 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1324 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1325 await dynamicThreadPool.destroy()
// Same checks on a fixed cluster pool.
1326 const fixedClusterPool = new FixedClusterPool(
1328 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1330 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1331 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1332 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1335 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1336 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1337 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1338 await fixedClusterPool.destroy()
// Test: pool.addTaskFunction(name, fn) on a dynamic thread pool — argument
// validation first, then registering an 'echo' task function, executing it
// and inspecting the per-task-function worker usage.
1341 it('Verify that addTaskFunction() is working', async () => {
1342 const dynamicThreadPool = new DynamicThreadPool(
1343 Math.floor(numberOfWorkers / 2),
1345 './tests/worker-files/thread/testWorker.mjs'
1347 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Invalid name (non-string, then empty) and invalid fn (number, then string)
// all reject with a TypeError.
1349 dynamicThreadPool.addTaskFunction(0, () => {})
1350 ).rejects.toThrow(new TypeError('name argument must be a string'))
1352 dynamicThreadPool.addTaskFunction('', () => {})
1354 new TypeError('name argument must not be an empty string')
1356 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1357 new TypeError('fn argument must be a function')
1359 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1360 new TypeError('fn argument must be a function')
1362 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1366 const echoTaskFunction = data => {
// A successful registration resolves to true and is recorded in
// pool.taskFunctions under the given name.
1370 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1371 ).resolves.toBe(true)
1372 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1373 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1376 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// Executing 'echo' is expected to return a value equal to the input data.
1381 const taskFunctionData = { test: 'test' }
1382 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1383 expect(echoResult).toStrictEqual(taskFunctionData)
// Usage tracked specifically for the 'echo' task function on each worker node.
1384 for (const workerNode of dynamicThreadPool.workerNodes) {
1385 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1387 executed: expect.any(Number),
1390 sequentiallyStolen: 0,
1395 history: new CircularArray()
1398 history: new CircularArray()
1402 history: new CircularArray()
1405 history: new CircularArray()
1410 await dynamicThreadPool.destroy()
// Test: pool.removeTaskFunction(name) on a dynamic thread pool — removing a
// name that was never added from the pool side rejects; removing a task
// function added via addTaskFunction() clears it from pool.taskFunctions.
1413 it('Verify that removeTaskFunction() is working', async () => {
1414 const dynamicThreadPool = new DynamicThreadPool(
1415 Math.floor(numberOfWorkers / 2),
1417 './tests/worker-files/thread/testWorker.mjs'
1419 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1420 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// 'test' was not registered through the pool, so removal is refused.
1424 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1425 new Error('Cannot remove a task function not handled on the pool side')
1427 const echoTaskFunction = data => {
// Register 'echo', verify it is tracked ...
1430 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1431 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1432 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1435 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// ... then remove it and verify it is gone.
1440 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1443 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1444 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1445 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1449 await dynamicThreadPool.destroy()
// Test: pool.listTaskFunctionNames() on pools backed by the multiple-task-
// functions worker files, first for a dynamic thread pool and then for a
// fixed cluster pool. Each pool is queried only after its ready event; the
// expected list includes 'jsonIntegerSerialization'.
1452 it('Verify that listTaskFunctionNames() is working', async () => {
1453 const dynamicThreadPool = new DynamicThreadPool(
1454 Math.floor(numberOfWorkers / 2),
1456 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1458 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1459 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1461 'jsonIntegerSerialization',
1465 await dynamicThreadPool.destroy()
// Same check on a fixed cluster pool.
1466 const fixedClusterPool = new FixedClusterPool(
1468 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1470 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1471 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1473 'jsonIntegerSerialization',
1477 await fixedClusterPool.destroy()
// Test: setDefaultTaskFunction() on a DynamicThreadPool backed by the
// multiple-task-functions worker.
// Rejection cases visible here: a non-string name (0), the reserved
// DEFAULT_TASK_NAME, and the name 'unknown'.
// Success cases visible here: 'factorial' and 'fibonacci', each followed by
// a check of listTaskFunctionNames().
1480 it('Verify that setDefaultTaskFunction() is working', async () => {
1481 const dynamicThreadPool = new DynamicThreadPool(
// With numberOfWorkers = 2 this evaluates to 1.
1482 Math.floor(numberOfWorkers / 2),
1484 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// NOTE(review): presumably resolves once PoolEvents.ready was emitted 1 time - confirm in test-utils.
1486 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// The id of the first worker node is interpolated into the expected error messages below.
1487 const workerId = dynamicThreadPool.workerNodes[0].info.id
// Case 1: the name is a number, not a string.
1488 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1490 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
// Case 2: the reserved default task name cannot itself be set as the default.
1494 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1497 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
// Case 3: 'unknown' is reported as a non-existing task function.
1501 dynamicThreadPool.setDefaultTaskFunction('unknown')
1504 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1507 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1509 'jsonIntegerSerialization',
// Switching the default to 'factorial' must resolve to true.
1514 dynamicThreadPool.setDefaultTaskFunction('factorial')
1515 ).resolves.toBe(true)
1516 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1519 'jsonIntegerSerialization',
// Switching the default to 'fibonacci' must resolve to true as well.
1523 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1524 ).resolves.toBe(true)
1525 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1528 'jsonIntegerSerialization',
1531 await dynamicThreadPool.destroy()
// Test: a DynamicClusterPool running the multiple-task-functions cluster
// worker executes the same payload through four execute() calls (no name,
// 'jsonIntegerSerialization', 'factorial', 'fibonacci') and then inspects
// the pool-level counters and the per-worker, per-task-function usage.
1534 it('Verify that multiple task functions worker is working', async () => {
1535 const pool = new DynamicClusterPool(
// With numberOfWorkers = 2 this evaluates to 1.
1536 Math.floor(numberOfWorkers / 2),
1538 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1540 const data = { n: 10 }
// No task function name given: the result matches the one obtained for
// 'jsonIntegerSerialization' just below.
1541 const result0 = await pool.execute(data)
1542 expect(result0).toStrictEqual({ ok: 1 })
1543 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1544 expect(result1).toStrictEqual({ ok: 1 })
// 3628800 is 10!
1545 const result2 = await pool.execute(data, 'factorial')
1546 expect(result2).toBe(3628800)
// 55 is the 10th Fibonacci number (1, 1, 2, 3, 5, 8, 13, 21, 34, 55).
1547 const result3 = await pool.execute(data, 'fibonacci')
1548 expect(result3).toBe(55)
// All four execute() calls above were awaited, so nothing is in flight
// and the executed counter equals the number of calls.
1549 expect(pool.info.executingTasks).toBe(0)
1550 expect(pool.info.executedTasks).toBe(4)
1551 for (const workerNode of pool.workerNodes) {
1552 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1554 'jsonIntegerSerialization',
1558 expect(workerNode.taskFunctionsUsage.size).toBe(3)
// Check the usage record of every task function name known to the pool.
1559 for (const name of pool.listTaskFunctionNames()) {
1560 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
// Exact counts are not pinned; any number is accepted here.
1562 executed: expect.any(Number),
1566 sequentiallyStolen: 0,
// Only the type of each history container is checked, not its content.
1570 history: expect.any(CircularArray)
1573 history: expect.any(CircularArray)
1577 history: expect.any(CircularArray)
1580 history: expect.any(CircularArray)
// Each listed task function must have been executed at least once on this worker.
1585 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1586 ).toBeGreaterThan(0)
// The usage for DEFAULT_TASK_NAME is put side by side with the usage of the
// second entry of taskFunctionNames.
// NOTE(review): presumably asserted to be equal - confirm the matcher used.
1589 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1591 workerNode.getTaskFunctionWorkerUsage(
1592 workerNode.info.taskFunctionNames[1]
1596 await pool.destroy()
// Test: sendKillMessageToWorker() is called directly on a DynamicClusterPool
// for the worker node at key 0, and its promise must resolve to undefined.
1599 it('Verify sendKillMessageToWorker()', async () => {
1600 const pool = new DynamicClusterPool(
// With numberOfWorkers = 2 this evaluates to 1.
1601 Math.floor(numberOfWorkers / 2),
1603 './tests/worker-files/cluster/testWorker.js'
// Index into pool.workerNodes of the worker to target.
1605 const workerNodeKey = 0
1607 pool.sendKillMessageToWorker(workerNodeKey)
1608 ).resolves.toBeUndefined()
1609 await pool.destroy()
// Test: sendTaskFunctionOperationToWorker() sends an 'add' operation for a
// task function named 'empty' to the worker node at key 0 of a
// DynamicClusterPool. The call must resolve to true and the worker node's
// taskFunctionNames must then end with 'empty'.
1612 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1613 const pool = new DynamicClusterPool(
// With numberOfWorkers = 2 this evaluates to 1.
1614 Math.floor(numberOfWorkers / 2),
1616 './tests/worker-files/cluster/testWorker.js'
// Index into pool.workerNodes of the worker to target.
1618 const workerNodeKey = 0
1620 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1621 taskFunctionOperation: 'add',
1622 taskFunctionName: 'empty',
// The task function is passed as its source text, not as a function object.
1623 taskFunction: (() => {}).toString()
1625 ).resolves.toBe(true)
// The pool-side info of the targeted worker node now lists 'empty' after
// the names it already had.
1627 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1628 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1629 await pool.destroy()
// Test: sendTaskFunctionOperationToWorkers() sends an 'add' operation for a
// task function named 'empty' without naming a worker node key. The call
// must resolve to true, after which the taskFunctionNames of every worker
// node in the pool are checked.
1632 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1633 const pool = new DynamicClusterPool(
// With numberOfWorkers = 2 this evaluates to 1.
1634 Math.floor(numberOfWorkers / 2),
1636 './tests/worker-files/cluster/testWorker.js'
1639 pool.sendTaskFunctionOperationToWorkers({
1640 taskFunctionOperation: 'add',
1641 taskFunctionName: 'empty',
// The task function is passed as its source text, not as a function object.
1642 taskFunction: (() => {}).toString()
1644 ).resolves.toBe(true)
// Every worker node is inspected, not just the first one.
1645 for (const workerNode of pool.workerNodes) {
1646 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1652 await pool.destroy()