1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new Error("Cannot find the worker file 'undefined'")
84 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
85 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
88 it('Verify that numberOfWorkers is checked', () => {
93 './tests/worker-files/thread/testWorker.mjs'
97 'Cannot instantiate a pool without specifying the number of workers'
102 it('Verify that a negative number of workers is checked', () => {
105 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
108 'Cannot instantiate a pool with a negative number of workers'
113 it('Verify that a non integer number of workers is checked', () => {
116 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
119 'Cannot instantiate a pool with a non safe integer number of workers'
124 it('Verify that dynamic pool sizing is checked', () => {
127 new DynamicClusterPool(
130 './tests/worker-files/cluster/testWorker.js'
134 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
139 new DynamicThreadPool(
142 './tests/worker-files/thread/testWorker.mjs'
146 'Cannot instantiate a pool with a non safe integer number of workers'
151 new DynamicClusterPool(
154 './tests/worker-files/cluster/testWorker.js'
158 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
163 new DynamicThreadPool(
166 './tests/worker-files/thread/testWorker.mjs'
170 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
175 new DynamicThreadPool(
178 './tests/worker-files/thread/testWorker.mjs'
182 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
187 new DynamicClusterPool(
190 './tests/worker-files/cluster/testWorker.js'
194 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
199 it('Verify that pool options are checked', async () => {
200 let pool = new FixedThreadPool(
202 './tests/worker-files/thread/testWorker.mjs'
204 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
205 expect(pool.opts).toStrictEqual({
208 restartWorkerOnError: true,
209 enableTasksQueue: false,
210 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
211 workerChoiceStrategyOptions: {
213 runTime: { median: false },
214 waitTime: { median: false },
215 elu: { median: false }
218 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
220 runTime: { median: false },
221 waitTime: { median: false },
222 elu: { median: false }
224 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
225 .workerChoiceStrategies) {
226 expect(workerChoiceStrategy.opts).toStrictEqual({
228 runTime: { median: false },
229 waitTime: { median: false },
230 elu: { median: false }
234 const testHandler = () => console.info('test handler executed')
235 pool = new FixedThreadPool(
237 './tests/worker-files/thread/testWorker.mjs',
239 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
240 workerChoiceStrategyOptions: {
241 runTime: { median: true },
242 weights: { 0: 300, 1: 200 }
245 restartWorkerOnError: false,
246 enableTasksQueue: true,
247 tasksQueueOptions: { concurrency: 2 },
248 messageHandler: testHandler,
249 errorHandler: testHandler,
250 onlineHandler: testHandler,
251 exitHandler: testHandler
254 expect(pool.emitter).toBeUndefined()
255 expect(pool.opts).toStrictEqual({
258 restartWorkerOnError: false,
259 enableTasksQueue: true,
262 size: Math.pow(numberOfWorkers, 2),
264 tasksStealingOnBackPressure: true
266 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
267 workerChoiceStrategyOptions: {
269 runTime: { median: true },
270 waitTime: { median: false },
271 elu: { median: false },
272 weights: { 0: 300, 1: 200 }
274 onlineHandler: testHandler,
275 messageHandler: testHandler,
276 errorHandler: testHandler,
277 exitHandler: testHandler
279 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
281 runTime: { median: true },
282 waitTime: { median: false },
283 elu: { median: false },
284 weights: { 0: 300, 1: 200 }
286 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
287 .workerChoiceStrategies) {
288 expect(workerChoiceStrategy.opts).toStrictEqual({
290 runTime: { median: true },
291 waitTime: { median: false },
292 elu: { median: false },
293 weights: { 0: 300, 1: 200 }
299 it('Verify that pool options are validated', () => {
304 './tests/worker-files/thread/testWorker.mjs',
306 workerChoiceStrategy: 'invalidStrategy'
309 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
314 './tests/worker-files/thread/testWorker.mjs',
316 workerChoiceStrategyOptions: {
317 retries: 'invalidChoiceRetries'
323 'Invalid worker choice strategy options: retries must be an integer'
330 './tests/worker-files/thread/testWorker.mjs',
332 workerChoiceStrategyOptions: {
339 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
346 './tests/worker-files/thread/testWorker.mjs',
348 workerChoiceStrategyOptions: { weights: {} }
353 'Invalid worker choice strategy options: must have a weight for each worker node'
360 './tests/worker-files/thread/testWorker.mjs',
362 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
367 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
374 './tests/worker-files/thread/testWorker.mjs',
376 enableTasksQueue: true,
377 tasksQueueOptions: 'invalidTasksQueueOptions'
381 new TypeError('Invalid tasks queue options: must be a plain object')
387 './tests/worker-files/thread/testWorker.mjs',
389 enableTasksQueue: true,
390 tasksQueueOptions: { concurrency: 0 }
395 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
402 './tests/worker-files/thread/testWorker.mjs',
404 enableTasksQueue: true,
405 tasksQueueOptions: { concurrency: -1 }
410 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
417 './tests/worker-files/thread/testWorker.mjs',
419 enableTasksQueue: true,
420 tasksQueueOptions: { concurrency: 0.2 }
424 new TypeError('Invalid worker node tasks concurrency: must be an integer')
430 './tests/worker-files/thread/testWorker.mjs',
432 enableTasksQueue: true,
433 tasksQueueOptions: { size: 0 }
438 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
445 './tests/worker-files/thread/testWorker.mjs',
447 enableTasksQueue: true,
448 tasksQueueOptions: { size: -1 }
453 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
460 './tests/worker-files/thread/testWorker.mjs',
462 enableTasksQueue: true,
463 tasksQueueOptions: { size: 0.2 }
467 new TypeError('Invalid worker node tasks queue size: must be an integer')
471 it('Verify that pool worker choice strategy options can be set', async () => {
472 const pool = new FixedThreadPool(
474 './tests/worker-files/thread/testWorker.mjs',
475 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
477 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
479 runTime: { median: false },
480 waitTime: { median: false },
481 elu: { median: false }
483 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
485 runTime: { median: false },
486 waitTime: { median: false },
487 elu: { median: false }
489 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
490 .workerChoiceStrategies) {
491 expect(workerChoiceStrategy.opts).toStrictEqual({
493 runTime: { median: false },
494 waitTime: { median: false },
495 elu: { median: false }
499 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
517 pool.setWorkerChoiceStrategyOptions({
518 runTime: { median: true },
519 elu: { median: true }
521 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
523 runTime: { median: true },
524 waitTime: { median: false },
525 elu: { median: true }
527 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
529 runTime: { median: true },
530 waitTime: { median: false },
531 elu: { median: true }
533 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
534 .workerChoiceStrategies) {
535 expect(workerChoiceStrategy.opts).toStrictEqual({
537 runTime: { median: true },
538 waitTime: { median: false },
539 elu: { median: true }
543 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
561 pool.setWorkerChoiceStrategyOptions({
562 runTime: { median: false },
563 elu: { median: false }
565 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
567 runTime: { median: false },
568 waitTime: { median: false },
569 elu: { median: false }
571 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
573 runTime: { median: false },
574 waitTime: { median: false },
575 elu: { median: false }
577 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
578 .workerChoiceStrategies) {
579 expect(workerChoiceStrategy.opts).toStrictEqual({
581 runTime: { median: false },
582 waitTime: { median: false },
583 elu: { median: false }
587 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
606 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
609 'Invalid worker choice strategy options: must be a plain object'
613 pool.setWorkerChoiceStrategyOptions({
614 retries: 'invalidChoiceRetries'
618 'Invalid worker choice strategy options: retries must be an integer'
621 expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
623 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
626 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
628 'Invalid worker choice strategy options: must have a weight for each worker node'
632 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
635 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
641 it('Verify that pool tasks queue can be enabled/disabled', async () => {
642 const pool = new FixedThreadPool(
644 './tests/worker-files/thread/testWorker.mjs'
646 expect(pool.opts.enableTasksQueue).toBe(false)
647 expect(pool.opts.tasksQueueOptions).toBeUndefined()
648 pool.enableTasksQueue(true)
649 expect(pool.opts.enableTasksQueue).toBe(true)
650 expect(pool.opts.tasksQueueOptions).toStrictEqual({
652 size: Math.pow(numberOfWorkers, 2),
654 tasksStealingOnBackPressure: true
656 pool.enableTasksQueue(true, { concurrency: 2 })
657 expect(pool.opts.enableTasksQueue).toBe(true)
658 expect(pool.opts.tasksQueueOptions).toStrictEqual({
660 size: Math.pow(numberOfWorkers, 2),
662 tasksStealingOnBackPressure: true
664 pool.enableTasksQueue(false)
665 expect(pool.opts.enableTasksQueue).toBe(false)
666 expect(pool.opts.tasksQueueOptions).toBeUndefined()
670 it('Verify that pool tasks queue options can be set', async () => {
671 const pool = new FixedThreadPool(
673 './tests/worker-files/thread/testWorker.mjs',
674 { enableTasksQueue: true }
676 expect(pool.opts.tasksQueueOptions).toStrictEqual({
678 size: Math.pow(numberOfWorkers, 2),
680 tasksStealingOnBackPressure: true
682 for (const workerNode of pool.workerNodes) {
683 expect(workerNode.tasksQueueBackPressureSize).toBe(
684 pool.opts.tasksQueueOptions.size
687 pool.setTasksQueueOptions({
691 tasksStealingOnBackPressure: false
693 expect(pool.opts.tasksQueueOptions).toStrictEqual({
697 tasksStealingOnBackPressure: false
699 for (const workerNode of pool.workerNodes) {
700 expect(workerNode.tasksQueueBackPressureSize).toBe(
701 pool.opts.tasksQueueOptions.size
704 pool.setTasksQueueOptions({
707 tasksStealingOnBackPressure: true
709 expect(pool.opts.tasksQueueOptions).toStrictEqual({
711 size: Math.pow(numberOfWorkers, 2),
713 tasksStealingOnBackPressure: true
715 for (const workerNode of pool.workerNodes) {
716 expect(workerNode.tasksQueueBackPressureSize).toBe(
717 pool.opts.tasksQueueOptions.size
720 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
721 new TypeError('Invalid tasks queue options: must be a plain object')
723 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
725 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
728 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
730 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
733 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
734 new TypeError('Invalid worker node tasks concurrency: must be an integer')
736 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
738 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
741 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
743 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
746 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
747 new TypeError('Invalid worker node tasks queue size: must be an integer')
752 it('Verify that pool info is set', async () => {
753 let pool = new FixedThreadPool(
755 './tests/worker-files/thread/testWorker.mjs'
757 expect(pool.info).toStrictEqual({
759 type: PoolTypes.fixed,
760 worker: WorkerTypes.thread,
763 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
764 minSize: numberOfWorkers,
765 maxSize: numberOfWorkers,
766 workerNodes: numberOfWorkers,
767 idleWorkerNodes: numberOfWorkers,
774 pool = new DynamicClusterPool(
775 Math.floor(numberOfWorkers / 2),
777 './tests/worker-files/cluster/testWorker.js'
779 expect(pool.info).toStrictEqual({
781 type: PoolTypes.dynamic,
782 worker: WorkerTypes.cluster,
785 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
786 minSize: Math.floor(numberOfWorkers / 2),
787 maxSize: numberOfWorkers,
788 workerNodes: Math.floor(numberOfWorkers / 2),
789 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
798 it('Verify that pool worker tasks usage are initialized', async () => {
799 const pool = new FixedClusterPool(
801 './tests/worker-files/cluster/testWorker.js'
803 for (const workerNode of pool.workerNodes) {
804 expect(workerNode).toBeInstanceOf(WorkerNode)
805 expect(workerNode.usage).toStrictEqual({
811 sequentiallyStolen: 0,
816 history: new CircularArray()
819 history: new CircularArray()
823 history: new CircularArray()
826 history: new CircularArray()
834 it('Verify that pool worker tasks queue are initialized', async () => {
835 let pool = new FixedClusterPool(
837 './tests/worker-files/cluster/testWorker.js'
839 for (const workerNode of pool.workerNodes) {
840 expect(workerNode).toBeInstanceOf(WorkerNode)
841 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
842 expect(workerNode.tasksQueue.size).toBe(0)
843 expect(workerNode.tasksQueue.maxSize).toBe(0)
846 pool = new DynamicThreadPool(
847 Math.floor(numberOfWorkers / 2),
849 './tests/worker-files/thread/testWorker.mjs'
851 for (const workerNode of pool.workerNodes) {
852 expect(workerNode).toBeInstanceOf(WorkerNode)
853 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
854 expect(workerNode.tasksQueue.size).toBe(0)
855 expect(workerNode.tasksQueue.maxSize).toBe(0)
860 it('Verify that pool worker info are initialized', async () => {
861 let pool = new FixedClusterPool(
863 './tests/worker-files/cluster/testWorker.js'
865 for (const workerNode of pool.workerNodes) {
866 expect(workerNode).toBeInstanceOf(WorkerNode)
867 expect(workerNode.info).toStrictEqual({
868 id: expect.any(Number),
869 type: WorkerTypes.cluster,
875 pool = new DynamicThreadPool(
876 Math.floor(numberOfWorkers / 2),
878 './tests/worker-files/thread/testWorker.mjs'
880 for (const workerNode of pool.workerNodes) {
881 expect(workerNode).toBeInstanceOf(WorkerNode)
882 expect(workerNode.info).toStrictEqual({
883 id: expect.any(Number),
884 type: WorkerTypes.thread,
892 it('Verify that pool statuses are checked at start or destroy', async () => {
893 const pool = new FixedThreadPool(
895 './tests/worker-files/thread/testWorker.mjs'
897 expect(pool.info.started).toBe(true)
898 expect(pool.info.ready).toBe(true)
899 expect(() => pool.start()).toThrow(
900 new Error('Cannot start an already started pool')
903 expect(pool.info.started).toBe(false)
904 expect(pool.info.ready).toBe(false)
905 await expect(pool.destroy()).rejects.toThrow(
906 new Error('Cannot destroy an already destroyed pool')
910 it('Verify that pool can be started after initialization', async () => {
911 const pool = new FixedClusterPool(
913 './tests/worker-files/cluster/testWorker.js',
918 expect(pool.info.started).toBe(false)
919 expect(pool.info.ready).toBe(false)
920 expect(pool.readyEventEmitted).toBe(false)
921 expect(pool.workerNodes).toStrictEqual([])
922 await expect(pool.execute()).rejects.toThrow(
923 new Error('Cannot execute a task on not started pool')
926 expect(pool.info.started).toBe(true)
927 expect(pool.info.ready).toBe(true)
928 await waitPoolEvents(pool, PoolEvents.ready, 1)
929 expect(pool.readyEventEmitted).toBe(true)
930 expect(pool.workerNodes.length).toBe(numberOfWorkers)
931 for (const workerNode of pool.workerNodes) {
932 expect(workerNode).toBeInstanceOf(WorkerNode)
937 it('Verify that pool execute() arguments are checked', async () => {
938 const pool = new FixedClusterPool(
940 './tests/worker-files/cluster/testWorker.js'
942 await expect(pool.execute(undefined, 0)).rejects.toThrow(
943 new TypeError('name argument must be a string')
945 await expect(pool.execute(undefined, '')).rejects.toThrow(
946 new TypeError('name argument must not be an empty string')
948 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
949 new TypeError('transferList argument must be an array')
951 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
952 "Task function 'unknown' not found"
955 await expect(pool.execute()).rejects.toThrow(
956 new Error('Cannot execute a task on not started pool')
960 it('Verify that pool worker tasks usage are computed', async () => {
961 const pool = new FixedClusterPool(
963 './tests/worker-files/cluster/testWorker.js'
965 const promises = new Set()
966 const maxMultiplier = 2
967 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
968 promises.add(pool.execute())
970 for (const workerNode of pool.workerNodes) {
971 expect(workerNode.usage).toStrictEqual({
974 executing: maxMultiplier,
977 sequentiallyStolen: 0,
982 history: expect.any(CircularArray)
985 history: expect.any(CircularArray)
989 history: expect.any(CircularArray)
992 history: expect.any(CircularArray)
997 await Promise.all(promises)
998 for (const workerNode of pool.workerNodes) {
999 expect(workerNode.usage).toStrictEqual({
1001 executed: maxMultiplier,
1005 sequentiallyStolen: 0,
1010 history: expect.any(CircularArray)
1013 history: expect.any(CircularArray)
1017 history: expect.any(CircularArray)
1020 history: expect.any(CircularArray)
1025 await pool.destroy()
1028 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1029 const pool = new DynamicThreadPool(
1030 Math.floor(numberOfWorkers / 2),
1032 './tests/worker-files/thread/testWorker.mjs'
1034 const promises = new Set()
1035 const maxMultiplier = 2
1036 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1037 promises.add(pool.execute())
1039 await Promise.all(promises)
1040 for (const workerNode of pool.workerNodes) {
1041 expect(workerNode.usage).toStrictEqual({
1043 executed: expect.any(Number),
1047 sequentiallyStolen: 0,
1052 history: expect.any(CircularArray)
1055 history: expect.any(CircularArray)
1059 history: expect.any(CircularArray)
1062 history: expect.any(CircularArray)
1066 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1067 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1068 numberOfWorkers * maxMultiplier
1070 expect(workerNode.usage.runTime.history.length).toBe(0)
1071 expect(workerNode.usage.waitTime.history.length).toBe(0)
1072 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1073 expect(workerNode.usage.elu.active.history.length).toBe(0)
1075 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1076 for (const workerNode of pool.workerNodes) {
1077 expect(workerNode.usage).toStrictEqual({
1083 sequentiallyStolen: 0,
1088 history: expect.any(CircularArray)
1091 history: expect.any(CircularArray)
1095 history: expect.any(CircularArray)
1098 history: expect.any(CircularArray)
1102 expect(workerNode.usage.runTime.history.length).toBe(0)
1103 expect(workerNode.usage.waitTime.history.length).toBe(0)
1104 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1105 expect(workerNode.usage.elu.active.history.length).toBe(0)
1107 await pool.destroy()
1110 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1111 const pool = new DynamicClusterPool(
1112 Math.floor(numberOfWorkers / 2),
1114 './tests/worker-files/cluster/testWorker.js'
1116 expect(pool.emitter.eventNames()).toStrictEqual([])
1119 pool.emitter.on(PoolEvents.ready, info => {
1123 await waitPoolEvents(pool, PoolEvents.ready, 1)
1124 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1125 expect(poolReady).toBe(1)
1126 expect(poolInfo).toStrictEqual({
1128 type: PoolTypes.dynamic,
1129 worker: WorkerTypes.cluster,
1132 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1133 minSize: expect.any(Number),
1134 maxSize: expect.any(Number),
1135 workerNodes: expect.any(Number),
1136 idleWorkerNodes: expect.any(Number),
1137 busyWorkerNodes: expect.any(Number),
1138 executedTasks: expect.any(Number),
1139 executingTasks: expect.any(Number),
1140 failedTasks: expect.any(Number)
1142 await pool.destroy()
1145 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1146 const pool = new FixedThreadPool(
1148 './tests/worker-files/thread/testWorker.mjs'
1150 expect(pool.emitter.eventNames()).toStrictEqual([])
1151 const promises = new Set()
1154 pool.emitter.on(PoolEvents.busy, info => {
1158 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1159 for (let i = 0; i < numberOfWorkers * 2; i++) {
1160 promises.add(pool.execute())
1162 await Promise.all(promises)
1163 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1164 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1165 expect(poolBusy).toBe(numberOfWorkers + 1)
1166 expect(poolInfo).toStrictEqual({
1168 type: PoolTypes.fixed,
1169 worker: WorkerTypes.thread,
1172 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1173 minSize: expect.any(Number),
1174 maxSize: expect.any(Number),
1175 workerNodes: expect.any(Number),
1176 idleWorkerNodes: expect.any(Number),
1177 busyWorkerNodes: expect.any(Number),
1178 executedTasks: expect.any(Number),
1179 executingTasks: expect.any(Number),
1180 failedTasks: expect.any(Number)
1182 await pool.destroy()
1185 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1186 const pool = new DynamicThreadPool(
1187 Math.floor(numberOfWorkers / 2),
1189 './tests/worker-files/thread/testWorker.mjs'
1191 expect(pool.emitter.eventNames()).toStrictEqual([])
1192 const promises = new Set()
1195 pool.emitter.on(PoolEvents.full, info => {
1199 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1200 for (let i = 0; i < numberOfWorkers * 2; i++) {
1201 promises.add(pool.execute())
1203 await Promise.all(promises)
1204 expect(poolFull).toBe(1)
1205 expect(poolInfo).toStrictEqual({
1207 type: PoolTypes.dynamic,
1208 worker: WorkerTypes.thread,
1211 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1212 minSize: expect.any(Number),
1213 maxSize: expect.any(Number),
1214 workerNodes: expect.any(Number),
1215 idleWorkerNodes: expect.any(Number),
1216 busyWorkerNodes: expect.any(Number),
1217 executedTasks: expect.any(Number),
1218 executingTasks: expect.any(Number),
1219 failedTasks: expect.any(Number)
1221 await pool.destroy()
1224 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1225 const pool = new FixedThreadPool(
1227 './tests/worker-files/thread/testWorker.mjs',
1229 enableTasksQueue: true
1232 stub(pool, 'hasBackPressure').returns(true)
1233 expect(pool.emitter.eventNames()).toStrictEqual([])
1234 const promises = new Set()
1235 let poolBackPressure = 0
1237 pool.emitter.on(PoolEvents.backPressure, info => {
1241 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1242 for (let i = 0; i < numberOfWorkers + 1; i++) {
1243 promises.add(pool.execute())
1245 await Promise.all(promises)
1246 expect(poolBackPressure).toBe(1)
1247 expect(poolInfo).toStrictEqual({
1249 type: PoolTypes.fixed,
1250 worker: WorkerTypes.thread,
1253 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1254 minSize: expect.any(Number),
1255 maxSize: expect.any(Number),
1256 workerNodes: expect.any(Number),
1257 idleWorkerNodes: expect.any(Number),
1258 busyWorkerNodes: expect.any(Number),
1259 executedTasks: expect.any(Number),
1260 executingTasks: expect.any(Number),
1261 maxQueuedTasks: expect.any(Number),
1262 queuedTasks: expect.any(Number),
1264 stolenTasks: expect.any(Number),
1265 failedTasks: expect.any(Number)
1267 expect(pool.hasBackPressure.called).toBe(true)
1268 await pool.destroy()
1271 it('Verify that pool asynchronous resource track tasks execution', async () => {
1276 let resolveCalls = 0
1277 const hook = createHook({
1278 init (asyncId, type) {
1279 if (type === 'poolifier:task') {
1281 taskAsyncId = asyncId
1285 if (asyncId === taskAsyncId) beforeCalls++
1288 if (asyncId === taskAsyncId) afterCalls++
1291 if (executionAsyncId() === taskAsyncId) resolveCalls++
1294 const pool = new FixedThreadPool(
1296 './tests/worker-files/thread/testWorker.mjs'
1299 await pool.execute()
1301 expect(initCalls).toBe(1)
1302 expect(beforeCalls).toBe(1)
1303 expect(afterCalls).toBe(1)
1304 expect(resolveCalls).toBe(1)
1305 await pool.destroy()
1308 it('Verify that hasTaskFunction() is working', async () => {
1309 const dynamicThreadPool = new DynamicThreadPool(
1310 Math.floor(numberOfWorkers / 2),
1312 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1314 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1315 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1316 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1319 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1320 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1321 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1322 await dynamicThreadPool.destroy()
1323 const fixedClusterPool = new FixedClusterPool(
1325 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1327 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1328 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1329 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1332 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1333 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1334 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1335 await fixedClusterPool.destroy()
1338 it('Verify that addTaskFunction() is working', async () => {
1339 const dynamicThreadPool = new DynamicThreadPool(
1340 Math.floor(numberOfWorkers / 2),
1342 './tests/worker-files/thread/testWorker.mjs'
1344 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1346 dynamicThreadPool.addTaskFunction(0, () => {})
1347 ).rejects.toThrow(new TypeError('name argument must be a string'))
1349 dynamicThreadPool.addTaskFunction('', () => {})
1351 new TypeError('name argument must not be an empty string')
1353 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1354 new TypeError('fn argument must be a function')
1356 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1357 new TypeError('fn argument must be a function')
1359 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1363 const echoTaskFunction = data => {
1367 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1368 ).resolves.toBe(true)
1369 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1370 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1373 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1378 const taskFunctionData = { test: 'test' }
1379 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1380 expect(echoResult).toStrictEqual(taskFunctionData)
1381 for (const workerNode of dynamicThreadPool.workerNodes) {
1382 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1384 executed: expect.any(Number),
1387 sequentiallyStolen: 0,
1392 history: new CircularArray()
1395 history: new CircularArray()
1399 history: new CircularArray()
1402 history: new CircularArray()
1407 await dynamicThreadPool.destroy()
1410 it('Verify that removeTaskFunction() is working', async () => {
1411 const dynamicThreadPool = new DynamicThreadPool(
1412 Math.floor(numberOfWorkers / 2),
1414 './tests/worker-files/thread/testWorker.mjs'
1416 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1417 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1421 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1422 new Error('Cannot remove a task function not handled on the pool side')
1424 const echoTaskFunction = data => {
1427 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1428 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1429 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1432 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1437 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1440 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1441 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1442 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1446 await dynamicThreadPool.destroy()
1449 it('Verify that listTaskFunctionNames() is working', async () => {
1450 const dynamicThreadPool = new DynamicThreadPool(
1451 Math.floor(numberOfWorkers / 2),
1453 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1455 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1456 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1458 'jsonIntegerSerialization',
1462 await dynamicThreadPool.destroy()
1463 const fixedClusterPool = new FixedClusterPool(
1465 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1467 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1468 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1470 'jsonIntegerSerialization',
1474 await fixedClusterPool.destroy()
1477 it('Verify that setDefaultTaskFunction() is working', async () => {
1478 const dynamicThreadPool = new DynamicThreadPool(
1479 Math.floor(numberOfWorkers / 2),
1481 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1483 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1484 const workerId = dynamicThreadPool.workerNodes[0].info.id
1485 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1487 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1491 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1494 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1498 dynamicThreadPool.setDefaultTaskFunction('unknown')
1501 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1504 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1506 'jsonIntegerSerialization',
1511 dynamicThreadPool.setDefaultTaskFunction('factorial')
1512 ).resolves.toBe(true)
1513 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1516 'jsonIntegerSerialization',
1520 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1521 ).resolves.toBe(true)
1522 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1525 'jsonIntegerSerialization',
1528 await dynamicThreadPool.destroy()
1531 it('Verify that multiple task functions worker is working', async () => {
1532 const pool = new DynamicClusterPool(
1533 Math.floor(numberOfWorkers / 2),
1535 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1537 const data = { n: 10 }
1538 const result0 = await pool.execute(data)
1539 expect(result0).toStrictEqual({ ok: 1 })
1540 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1541 expect(result1).toStrictEqual({ ok: 1 })
1542 const result2 = await pool.execute(data, 'factorial')
1543 expect(result2).toBe(3628800)
1544 const result3 = await pool.execute(data, 'fibonacci')
1545 expect(result3).toBe(55)
1546 expect(pool.info.executingTasks).toBe(0)
1547 expect(pool.info.executedTasks).toBe(4)
1548 for (const workerNode of pool.workerNodes) {
1549 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1551 'jsonIntegerSerialization',
1555 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1556 for (const name of pool.listTaskFunctionNames()) {
1557 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1559 executed: expect.any(Number),
1563 sequentiallyStolen: 0,
1567 history: expect.any(CircularArray)
1570 history: expect.any(CircularArray)
1574 history: expect.any(CircularArray)
1577 history: expect.any(CircularArray)
1582 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1583 ).toBeGreaterThan(0)
1586 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1588 workerNode.getTaskFunctionWorkerUsage(
1589 workerNode.info.taskFunctionNames[1]
1593 await pool.destroy()
1596 it('Verify sendKillMessageToWorker()', async () => {
1597 const pool = new DynamicClusterPool(
1598 Math.floor(numberOfWorkers / 2),
1600 './tests/worker-files/cluster/testWorker.js'
1602 const workerNodeKey = 0
1604 pool.sendKillMessageToWorker(workerNodeKey)
1605 ).resolves.toBeUndefined()
1606 await pool.destroy()
1609 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1610 const pool = new DynamicClusterPool(
1611 Math.floor(numberOfWorkers / 2),
1613 './tests/worker-files/cluster/testWorker.js'
1615 const workerNodeKey = 0
1617 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1618 taskFunctionOperation: 'add',
1619 taskFunctionName: 'empty',
1620 taskFunction: (() => {}).toString()
1622 ).resolves.toBe(true)
1624 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1625 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1626 await pool.destroy()
1629 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1630 const pool = new DynamicClusterPool(
1631 Math.floor(numberOfWorkers / 2),
1633 './tests/worker-files/cluster/testWorker.js'
1636 pool.sendTaskFunctionOperationToWorkers({
1637 taskFunctionOperation: 'add',
1638 taskFunctionName: 'empty',
1639 taskFunction: (() => {}).toString()
1641 ).resolves.toBe(true)
1642 for (const workerNode of pool.workerNodes) {
1643 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1649 await pool.destroy()