1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new Error("Cannot find the worker file 'undefined'")
84 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
85 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
88 it('Verify that numberOfWorkers is checked', () => {
93 './tests/worker-files/thread/testWorker.mjs'
97 'Cannot instantiate a pool without specifying the number of workers'
102 it('Verify that a negative number of workers is checked', () => {
105 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
108 'Cannot instantiate a pool with a negative number of workers'
113 it('Verify that a non integer number of workers is checked', () => {
116 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
119 'Cannot instantiate a pool with a non safe integer number of workers'
124 it('Verify that dynamic pool sizing is checked', () => {
127 new DynamicClusterPool(
130 './tests/worker-files/cluster/testWorker.js'
134 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
139 new DynamicThreadPool(
142 './tests/worker-files/thread/testWorker.mjs'
146 'Cannot instantiate a pool with a non safe integer number of workers'
151 new DynamicClusterPool(
154 './tests/worker-files/cluster/testWorker.js'
158 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
163 new DynamicThreadPool(
166 './tests/worker-files/thread/testWorker.mjs'
170 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
175 new DynamicThreadPool(
178 './tests/worker-files/thread/testWorker.mjs'
182 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
187 new DynamicClusterPool(
190 './tests/worker-files/cluster/testWorker.js'
194 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
199 it('Verify that pool options are checked', async () => {
200 let pool = new FixedThreadPool(
202 './tests/worker-files/thread/testWorker.mjs'
204 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
205 expect(pool.opts).toStrictEqual({
208 restartWorkerOnError: true,
209 enableTasksQueue: false,
210 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
211 workerChoiceStrategyOptions: {
213 runTime: { median: false },
214 waitTime: { median: false },
215 elu: { median: false }
218 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
220 runTime: { median: false },
221 waitTime: { median: false },
222 elu: { median: false }
224 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
225 .workerChoiceStrategies) {
226 expect(workerChoiceStrategy.opts).toStrictEqual({
228 runTime: { median: false },
229 waitTime: { median: false },
230 elu: { median: false }
234 const testHandler = () => console.info('test handler executed')
235 pool = new FixedThreadPool(
237 './tests/worker-files/thread/testWorker.mjs',
239 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
240 workerChoiceStrategyOptions: {
241 runTime: { median: true },
242 weights: { 0: 300, 1: 200 }
245 restartWorkerOnError: false,
246 enableTasksQueue: true,
247 tasksQueueOptions: { concurrency: 2 },
248 messageHandler: testHandler,
249 errorHandler: testHandler,
250 onlineHandler: testHandler,
251 exitHandler: testHandler
254 expect(pool.emitter).toBeUndefined()
255 expect(pool.opts).toStrictEqual({
258 restartWorkerOnError: false,
259 enableTasksQueue: true,
262 size: Math.pow(numberOfWorkers, 2),
264 tasksStealingOnBackPressure: true
266 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
267 workerChoiceStrategyOptions: {
269 runTime: { median: true },
270 waitTime: { median: false },
271 elu: { median: false },
272 weights: { 0: 300, 1: 200 }
274 onlineHandler: testHandler,
275 messageHandler: testHandler,
276 errorHandler: testHandler,
277 exitHandler: testHandler
279 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
281 runTime: { median: true },
282 waitTime: { median: false },
283 elu: { median: false },
284 weights: { 0: 300, 1: 200 }
286 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
287 .workerChoiceStrategies) {
288 expect(workerChoiceStrategy.opts).toStrictEqual({
290 runTime: { median: true },
291 waitTime: { median: false },
292 elu: { median: false },
293 weights: { 0: 300, 1: 200 }
299 it('Verify that pool options are validated', () => {
304 './tests/worker-files/thread/testWorker.mjs',
306 workerChoiceStrategy: 'invalidStrategy'
309 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
314 './tests/worker-files/thread/testWorker.mjs',
316 workerChoiceStrategyOptions: {
317 retries: 'invalidChoiceRetries'
323 'Invalid worker choice strategy options: retries must be an integer'
330 './tests/worker-files/thread/testWorker.mjs',
332 workerChoiceStrategyOptions: {
339 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
346 './tests/worker-files/thread/testWorker.mjs',
348 workerChoiceStrategyOptions: { weights: {} }
353 'Invalid worker choice strategy options: must have a weight for each worker node'
360 './tests/worker-files/thread/testWorker.mjs',
362 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
367 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
374 './tests/worker-files/thread/testWorker.mjs',
376 enableTasksQueue: true,
377 tasksQueueOptions: 'invalidTasksQueueOptions'
381 new TypeError('Invalid tasks queue options: must be a plain object')
387 './tests/worker-files/thread/testWorker.mjs',
389 enableTasksQueue: true,
390 tasksQueueOptions: { concurrency: 0 }
395 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
402 './tests/worker-files/thread/testWorker.mjs',
404 enableTasksQueue: true,
405 tasksQueueOptions: { concurrency: -1 }
410 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
417 './tests/worker-files/thread/testWorker.mjs',
419 enableTasksQueue: true,
420 tasksQueueOptions: { concurrency: 0.2 }
424 new TypeError('Invalid worker node tasks concurrency: must be an integer')
430 './tests/worker-files/thread/testWorker.mjs',
432 enableTasksQueue: true,
433 tasksQueueOptions: { size: 0 }
438 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
445 './tests/worker-files/thread/testWorker.mjs',
447 enableTasksQueue: true,
448 tasksQueueOptions: { size: -1 }
453 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
460 './tests/worker-files/thread/testWorker.mjs',
462 enableTasksQueue: true,
463 tasksQueueOptions: { size: 0.2 }
467 new TypeError('Invalid worker node tasks queue size: must be an integer')
471 it('Verify that pool worker choice strategy options can be set', async () => {
472 const pool = new FixedThreadPool(
474 './tests/worker-files/thread/testWorker.mjs',
475 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
477 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
479 runTime: { median: false },
480 waitTime: { median: false },
481 elu: { median: false }
483 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
485 runTime: { median: false },
486 waitTime: { median: false },
487 elu: { median: false }
489 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
490 .workerChoiceStrategies) {
491 expect(workerChoiceStrategy.opts).toStrictEqual({
493 runTime: { median: false },
494 waitTime: { median: false },
495 elu: { median: false }
499 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
517 pool.setWorkerChoiceStrategyOptions({
518 runTime: { median: true },
519 elu: { median: true }
521 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
523 runTime: { median: true },
524 waitTime: { median: false },
525 elu: { median: true }
527 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
529 runTime: { median: true },
530 waitTime: { median: false },
531 elu: { median: true }
533 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
534 .workerChoiceStrategies) {
535 expect(workerChoiceStrategy.opts).toStrictEqual({
537 runTime: { median: true },
538 waitTime: { median: false },
539 elu: { median: true }
543 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
561 pool.setWorkerChoiceStrategyOptions({
562 runTime: { median: false },
563 elu: { median: false }
565 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
567 runTime: { median: false },
568 waitTime: { median: false },
569 elu: { median: false }
571 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
573 runTime: { median: false },
574 waitTime: { median: false },
575 elu: { median: false }
577 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
578 .workerChoiceStrategies) {
579 expect(workerChoiceStrategy.opts).toStrictEqual({
581 runTime: { median: false },
582 waitTime: { median: false },
583 elu: { median: false }
587 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
606 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
609 'Invalid worker choice strategy options: must be a plain object'
613 pool.setWorkerChoiceStrategyOptions({
614 retries: 'invalidChoiceRetries'
618 'Invalid worker choice strategy options: retries must be an integer'
621 expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
623 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
626 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
628 'Invalid worker choice strategy options: must have a weight for each worker node'
632 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
635 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
641 it('Verify that pool tasks queue can be enabled/disabled', async () => {
642 const pool = new FixedThreadPool(
644 './tests/worker-files/thread/testWorker.mjs'
646 expect(pool.opts.enableTasksQueue).toBe(false)
647 expect(pool.opts.tasksQueueOptions).toBeUndefined()
648 pool.enableTasksQueue(true)
649 expect(pool.opts.enableTasksQueue).toBe(true)
650 expect(pool.opts.tasksQueueOptions).toStrictEqual({
652 size: Math.pow(numberOfWorkers, 2),
654 tasksStealingOnBackPressure: true
656 pool.enableTasksQueue(true, { concurrency: 2 })
657 expect(pool.opts.enableTasksQueue).toBe(true)
658 expect(pool.opts.tasksQueueOptions).toStrictEqual({
660 size: Math.pow(numberOfWorkers, 2),
662 tasksStealingOnBackPressure: true
664 pool.enableTasksQueue(false)
665 expect(pool.opts.enableTasksQueue).toBe(false)
666 expect(pool.opts.tasksQueueOptions).toBeUndefined()
670 it('Verify that pool tasks queue options can be set', async () => {
671 const pool = new FixedThreadPool(
673 './tests/worker-files/thread/testWorker.mjs',
674 { enableTasksQueue: true }
676 expect(pool.opts.tasksQueueOptions).toStrictEqual({
678 size: Math.pow(numberOfWorkers, 2),
680 tasksStealingOnBackPressure: true
682 for (const workerNode of pool.workerNodes) {
683 expect(workerNode.tasksQueueBackPressureSize).toBe(
684 pool.opts.tasksQueueOptions.size
687 pool.setTasksQueueOptions({
691 tasksStealingOnBackPressure: false
693 expect(pool.opts.tasksQueueOptions).toStrictEqual({
697 tasksStealingOnBackPressure: false
699 for (const workerNode of pool.workerNodes) {
700 expect(workerNode.tasksQueueBackPressureSize).toBe(
701 pool.opts.tasksQueueOptions.size
704 pool.setTasksQueueOptions({
707 tasksStealingOnBackPressure: true
709 expect(pool.opts.tasksQueueOptions).toStrictEqual({
711 size: Math.pow(numberOfWorkers, 2),
713 tasksStealingOnBackPressure: true
715 for (const workerNode of pool.workerNodes) {
716 expect(workerNode.tasksQueueBackPressureSize).toBe(
717 pool.opts.tasksQueueOptions.size
720 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
721 new TypeError('Invalid tasks queue options: must be a plain object')
723 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
725 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
728 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
730 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
733 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
734 new TypeError('Invalid worker node tasks concurrency: must be an integer')
736 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
738 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
741 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
743 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
746 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
747 new TypeError('Invalid worker node tasks queue size: must be an integer')
752 it('Verify that pool info is set', async () => {
753 let pool = new FixedThreadPool(
755 './tests/worker-files/thread/testWorker.mjs'
757 expect(pool.info).toStrictEqual({
759 type: PoolTypes.fixed,
760 worker: WorkerTypes.thread,
763 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
764 minSize: numberOfWorkers,
765 maxSize: numberOfWorkers,
766 workerNodes: numberOfWorkers,
767 idleWorkerNodes: numberOfWorkers,
774 pool = new DynamicClusterPool(
775 Math.floor(numberOfWorkers / 2),
777 './tests/worker-files/cluster/testWorker.js'
779 expect(pool.info).toStrictEqual({
781 type: PoolTypes.dynamic,
782 worker: WorkerTypes.cluster,
785 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
786 minSize: Math.floor(numberOfWorkers / 2),
787 maxSize: numberOfWorkers,
788 workerNodes: Math.floor(numberOfWorkers / 2),
789 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
798 it('Verify that pool worker tasks usage are initialized', async () => {
799 const pool = new FixedClusterPool(
801 './tests/worker-files/cluster/testWorker.js'
803 for (const workerNode of pool.workerNodes) {
804 expect(workerNode).toBeInstanceOf(WorkerNode)
805 expect(workerNode.usage).toStrictEqual({
811 sequentiallyStolen: 0,
816 history: new CircularArray()
819 history: new CircularArray()
823 history: new CircularArray()
826 history: new CircularArray()
834 it('Verify that pool worker tasks queue are initialized', async () => {
835 let pool = new FixedClusterPool(
837 './tests/worker-files/cluster/testWorker.js'
839 for (const workerNode of pool.workerNodes) {
840 expect(workerNode).toBeInstanceOf(WorkerNode)
841 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
842 expect(workerNode.tasksQueue.size).toBe(0)
843 expect(workerNode.tasksQueue.maxSize).toBe(0)
846 pool = new DynamicThreadPool(
847 Math.floor(numberOfWorkers / 2),
849 './tests/worker-files/thread/testWorker.mjs'
851 for (const workerNode of pool.workerNodes) {
852 expect(workerNode).toBeInstanceOf(WorkerNode)
853 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
854 expect(workerNode.tasksQueue.size).toBe(0)
855 expect(workerNode.tasksQueue.maxSize).toBe(0)
860 it('Verify that pool worker info are initialized', async () => {
861 let pool = new FixedClusterPool(
863 './tests/worker-files/cluster/testWorker.js'
865 for (const workerNode of pool.workerNodes) {
866 expect(workerNode).toBeInstanceOf(WorkerNode)
867 expect(workerNode.info).toStrictEqual({
868 id: expect.any(Number),
869 type: WorkerTypes.cluster,
875 pool = new DynamicThreadPool(
876 Math.floor(numberOfWorkers / 2),
878 './tests/worker-files/thread/testWorker.mjs'
880 for (const workerNode of pool.workerNodes) {
881 expect(workerNode).toBeInstanceOf(WorkerNode)
882 expect(workerNode.info).toStrictEqual({
883 id: expect.any(Number),
884 type: WorkerTypes.thread,
892 it('Verify that pool statuses are checked at start or destroy', async () => {
893 const pool = new FixedThreadPool(
895 './tests/worker-files/thread/testWorker.mjs'
897 expect(pool.info.started).toBe(true)
898 expect(pool.info.ready).toBe(true)
899 expect(() => pool.start()).toThrow(
900 new Error('Cannot start an already started pool')
903 expect(pool.info.started).toBe(false)
904 expect(pool.info.ready).toBe(false)
905 await expect(pool.destroy()).rejects.toThrow(
906 new Error('Cannot destroy an already destroyed pool')
910 it('Verify that pool can be started after initialization', async () => {
911 const pool = new FixedClusterPool(
913 './tests/worker-files/cluster/testWorker.js',
918 expect(pool.info.started).toBe(false)
919 expect(pool.info.ready).toBe(false)
920 expect(pool.readyEventEmitted).toBe(false)
921 expect(pool.workerNodes).toStrictEqual([])
922 await expect(pool.execute()).rejects.toThrow(
923 new Error('Cannot execute a task on not started pool')
926 expect(pool.info.started).toBe(true)
927 expect(pool.info.ready).toBe(true)
928 await waitPoolEvents(pool, PoolEvents.ready, 1)
929 expect(pool.readyEventEmitted).toBe(true)
930 expect(pool.workerNodes.length).toBe(numberOfWorkers)
931 for (const workerNode of pool.workerNodes) {
932 expect(workerNode).toBeInstanceOf(WorkerNode)
937 it('Verify that pool execute() arguments are checked', async () => {
938 const pool = new FixedClusterPool(
940 './tests/worker-files/cluster/testWorker.js'
942 await expect(pool.execute(undefined, 0)).rejects.toThrow(
943 new TypeError('name argument must be a string')
945 await expect(pool.execute(undefined, '')).rejects.toThrow(
946 new TypeError('name argument must not be an empty string')
948 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
949 new TypeError('transferList argument must be an array')
951 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
952 "Task function 'unknown' not found"
955 await expect(pool.execute()).rejects.toThrow(
956 new Error('Cannot execute a task on not started pool')
960 it('Verify that pool worker tasks usage are computed', async () => {
961 const pool = new FixedClusterPool(
963 './tests/worker-files/cluster/testWorker.js'
965 const promises = new Set()
966 const maxMultiplier = 2
967 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
968 promises.add(pool.execute())
970 for (const workerNode of pool.workerNodes) {
971 expect(workerNode.usage).toStrictEqual({
974 executing: maxMultiplier,
977 sequentiallyStolen: 0,
982 history: expect.any(CircularArray)
985 history: expect.any(CircularArray)
989 history: expect.any(CircularArray)
992 history: expect.any(CircularArray)
997 await Promise.all(promises)
998 for (const workerNode of pool.workerNodes) {
999 expect(workerNode.usage).toStrictEqual({
1001 executed: maxMultiplier,
1005 sequentiallyStolen: 0,
1010 history: expect.any(CircularArray)
1013 history: expect.any(CircularArray)
1017 history: expect.any(CircularArray)
1020 history: expect.any(CircularArray)
1025 await pool.destroy()
1028 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1029 const pool = new DynamicThreadPool(
1030 Math.floor(numberOfWorkers / 2),
1032 './tests/worker-files/thread/testWorker.mjs'
1034 const promises = new Set()
1035 const maxMultiplier = 2
1036 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1037 promises.add(pool.execute())
1039 await Promise.all(promises)
1040 for (const workerNode of pool.workerNodes) {
1041 expect(workerNode.usage).toStrictEqual({
1043 executed: expect.any(Number),
1047 sequentiallyStolen: 0,
1052 history: expect.any(CircularArray)
1055 history: expect.any(CircularArray)
1059 history: expect.any(CircularArray)
1062 history: expect.any(CircularArray)
1066 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1067 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1068 numberOfWorkers * maxMultiplier
1070 expect(workerNode.usage.runTime.history.length).toBe(0)
1071 expect(workerNode.usage.waitTime.history.length).toBe(0)
1072 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1073 expect(workerNode.usage.elu.active.history.length).toBe(0)
1075 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1076 for (const workerNode of pool.workerNodes) {
1077 expect(workerNode.usage).toStrictEqual({
1083 sequentiallyStolen: 0,
1088 history: expect.any(CircularArray)
1091 history: expect.any(CircularArray)
1095 history: expect.any(CircularArray)
1098 history: expect.any(CircularArray)
1102 expect(workerNode.usage.runTime.history.length).toBe(0)
1103 expect(workerNode.usage.waitTime.history.length).toBe(0)
1104 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1105 expect(workerNode.usage.elu.active.history.length).toBe(0)
1107 await pool.destroy()
1110 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1111 const pool = new DynamicClusterPool(
1112 Math.floor(numberOfWorkers / 2),
1114 './tests/worker-files/cluster/testWorker.js'
1116 expect(pool.emitter.eventNames()).toStrictEqual([])
1119 pool.emitter.on(PoolEvents.ready, info => {
1123 await waitPoolEvents(pool, PoolEvents.ready, 1)
1124 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1125 expect(poolReady).toBe(1)
1126 expect(poolInfo).toStrictEqual({
1128 type: PoolTypes.dynamic,
1129 worker: WorkerTypes.cluster,
1132 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1133 minSize: expect.any(Number),
1134 maxSize: expect.any(Number),
1135 workerNodes: expect.any(Number),
1136 idleWorkerNodes: expect.any(Number),
1137 busyWorkerNodes: expect.any(Number),
1138 executedTasks: expect.any(Number),
1139 executingTasks: expect.any(Number),
1140 failedTasks: expect.any(Number)
1142 await pool.destroy()
1145 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1146 const pool = new FixedThreadPool(
1148 './tests/worker-files/thread/testWorker.mjs'
1150 expect(pool.emitter.eventNames()).toStrictEqual([])
1151 const promises = new Set()
1154 pool.emitter.on(PoolEvents.busy, info => {
1158 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1159 for (let i = 0; i < numberOfWorkers * 2; i++) {
1160 promises.add(pool.execute())
1162 await Promise.all(promises)
1163 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1164 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1165 expect(poolBusy).toBe(numberOfWorkers + 1)
1166 expect(poolInfo).toStrictEqual({
1168 type: PoolTypes.fixed,
1169 worker: WorkerTypes.thread,
1172 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1173 minSize: expect.any(Number),
1174 maxSize: expect.any(Number),
1175 workerNodes: expect.any(Number),
1176 idleWorkerNodes: expect.any(Number),
1177 busyWorkerNodes: expect.any(Number),
1178 executedTasks: expect.any(Number),
1179 executingTasks: expect.any(Number),
1180 failedTasks: expect.any(Number)
1182 await pool.destroy()
1185 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1186 const pool = new DynamicThreadPool(
1187 Math.floor(numberOfWorkers / 2),
1189 './tests/worker-files/thread/testWorker.mjs'
1191 expect(pool.emitter.eventNames()).toStrictEqual([])
1192 const promises = new Set()
1195 pool.emitter.on(PoolEvents.full, info => {
1199 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1200 for (let i = 0; i < numberOfWorkers * 2; i++) {
1201 promises.add(pool.execute())
1203 await Promise.all(promises)
1204 expect(poolFull).toBe(1)
1205 expect(poolInfo).toStrictEqual({
1207 type: PoolTypes.dynamic,
1208 worker: WorkerTypes.thread,
1211 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1212 minSize: expect.any(Number),
1213 maxSize: expect.any(Number),
1214 workerNodes: expect.any(Number),
1215 idleWorkerNodes: expect.any(Number),
1216 busyWorkerNodes: expect.any(Number),
1217 executedTasks: expect.any(Number),
1218 executingTasks: expect.any(Number),
1219 failedTasks: expect.any(Number)
1221 await pool.destroy()
1224 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1225 const pool = new FixedThreadPool(
1227 './tests/worker-files/thread/testWorker.mjs',
1229 enableTasksQueue: true
1232 stub(pool, 'hasBackPressure').returns(true)
1233 expect(pool.emitter.eventNames()).toStrictEqual([])
1234 const promises = new Set()
1235 let poolBackPressure = 0
1237 pool.emitter.on(PoolEvents.backPressure, info => {
1241 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1242 for (let i = 0; i < numberOfWorkers + 1; i++) {
1243 promises.add(pool.execute())
1245 await Promise.all(promises)
1246 expect(poolBackPressure).toBe(1)
1247 expect(poolInfo).toStrictEqual({
1249 type: PoolTypes.fixed,
1250 worker: WorkerTypes.thread,
1253 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1254 minSize: expect.any(Number),
1255 maxSize: expect.any(Number),
1256 workerNodes: expect.any(Number),
1257 idleWorkerNodes: expect.any(Number),
1258 busyWorkerNodes: expect.any(Number),
1259 executedTasks: expect.any(Number),
1260 executingTasks: expect.any(Number),
1261 maxQueuedTasks: expect.any(Number),
1262 queuedTasks: expect.any(Number),
1264 stolenTasks: expect.any(Number),
1265 failedTasks: expect.any(Number)
1267 expect(pool.hasBackPressure.called).toBe(true)
1268 await pool.destroy()
1271 it('Verify that pool asynchronous resource track tasks execution', async () => {
1276 let resolveCalls = 0
1277 const hook = createHook({
1278 init (asyncId, type) {
1279 if (type === 'poolifier:task') {
1281 taskAsyncId = asyncId
1285 if (asyncId === taskAsyncId) beforeCalls++
1288 if (asyncId === taskAsyncId) afterCalls++
1291 if (executionAsyncId() === taskAsyncId) resolveCalls++
1295 const pool = new FixedThreadPool(
1297 './tests/worker-files/thread/testWorker.mjs'
1299 await pool.execute()
1301 expect(initCalls).toBe(1)
1302 expect(beforeCalls).toBe(1)
1303 expect(afterCalls).toBe(1)
1304 expect(resolveCalls).toBe(1)
1307 it('Verify that hasTaskFunction() is working', async () => {
1308 const dynamicThreadPool = new DynamicThreadPool(
1309 Math.floor(numberOfWorkers / 2),
1311 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1313 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1314 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1315 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1318 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1319 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1320 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1321 await dynamicThreadPool.destroy()
1322 const fixedClusterPool = new FixedClusterPool(
1324 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1326 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1327 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1328 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1331 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1332 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1333 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1334 await fixedClusterPool.destroy()
1337 it('Verify that addTaskFunction() is working', async () => {
1338 const dynamicThreadPool = new DynamicThreadPool(
1339 Math.floor(numberOfWorkers / 2),
1341 './tests/worker-files/thread/testWorker.mjs'
1343 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1345 dynamicThreadPool.addTaskFunction(0, () => {})
1346 ).rejects.toThrow(new TypeError('name argument must be a string'))
1348 dynamicThreadPool.addTaskFunction('', () => {})
1350 new TypeError('name argument must not be an empty string')
1352 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1353 new TypeError('fn argument must be a function')
1355 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1356 new TypeError('fn argument must be a function')
1358 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1362 const echoTaskFunction = data => {
1366 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1367 ).resolves.toBe(true)
1368 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1369 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1372 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1377 const taskFunctionData = { test: 'test' }
1378 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1379 expect(echoResult).toStrictEqual(taskFunctionData)
1380 for (const workerNode of dynamicThreadPool.workerNodes) {
1381 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1383 executed: expect.any(Number),
1386 sequentiallyStolen: 0,
1391 history: new CircularArray()
1394 history: new CircularArray()
1398 history: new CircularArray()
1401 history: new CircularArray()
1406 await dynamicThreadPool.destroy()
1409 it('Verify that removeTaskFunction() is working', async () => {
1410 const dynamicThreadPool = new DynamicThreadPool(
1411 Math.floor(numberOfWorkers / 2),
1413 './tests/worker-files/thread/testWorker.mjs'
1415 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1416 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1420 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1421 new Error('Cannot remove a task function not handled on the pool side')
1423 const echoTaskFunction = data => {
1426 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1427 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1428 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1431 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1436 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1439 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1440 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1441 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1445 await dynamicThreadPool.destroy()
1448 it('Verify that listTaskFunctionNames() is working', async () => {
1449 const dynamicThreadPool = new DynamicThreadPool(
1450 Math.floor(numberOfWorkers / 2),
1452 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1454 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1455 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1457 'jsonIntegerSerialization',
1461 await dynamicThreadPool.destroy()
1462 const fixedClusterPool = new FixedClusterPool(
1464 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1466 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1467 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1469 'jsonIntegerSerialization',
1473 await fixedClusterPool.destroy()
1476 it('Verify that setDefaultTaskFunction() is working', async () => {
1477 const dynamicThreadPool = new DynamicThreadPool(
1478 Math.floor(numberOfWorkers / 2),
1480 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1482 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1483 const workerId = dynamicThreadPool.workerNodes[0].info.id
1484 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1486 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1490 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1493 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1497 dynamicThreadPool.setDefaultTaskFunction('unknown')
1500 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1503 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1505 'jsonIntegerSerialization',
1510 dynamicThreadPool.setDefaultTaskFunction('factorial')
1511 ).resolves.toBe(true)
1512 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1515 'jsonIntegerSerialization',
1519 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1520 ).resolves.toBe(true)
1521 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1524 'jsonIntegerSerialization',
1527 await dynamicThreadPool.destroy()
1530 it('Verify that multiple task functions worker is working', async () => {
1531 const pool = new DynamicClusterPool(
1532 Math.floor(numberOfWorkers / 2),
1534 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1536 const data = { n: 10 }
1537 const result0 = await pool.execute(data)
1538 expect(result0).toStrictEqual({ ok: 1 })
1539 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1540 expect(result1).toStrictEqual({ ok: 1 })
1541 const result2 = await pool.execute(data, 'factorial')
1542 expect(result2).toBe(3628800)
1543 const result3 = await pool.execute(data, 'fibonacci')
1544 expect(result3).toBe(55)
1545 expect(pool.info.executingTasks).toBe(0)
1546 expect(pool.info.executedTasks).toBe(4)
1547 for (const workerNode of pool.workerNodes) {
1548 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1550 'jsonIntegerSerialization',
1554 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1555 for (const name of pool.listTaskFunctionNames()) {
1556 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1558 executed: expect.any(Number),
1562 sequentiallyStolen: 0,
1566 history: expect.any(CircularArray)
1569 history: expect.any(CircularArray)
1573 history: expect.any(CircularArray)
1576 history: expect.any(CircularArray)
1581 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1582 ).toBeGreaterThan(0)
1585 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1587 workerNode.getTaskFunctionWorkerUsage(
1588 workerNode.info.taskFunctionNames[1]
1592 await pool.destroy()
1595 it('Verify sendKillMessageToWorker()', async () => {
1596 const pool = new DynamicClusterPool(
1597 Math.floor(numberOfWorkers / 2),
1599 './tests/worker-files/cluster/testWorker.js'
1601 const workerNodeKey = 0
1603 pool.sendKillMessageToWorker(workerNodeKey)
1604 ).resolves.toBeUndefined()
1605 await pool.destroy()
1608 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1609 const pool = new DynamicClusterPool(
1610 Math.floor(numberOfWorkers / 2),
1612 './tests/worker-files/cluster/testWorker.js'
1614 const workerNodeKey = 0
1616 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1617 taskFunctionOperation: 'add',
1618 taskFunctionName: 'empty',
1619 taskFunction: (() => {}).toString()
1621 ).resolves.toBe(true)
1623 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1624 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1625 await pool.destroy()
1628 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1629 const pool = new DynamicClusterPool(
1630 Math.floor(numberOfWorkers / 2),
1632 './tests/worker-files/cluster/testWorker.js'
1635 pool.sendTaskFunctionOperationToWorkers({
1636 taskFunctionOperation: 'add',
1637 taskFunctionName: 'empty',
1638 taskFunction: (() => {}).toString()
1640 ).resolves.toBe(true)
1641 for (const workerNode of pool.workerNodes) {
1642 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1648 await pool.destroy()