1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.js'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.js'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.js'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
232 retries: pool.info.maxSize,
233 runTime: { median: false },
234 waitTime: { median: false },
235 elu: { median: false }
237 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
238 .workerChoiceStrategies) {
239 expect(workerChoiceStrategy.opts).toStrictEqual({
240 retries: pool.info.maxSize,
241 runTime: { median: false },
242 waitTime: { median: false },
243 elu: { median: false }
247 const testHandler = () => console.info('test handler executed')
248 pool = new FixedThreadPool(
250 './tests/worker-files/thread/testWorker.mjs',
252 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
253 workerChoiceStrategyOptions: {
254 runTime: { median: true },
255 weights: { 0: 300, 1: 200 }
258 restartWorkerOnError: false,
259 enableTasksQueue: true,
260 tasksQueueOptions: { concurrency: 2 },
261 messageHandler: testHandler,
262 errorHandler: testHandler,
263 onlineHandler: testHandler,
264 exitHandler: testHandler
267 expect(pool.emitter).toBeUndefined()
268 expect(pool.opts).toStrictEqual({
271 restartWorkerOnError: false,
272 enableTasksQueue: true,
275 size: Math.pow(numberOfWorkers, 2),
277 tasksStealingOnBackPressure: true,
278 tasksFinishedTimeout: 2000
280 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
281 workerChoiceStrategyOptions: {
282 runTime: { median: true },
283 weights: { 0: 300, 1: 200 }
285 onlineHandler: testHandler,
286 messageHandler: testHandler,
287 errorHandler: testHandler,
288 exitHandler: testHandler
290 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
293 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
294 runTime: { median: true },
295 waitTime: { median: false },
296 elu: { median: false },
297 weights: { 0: 300, 1: 200 }
299 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
300 .workerChoiceStrategies) {
301 expect(workerChoiceStrategy.opts).toStrictEqual({
304 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
305 runTime: { median: true },
306 waitTime: { median: false },
307 elu: { median: false },
308 weights: { 0: 300, 1: 200 }
314 it('Verify that pool options are validated', () => {
319 './tests/worker-files/thread/testWorker.mjs',
321 workerChoiceStrategy: 'invalidStrategy'
324 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
329 './tests/worker-files/thread/testWorker.mjs',
331 workerChoiceStrategyOptions: { weights: {} }
336 'Invalid worker choice strategy options: must have a weight for each worker node'
343 './tests/worker-files/thread/testWorker.mjs',
345 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
350 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
357 './tests/worker-files/thread/testWorker.mjs',
359 enableTasksQueue: true,
360 tasksQueueOptions: 'invalidTasksQueueOptions'
364 new TypeError('Invalid tasks queue options: must be a plain object')
370 './tests/worker-files/thread/testWorker.mjs',
372 enableTasksQueue: true,
373 tasksQueueOptions: { concurrency: 0 }
378 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
385 './tests/worker-files/thread/testWorker.mjs',
387 enableTasksQueue: true,
388 tasksQueueOptions: { concurrency: -1 }
393 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
400 './tests/worker-files/thread/testWorker.mjs',
402 enableTasksQueue: true,
403 tasksQueueOptions: { concurrency: 0.2 }
407 new TypeError('Invalid worker node tasks concurrency: must be an integer')
413 './tests/worker-files/thread/testWorker.mjs',
415 enableTasksQueue: true,
416 tasksQueueOptions: { size: 0 }
421 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
428 './tests/worker-files/thread/testWorker.mjs',
430 enableTasksQueue: true,
431 tasksQueueOptions: { size: -1 }
436 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
443 './tests/worker-files/thread/testWorker.mjs',
445 enableTasksQueue: true,
446 tasksQueueOptions: { size: 0.2 }
450 new TypeError('Invalid worker node tasks queue size: must be an integer')
454 it('Verify that pool worker choice strategy options can be set', async () => {
455 const pool = new FixedThreadPool(
457 './tests/worker-files/thread/testWorker.mjs',
458 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
460 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
461 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
462 retries: pool.info.maxSize,
463 runTime: { median: false },
464 waitTime: { median: false },
465 elu: { median: false }
467 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
468 .workerChoiceStrategies) {
469 expect(workerChoiceStrategy.opts).toStrictEqual({
470 retries: pool.info.maxSize,
471 runTime: { median: false },
472 waitTime: { median: false },
473 elu: { median: false }
477 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
495 pool.setWorkerChoiceStrategyOptions({
496 runTime: { median: true },
497 elu: { median: true }
499 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
500 runTime: { median: true },
501 elu: { median: true }
503 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
504 retries: pool.info.maxSize,
505 runTime: { median: true },
506 waitTime: { median: false },
507 elu: { median: true }
509 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
510 .workerChoiceStrategies) {
511 expect(workerChoiceStrategy.opts).toStrictEqual({
512 retries: pool.info.maxSize,
513 runTime: { median: true },
514 waitTime: { median: false },
515 elu: { median: true }
519 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
537 pool.setWorkerChoiceStrategyOptions({
538 runTime: { median: false },
539 elu: { median: false }
541 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
542 runTime: { median: false },
543 elu: { median: false }
545 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
546 retries: pool.info.maxSize,
547 runTime: { median: false },
548 waitTime: { median: false },
549 elu: { median: false }
551 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
552 .workerChoiceStrategies) {
553 expect(workerChoiceStrategy.opts).toStrictEqual({
554 retries: pool.info.maxSize,
555 runTime: { median: false },
556 waitTime: { median: false },
557 elu: { median: false }
561 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
580 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
583 'Invalid worker choice strategy options: must be a plain object'
586 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
588 'Invalid worker choice strategy options: must have a weight for each worker node'
592 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
595 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
601 it('Verify that pool tasks queue can be enabled/disabled', async () => {
602 const pool = new FixedThreadPool(
604 './tests/worker-files/thread/testWorker.mjs'
606 expect(pool.opts.enableTasksQueue).toBe(false)
607 expect(pool.opts.tasksQueueOptions).toBeUndefined()
608 pool.enableTasksQueue(true)
609 expect(pool.opts.enableTasksQueue).toBe(true)
610 expect(pool.opts.tasksQueueOptions).toStrictEqual({
612 size: Math.pow(numberOfWorkers, 2),
614 tasksStealingOnBackPressure: true,
615 tasksFinishedTimeout: 2000
617 pool.enableTasksQueue(true, { concurrency: 2 })
618 expect(pool.opts.enableTasksQueue).toBe(true)
619 expect(pool.opts.tasksQueueOptions).toStrictEqual({
621 size: Math.pow(numberOfWorkers, 2),
623 tasksStealingOnBackPressure: true,
624 tasksFinishedTimeout: 2000
626 pool.enableTasksQueue(false)
627 expect(pool.opts.enableTasksQueue).toBe(false)
628 expect(pool.opts.tasksQueueOptions).toBeUndefined()
632 it('Verify that pool tasks queue options can be set', async () => {
633 const pool = new FixedThreadPool(
635 './tests/worker-files/thread/testWorker.mjs',
636 { enableTasksQueue: true }
638 expect(pool.opts.tasksQueueOptions).toStrictEqual({
640 size: Math.pow(numberOfWorkers, 2),
642 tasksStealingOnBackPressure: true,
643 tasksFinishedTimeout: 2000
645 for (const workerNode of pool.workerNodes) {
646 expect(workerNode.tasksQueueBackPressureSize).toBe(
647 pool.opts.tasksQueueOptions.size
650 pool.setTasksQueueOptions({
654 tasksStealingOnBackPressure: false,
655 tasksFinishedTimeout: 3000
657 expect(pool.opts.tasksQueueOptions).toStrictEqual({
661 tasksStealingOnBackPressure: false,
662 tasksFinishedTimeout: 3000
664 for (const workerNode of pool.workerNodes) {
665 expect(workerNode.tasksQueueBackPressureSize).toBe(
666 pool.opts.tasksQueueOptions.size
669 pool.setTasksQueueOptions({
672 tasksStealingOnBackPressure: true
674 expect(pool.opts.tasksQueueOptions).toStrictEqual({
676 size: Math.pow(numberOfWorkers, 2),
678 tasksStealingOnBackPressure: true,
679 tasksFinishedTimeout: 2000
681 for (const workerNode of pool.workerNodes) {
682 expect(workerNode.tasksQueueBackPressureSize).toBe(
683 pool.opts.tasksQueueOptions.size
686 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
687 new TypeError('Invalid tasks queue options: must be a plain object')
689 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
691 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
694 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
696 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
699 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
700 new TypeError('Invalid worker node tasks concurrency: must be an integer')
702 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
704 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
707 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
709 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
712 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
713 new TypeError('Invalid worker node tasks queue size: must be an integer')
718 it('Verify that pool info is set', async () => {
719 let pool = new FixedThreadPool(
721 './tests/worker-files/thread/testWorker.mjs'
723 expect(pool.info).toStrictEqual({
725 type: PoolTypes.fixed,
726 worker: WorkerTypes.thread,
729 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
730 minSize: numberOfWorkers,
731 maxSize: numberOfWorkers,
732 workerNodes: numberOfWorkers,
733 idleWorkerNodes: numberOfWorkers,
740 pool = new DynamicClusterPool(
741 Math.floor(numberOfWorkers / 2),
743 './tests/worker-files/cluster/testWorker.js'
745 expect(pool.info).toStrictEqual({
747 type: PoolTypes.dynamic,
748 worker: WorkerTypes.cluster,
751 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
752 minSize: Math.floor(numberOfWorkers / 2),
753 maxSize: numberOfWorkers,
754 workerNodes: Math.floor(numberOfWorkers / 2),
755 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
764 it('Verify that pool worker tasks usage are initialized', async () => {
765 const pool = new FixedClusterPool(
767 './tests/worker-files/cluster/testWorker.js'
769 for (const workerNode of pool.workerNodes) {
770 expect(workerNode).toBeInstanceOf(WorkerNode)
771 expect(workerNode.usage).toStrictEqual({
777 sequentiallyStolen: 0,
782 history: new CircularArray()
785 history: new CircularArray()
789 history: new CircularArray()
792 history: new CircularArray()
800 it('Verify that pool worker tasks queue are initialized', async () => {
801 let pool = new FixedClusterPool(
803 './tests/worker-files/cluster/testWorker.js'
805 for (const workerNode of pool.workerNodes) {
806 expect(workerNode).toBeInstanceOf(WorkerNode)
807 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
808 expect(workerNode.tasksQueue.size).toBe(0)
809 expect(workerNode.tasksQueue.maxSize).toBe(0)
812 pool = new DynamicThreadPool(
813 Math.floor(numberOfWorkers / 2),
815 './tests/worker-files/thread/testWorker.mjs'
817 for (const workerNode of pool.workerNodes) {
818 expect(workerNode).toBeInstanceOf(WorkerNode)
819 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
820 expect(workerNode.tasksQueue.size).toBe(0)
821 expect(workerNode.tasksQueue.maxSize).toBe(0)
826 it('Verify that pool worker info are initialized', async () => {
827 let pool = new FixedClusterPool(
829 './tests/worker-files/cluster/testWorker.js'
831 for (const workerNode of pool.workerNodes) {
832 expect(workerNode).toBeInstanceOf(WorkerNode)
833 expect(workerNode.info).toStrictEqual({
834 id: expect.any(Number),
835 type: WorkerTypes.cluster,
841 pool = new DynamicThreadPool(
842 Math.floor(numberOfWorkers / 2),
844 './tests/worker-files/thread/testWorker.mjs'
846 for (const workerNode of pool.workerNodes) {
847 expect(workerNode).toBeInstanceOf(WorkerNode)
848 expect(workerNode.info).toStrictEqual({
849 id: expect.any(Number),
850 type: WorkerTypes.thread,
858 it('Verify that pool statuses are checked at start or destroy', async () => {
859 const pool = new FixedThreadPool(
861 './tests/worker-files/thread/testWorker.mjs'
863 expect(pool.info.started).toBe(true)
864 expect(pool.info.ready).toBe(true)
865 expect(() => pool.start()).toThrow(
866 new Error('Cannot start an already started pool')
869 expect(pool.info.started).toBe(false)
870 expect(pool.info.ready).toBe(false)
871 await expect(pool.destroy()).rejects.toThrow(
872 new Error('Cannot destroy an already destroyed pool')
876 it('Verify that pool can be started after initialization', async () => {
877 const pool = new FixedClusterPool(
879 './tests/worker-files/cluster/testWorker.js',
884 expect(pool.info.started).toBe(false)
885 expect(pool.info.ready).toBe(false)
886 expect(pool.readyEventEmitted).toBe(false)
887 expect(pool.workerNodes).toStrictEqual([])
888 await expect(pool.execute()).rejects.toThrow(
889 new Error('Cannot execute a task on not started pool')
892 expect(pool.info.started).toBe(true)
893 expect(pool.info.ready).toBe(true)
894 await waitPoolEvents(pool, PoolEvents.ready, 1)
895 expect(pool.readyEventEmitted).toBe(true)
896 expect(pool.workerNodes.length).toBe(numberOfWorkers)
897 for (const workerNode of pool.workerNodes) {
898 expect(workerNode).toBeInstanceOf(WorkerNode)
903 it('Verify that pool execute() arguments are checked', async () => {
904 const pool = new FixedClusterPool(
906 './tests/worker-files/cluster/testWorker.js'
908 await expect(pool.execute(undefined, 0)).rejects.toThrow(
909 new TypeError('name argument must be a string')
911 await expect(pool.execute(undefined, '')).rejects.toThrow(
912 new TypeError('name argument must not be an empty string')
914 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
915 new TypeError('transferList argument must be an array')
917 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
918 "Task function 'unknown' not found"
921 await expect(pool.execute()).rejects.toThrow(
922 new Error('Cannot execute a task on not started pool')
926 it('Verify that pool worker tasks usage are computed', async () => {
927 const pool = new FixedClusterPool(
929 './tests/worker-files/cluster/testWorker.js'
931 const promises = new Set()
932 const maxMultiplier = 2
933 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
934 promises.add(pool.execute())
936 for (const workerNode of pool.workerNodes) {
937 expect(workerNode.usage).toStrictEqual({
940 executing: maxMultiplier,
943 sequentiallyStolen: 0,
948 history: expect.any(CircularArray)
951 history: expect.any(CircularArray)
955 history: expect.any(CircularArray)
958 history: expect.any(CircularArray)
963 await Promise.all(promises)
964 for (const workerNode of pool.workerNodes) {
965 expect(workerNode.usage).toStrictEqual({
967 executed: maxMultiplier,
971 sequentiallyStolen: 0,
976 history: expect.any(CircularArray)
979 history: expect.any(CircularArray)
983 history: expect.any(CircularArray)
986 history: expect.any(CircularArray)
994 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
995 const pool = new DynamicThreadPool(
996 Math.floor(numberOfWorkers / 2),
998 './tests/worker-files/thread/testWorker.mjs'
1000 const promises = new Set()
1001 const maxMultiplier = 2
1002 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1003 promises.add(pool.execute())
1005 await Promise.all(promises)
1006 for (const workerNode of pool.workerNodes) {
1007 expect(workerNode.usage).toStrictEqual({
1009 executed: expect.any(Number),
1013 sequentiallyStolen: 0,
1018 history: expect.any(CircularArray)
1021 history: expect.any(CircularArray)
1025 history: expect.any(CircularArray)
1028 history: expect.any(CircularArray)
1032 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1033 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1034 numberOfWorkers * maxMultiplier
1036 expect(workerNode.usage.runTime.history.length).toBe(0)
1037 expect(workerNode.usage.waitTime.history.length).toBe(0)
1038 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1039 expect(workerNode.usage.elu.active.history.length).toBe(0)
1041 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1042 for (const workerNode of pool.workerNodes) {
1043 expect(workerNode.usage).toStrictEqual({
1049 sequentiallyStolen: 0,
1054 history: expect.any(CircularArray)
1057 history: expect.any(CircularArray)
1061 history: expect.any(CircularArray)
1064 history: expect.any(CircularArray)
1068 expect(workerNode.usage.runTime.history.length).toBe(0)
1069 expect(workerNode.usage.waitTime.history.length).toBe(0)
1070 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1071 expect(workerNode.usage.elu.active.history.length).toBe(0)
1073 await pool.destroy()
1076 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1077 const pool = new DynamicClusterPool(
1078 Math.floor(numberOfWorkers / 2),
1080 './tests/worker-files/cluster/testWorker.js'
1082 expect(pool.emitter.eventNames()).toStrictEqual([])
1085 pool.emitter.on(PoolEvents.ready, info => {
1089 await waitPoolEvents(pool, PoolEvents.ready, 1)
1090 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1091 expect(poolReady).toBe(1)
1092 expect(poolInfo).toStrictEqual({
1094 type: PoolTypes.dynamic,
1095 worker: WorkerTypes.cluster,
1098 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1099 minSize: expect.any(Number),
1100 maxSize: expect.any(Number),
1101 workerNodes: expect.any(Number),
1102 idleWorkerNodes: expect.any(Number),
1103 busyWorkerNodes: expect.any(Number),
1104 executedTasks: expect.any(Number),
1105 executingTasks: expect.any(Number),
1106 failedTasks: expect.any(Number)
1108 await pool.destroy()
1111 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1112 const pool = new FixedThreadPool(
1114 './tests/worker-files/thread/testWorker.mjs'
1116 expect(pool.emitter.eventNames()).toStrictEqual([])
1117 const promises = new Set()
1120 pool.emitter.on(PoolEvents.busy, info => {
1124 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1125 for (let i = 0; i < numberOfWorkers * 2; i++) {
1126 promises.add(pool.execute())
1128 await Promise.all(promises)
1129 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1130 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1131 expect(poolBusy).toBe(numberOfWorkers + 1)
1132 expect(poolInfo).toStrictEqual({
1134 type: PoolTypes.fixed,
1135 worker: WorkerTypes.thread,
1138 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1139 minSize: expect.any(Number),
1140 maxSize: expect.any(Number),
1141 workerNodes: expect.any(Number),
1142 idleWorkerNodes: expect.any(Number),
1143 busyWorkerNodes: expect.any(Number),
1144 executedTasks: expect.any(Number),
1145 executingTasks: expect.any(Number),
1146 failedTasks: expect.any(Number)
1148 await pool.destroy()
1151 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1152 const pool = new DynamicThreadPool(
1153 Math.floor(numberOfWorkers / 2),
1155 './tests/worker-files/thread/testWorker.mjs'
1157 expect(pool.emitter.eventNames()).toStrictEqual([])
1158 const promises = new Set()
1161 pool.emitter.on(PoolEvents.full, info => {
1165 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1166 for (let i = 0; i < numberOfWorkers * 2; i++) {
1167 promises.add(pool.execute())
1169 await Promise.all(promises)
1170 expect(poolFull).toBe(1)
1171 expect(poolInfo).toStrictEqual({
1173 type: PoolTypes.dynamic,
1174 worker: WorkerTypes.thread,
1177 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1178 minSize: expect.any(Number),
1179 maxSize: expect.any(Number),
1180 workerNodes: expect.any(Number),
1181 idleWorkerNodes: expect.any(Number),
1182 busyWorkerNodes: expect.any(Number),
1183 executedTasks: expect.any(Number),
1184 executingTasks: expect.any(Number),
1185 failedTasks: expect.any(Number)
1187 await pool.destroy()
1190 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1191 const pool = new FixedThreadPool(
1193 './tests/worker-files/thread/testWorker.mjs',
1195 enableTasksQueue: true
1198 stub(pool, 'hasBackPressure').returns(true)
1199 expect(pool.emitter.eventNames()).toStrictEqual([])
1200 const promises = new Set()
1201 let poolBackPressure = 0
1203 pool.emitter.on(PoolEvents.backPressure, info => {
1207 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1208 for (let i = 0; i < numberOfWorkers + 1; i++) {
1209 promises.add(pool.execute())
1211 await Promise.all(promises)
1212 expect(poolBackPressure).toBe(1)
1213 expect(poolInfo).toStrictEqual({
1215 type: PoolTypes.fixed,
1216 worker: WorkerTypes.thread,
1219 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1220 minSize: expect.any(Number),
1221 maxSize: expect.any(Number),
1222 workerNodes: expect.any(Number),
1223 idleWorkerNodes: expect.any(Number),
1224 busyWorkerNodes: expect.any(Number),
1225 executedTasks: expect.any(Number),
1226 executingTasks: expect.any(Number),
1227 maxQueuedTasks: expect.any(Number),
1228 queuedTasks: expect.any(Number),
1230 stolenTasks: expect.any(Number),
1231 failedTasks: expect.any(Number)
1233 expect(pool.hasBackPressure.callCount).toBe(5)
1234 await pool.destroy()
1237 it('Verify that destroy() waits for queued tasks to finish', async () => {
1238 const tasksFinishedTimeout = 2500
1239 const pool = new FixedThreadPool(
1241 './tests/worker-files/thread/asyncWorker.mjs',
1243 enableTasksQueue: true,
1244 tasksQueueOptions: { tasksFinishedTimeout }
1247 const maxMultiplier = 4
1248 let tasksFinished = 0
1249 for (const workerNode of pool.workerNodes) {
1250 workerNode.on('taskFinished', () => {
1254 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1257 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1258 const startTime = performance.now()
1259 await pool.destroy()
1260 const elapsedTime = performance.now() - startTime
1261 expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
1262 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1263 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
1266 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1267 const tasksFinishedTimeout = 1000
1268 const pool = new FixedThreadPool(
1270 './tests/worker-files/thread/asyncWorker.mjs',
1272 enableTasksQueue: true,
1273 tasksQueueOptions: { tasksFinishedTimeout }
1276 const maxMultiplier = 4
1277 let tasksFinished = 0
1278 for (const workerNode of pool.workerNodes) {
1279 workerNode.on('taskFinished', () => {
1283 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1286 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1287 const startTime = performance.now()
1288 await pool.destroy()
1289 const elapsedTime = performance.now() - startTime
1290 expect(tasksFinished).toBe(0)
1291 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
1294 it('Verify that pool asynchronous resource track tasks execution', async () => {
1299 let resolveCalls = 0
1300 const hook = createHook({
1301 init (asyncId, type) {
1302 if (type === 'poolifier:task') {
1304 taskAsyncId = asyncId
1308 if (asyncId === taskAsyncId) beforeCalls++
1311 if (asyncId === taskAsyncId) afterCalls++
1314 if (executionAsyncId() === taskAsyncId) resolveCalls++
1317 const pool = new FixedThreadPool(
1319 './tests/worker-files/thread/testWorker.mjs'
1322 await pool.execute()
1324 expect(initCalls).toBe(1)
1325 expect(beforeCalls).toBe(1)
1326 expect(afterCalls).toBe(1)
1327 expect(resolveCalls).toBe(1)
1328 await pool.destroy()
1331 it('Verify that hasTaskFunction() is working', async () => {
1332 const dynamicThreadPool = new DynamicThreadPool(
1333 Math.floor(numberOfWorkers / 2),
1335 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1337 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1338 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1339 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1342 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1343 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1344 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1345 await dynamicThreadPool.destroy()
1346 const fixedClusterPool = new FixedClusterPool(
1348 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1350 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1351 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1352 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1355 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1356 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1357 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1358 await fixedClusterPool.destroy()
1361 it('Verify that addTaskFunction() is working', async () => {
1362 const dynamicThreadPool = new DynamicThreadPool(
1363 Math.floor(numberOfWorkers / 2),
1365 './tests/worker-files/thread/testWorker.mjs'
1367 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1369 dynamicThreadPool.addTaskFunction(0, () => {})
1370 ).rejects.toThrow(new TypeError('name argument must be a string'))
1372 dynamicThreadPool.addTaskFunction('', () => {})
1374 new TypeError('name argument must not be an empty string')
1376 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1377 new TypeError('fn argument must be a function')
1379 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1380 new TypeError('fn argument must be a function')
1382 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1386 const echoTaskFunction = data => {
1390 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1391 ).resolves.toBe(true)
1392 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1393 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1396 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1401 const taskFunctionData = { test: 'test' }
1402 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1403 expect(echoResult).toStrictEqual(taskFunctionData)
1404 for (const workerNode of dynamicThreadPool.workerNodes) {
1405 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1407 executed: expect.any(Number),
1410 sequentiallyStolen: 0,
1415 history: new CircularArray()
1418 history: new CircularArray()
1422 history: new CircularArray()
1425 history: new CircularArray()
1430 await dynamicThreadPool.destroy()
1433 it('Verify that removeTaskFunction() is working', async () => {
1434 const dynamicThreadPool = new DynamicThreadPool(
1435 Math.floor(numberOfWorkers / 2),
1437 './tests/worker-files/thread/testWorker.mjs'
1439 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1440 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1444 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1445 new Error('Cannot remove a task function not handled on the pool side')
1447 const echoTaskFunction = data => {
1450 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1451 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1452 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1455 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1460 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1463 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1464 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1465 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1469 await dynamicThreadPool.destroy()
1472 it('Verify that listTaskFunctionNames() is working', async () => {
1473 const dynamicThreadPool = new DynamicThreadPool(
1474 Math.floor(numberOfWorkers / 2),
1476 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1478 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1479 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1481 'jsonIntegerSerialization',
1485 await dynamicThreadPool.destroy()
1486 const fixedClusterPool = new FixedClusterPool(
1488 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1490 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1491 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1493 'jsonIntegerSerialization',
1497 await fixedClusterPool.destroy()
1500 it('Verify that setDefaultTaskFunction() is working', async () => {
1501 const dynamicThreadPool = new DynamicThreadPool(
1502 Math.floor(numberOfWorkers / 2),
1504 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1506 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1507 const workerId = dynamicThreadPool.workerNodes[0].info.id
1508 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1510 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1514 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1517 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1521 dynamicThreadPool.setDefaultTaskFunction('unknown')
1524 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1527 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1529 'jsonIntegerSerialization',
1534 dynamicThreadPool.setDefaultTaskFunction('factorial')
1535 ).resolves.toBe(true)
1536 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1539 'jsonIntegerSerialization',
1543 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1544 ).resolves.toBe(true)
1545 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1548 'jsonIntegerSerialization',
1551 await dynamicThreadPool.destroy()
1554 it('Verify that multiple task functions worker is working', async () => {
1555 const pool = new DynamicClusterPool(
1556 Math.floor(numberOfWorkers / 2),
1558 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1560 const data = { n: 10 }
1561 const result0 = await pool.execute(data)
1562 expect(result0).toStrictEqual({ ok: 1 })
1563 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1564 expect(result1).toStrictEqual({ ok: 1 })
1565 const result2 = await pool.execute(data, 'factorial')
1566 expect(result2).toBe(3628800)
1567 const result3 = await pool.execute(data, 'fibonacci')
1568 expect(result3).toBe(55)
1569 expect(pool.info.executingTasks).toBe(0)
1570 expect(pool.info.executedTasks).toBe(4)
1571 for (const workerNode of pool.workerNodes) {
1572 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1574 'jsonIntegerSerialization',
1578 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1579 for (const name of pool.listTaskFunctionNames()) {
1580 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1582 executed: expect.any(Number),
1586 sequentiallyStolen: 0,
1590 history: expect.any(CircularArray)
1593 history: expect.any(CircularArray)
1597 history: expect.any(CircularArray)
1600 history: expect.any(CircularArray)
1605 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1606 ).toBeGreaterThan(0)
1609 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1611 workerNode.getTaskFunctionWorkerUsage(
1612 workerNode.info.taskFunctionNames[1]
1616 await pool.destroy()
1619 it('Verify sendKillMessageToWorker()', async () => {
1620 const pool = new DynamicClusterPool(
1621 Math.floor(numberOfWorkers / 2),
1623 './tests/worker-files/cluster/testWorker.js'
1625 const workerNodeKey = 0
1627 pool.sendKillMessageToWorker(workerNodeKey)
1628 ).resolves.toBeUndefined()
1630 pool.sendKillMessageToWorker(numberOfWorkers)
1631 ).rejects.toStrictEqual(
1632 new Error(`Invalid worker node key '${numberOfWorkers}'`)
1634 await pool.destroy()
1637 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1638 const pool = new DynamicClusterPool(
1639 Math.floor(numberOfWorkers / 2),
1641 './tests/worker-files/cluster/testWorker.js'
1643 const workerNodeKey = 0
1645 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1646 taskFunctionOperation: 'add',
1647 taskFunctionName: 'empty',
1648 taskFunction: (() => {}).toString()
1650 ).resolves.toBe(true)
1652 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1653 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1654 await pool.destroy()
1657 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1658 const pool = new DynamicClusterPool(
1659 Math.floor(numberOfWorkers / 2),
1661 './tests/worker-files/cluster/testWorker.js'
1664 pool.sendTaskFunctionOperationToWorkers({
1665 taskFunctionOperation: 'add',
1666 taskFunctionName: 'empty',
1667 taskFunction: (() => {}).toString()
1669 ).resolves.toBe(true)
1670 for (const workerNode of pool.workerNodes) {
1671 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1677 await pool.destroy()