1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.cjs'
18 import { CircularArray } from '../../lib/circular-array.cjs'
19 import { Deque } from '../../lib/deque.cjs'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
21 import { waitPoolEvents } from '../test-utils.cjs'
22 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
111 'Cannot instantiate a pool with a negative number of workers'
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.cjs'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.cjs'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.cjs'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.emitter.eventNames()).toStrictEqual([])
225 expect(pool.opts).toStrictEqual({
228 restartWorkerOnError: true,
229 enableTasksQueue: false,
230 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
232 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
233 .workerChoiceStrategies) {
234 expect(workerChoiceStrategy.opts).toStrictEqual({
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
245 const testHandler = () => console.info('test handler executed')
246 pool = new FixedThreadPool(
248 './tests/worker-files/thread/testWorker.mjs',
250 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
251 workerChoiceStrategyOptions: {
252 runTime: { median: true },
253 weights: { 0: 300, 1: 200 }
256 restartWorkerOnError: false,
257 enableTasksQueue: true,
258 tasksQueueOptions: { concurrency: 2 },
259 messageHandler: testHandler,
260 errorHandler: testHandler,
261 onlineHandler: testHandler,
262 exitHandler: testHandler
265 expect(pool.emitter).toBeUndefined()
266 expect(pool.opts).toStrictEqual({
269 restartWorkerOnError: false,
270 enableTasksQueue: true,
273 size: Math.pow(numberOfWorkers, 2),
275 tasksStealingOnBackPressure: true,
276 tasksFinishedTimeout: 2000
278 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
279 workerChoiceStrategyOptions: {
280 runTime: { median: true },
281 weights: { 0: 300, 1: 200 }
283 onlineHandler: testHandler,
284 messageHandler: testHandler,
285 errorHandler: testHandler,
286 exitHandler: testHandler
288 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
289 .workerChoiceStrategies) {
290 expect(workerChoiceStrategy.opts).toStrictEqual({
291 runTime: { median: true },
292 waitTime: { median: false },
293 elu: { median: false },
294 weights: { 0: 300, 1: 200 }
300 it('Verify that pool options are validated', () => {
305 './tests/worker-files/thread/testWorker.mjs',
307 workerChoiceStrategy: 'invalidStrategy'
310 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
315 './tests/worker-files/thread/testWorker.mjs',
317 workerChoiceStrategyOptions: { weights: {} }
322 'Invalid worker choice strategy options: must have a weight for each worker node'
329 './tests/worker-files/thread/testWorker.mjs',
331 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
336 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
343 './tests/worker-files/thread/testWorker.mjs',
345 enableTasksQueue: true,
346 tasksQueueOptions: 'invalidTasksQueueOptions'
350 new TypeError('Invalid tasks queue options: must be a plain object')
356 './tests/worker-files/thread/testWorker.mjs',
358 enableTasksQueue: true,
359 tasksQueueOptions: { concurrency: 0 }
364 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
371 './tests/worker-files/thread/testWorker.mjs',
373 enableTasksQueue: true,
374 tasksQueueOptions: { concurrency: -1 }
379 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
386 './tests/worker-files/thread/testWorker.mjs',
388 enableTasksQueue: true,
389 tasksQueueOptions: { concurrency: 0.2 }
393 new TypeError('Invalid worker node tasks concurrency: must be an integer')
399 './tests/worker-files/thread/testWorker.mjs',
401 enableTasksQueue: true,
402 tasksQueueOptions: { size: 0 }
407 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
414 './tests/worker-files/thread/testWorker.mjs',
416 enableTasksQueue: true,
417 tasksQueueOptions: { size: -1 }
422 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
429 './tests/worker-files/thread/testWorker.mjs',
431 enableTasksQueue: true,
432 tasksQueueOptions: { size: 0.2 }
436 new TypeError('Invalid worker node tasks queue size: must be an integer')
440 it('Verify that pool worker choice strategy options can be set', async () => {
441 const pool = new FixedThreadPool(
443 './tests/worker-files/thread/testWorker.mjs',
444 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
446 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
447 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
448 .workerChoiceStrategies) {
449 expect(workerChoiceStrategy.opts).toStrictEqual({
450 runTime: { median: false },
451 waitTime: { median: false },
452 elu: { median: false },
453 weights: expect.objectContaining({
454 0: expect.any(Number),
455 [pool.info.maxSize - 1]: expect.any(Number)
460 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
478 pool.setWorkerChoiceStrategyOptions({
479 runTime: { median: true },
480 elu: { median: true }
482 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
483 runTime: { median: true },
484 elu: { median: true }
486 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
487 .workerChoiceStrategies) {
488 expect(workerChoiceStrategy.opts).toStrictEqual({
489 runTime: { median: true },
490 waitTime: { median: false },
491 elu: { median: true },
492 weights: expect.objectContaining({
493 0: expect.any(Number),
494 [pool.info.maxSize - 1]: expect.any(Number)
499 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
517 pool.setWorkerChoiceStrategyOptions({
518 runTime: { median: false },
519 elu: { median: false }
521 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
522 runTime: { median: false },
523 elu: { median: false }
525 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
526 .workerChoiceStrategies) {
527 expect(workerChoiceStrategy.opts).toStrictEqual({
528 runTime: { median: false },
529 waitTime: { median: false },
530 elu: { median: false },
531 weights: expect.objectContaining({
532 0: expect.any(Number),
533 [pool.info.maxSize - 1]: expect.any(Number)
538 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
557 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
560 'Invalid worker choice strategy options: must be a plain object'
563 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
565 'Invalid worker choice strategy options: must have a weight for each worker node'
569 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
572 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
578 it('Verify that pool tasks queue can be enabled/disabled', async () => {
579 const pool = new FixedThreadPool(
581 './tests/worker-files/thread/testWorker.mjs'
583 expect(pool.opts.enableTasksQueue).toBe(false)
584 expect(pool.opts.tasksQueueOptions).toBeUndefined()
585 pool.enableTasksQueue(true)
586 expect(pool.opts.enableTasksQueue).toBe(true)
587 expect(pool.opts.tasksQueueOptions).toStrictEqual({
589 size: Math.pow(numberOfWorkers, 2),
591 tasksStealingOnBackPressure: true,
592 tasksFinishedTimeout: 2000
594 pool.enableTasksQueue(true, { concurrency: 2 })
595 expect(pool.opts.enableTasksQueue).toBe(true)
596 expect(pool.opts.tasksQueueOptions).toStrictEqual({
598 size: Math.pow(numberOfWorkers, 2),
600 tasksStealingOnBackPressure: true,
601 tasksFinishedTimeout: 2000
603 pool.enableTasksQueue(false)
604 expect(pool.opts.enableTasksQueue).toBe(false)
605 expect(pool.opts.tasksQueueOptions).toBeUndefined()
609 it('Verify that pool tasks queue options can be set', async () => {
610 const pool = new FixedThreadPool(
612 './tests/worker-files/thread/testWorker.mjs',
613 { enableTasksQueue: true }
615 expect(pool.opts.tasksQueueOptions).toStrictEqual({
617 size: Math.pow(numberOfWorkers, 2),
619 tasksStealingOnBackPressure: true,
620 tasksFinishedTimeout: 2000
622 for (const workerNode of pool.workerNodes) {
623 expect(workerNode.tasksQueueBackPressureSize).toBe(
624 pool.opts.tasksQueueOptions.size
627 pool.setTasksQueueOptions({
631 tasksStealingOnBackPressure: false,
632 tasksFinishedTimeout: 3000
634 expect(pool.opts.tasksQueueOptions).toStrictEqual({
638 tasksStealingOnBackPressure: false,
639 tasksFinishedTimeout: 3000
641 for (const workerNode of pool.workerNodes) {
642 expect(workerNode.tasksQueueBackPressureSize).toBe(
643 pool.opts.tasksQueueOptions.size
646 pool.setTasksQueueOptions({
649 tasksStealingOnBackPressure: true
651 expect(pool.opts.tasksQueueOptions).toStrictEqual({
653 size: Math.pow(numberOfWorkers, 2),
655 tasksStealingOnBackPressure: true,
656 tasksFinishedTimeout: 2000
658 for (const workerNode of pool.workerNodes) {
659 expect(workerNode.tasksQueueBackPressureSize).toBe(
660 pool.opts.tasksQueueOptions.size
663 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
664 new TypeError('Invalid tasks queue options: must be a plain object')
666 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
668 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
671 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
673 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
676 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
677 new TypeError('Invalid worker node tasks concurrency: must be an integer')
679 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
681 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
684 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
686 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
689 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
690 new TypeError('Invalid worker node tasks queue size: must be an integer')
695 it('Verify that pool info is set', async () => {
696 let pool = new FixedThreadPool(
698 './tests/worker-files/thread/testWorker.mjs'
700 expect(pool.info).toStrictEqual({
702 type: PoolTypes.fixed,
703 worker: WorkerTypes.thread,
706 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
708 minSize: numberOfWorkers,
709 maxSize: numberOfWorkers,
710 workerNodes: numberOfWorkers,
711 idleWorkerNodes: numberOfWorkers,
718 pool = new DynamicClusterPool(
719 Math.floor(numberOfWorkers / 2),
721 './tests/worker-files/cluster/testWorker.cjs'
723 expect(pool.info).toStrictEqual({
725 type: PoolTypes.dynamic,
726 worker: WorkerTypes.cluster,
729 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
731 minSize: Math.floor(numberOfWorkers / 2),
732 maxSize: numberOfWorkers,
733 workerNodes: Math.floor(numberOfWorkers / 2),
734 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
743 it('Verify that pool worker tasks usage are initialized', async () => {
744 const pool = new FixedClusterPool(
746 './tests/worker-files/cluster/testWorker.cjs'
748 for (const workerNode of pool.workerNodes) {
749 expect(workerNode).toBeInstanceOf(WorkerNode)
750 expect(workerNode.usage).toStrictEqual({
756 sequentiallyStolen: 0,
761 history: new CircularArray()
764 history: new CircularArray()
768 history: new CircularArray()
771 history: new CircularArray()
779 it('Verify that pool worker tasks queue are initialized', async () => {
780 let pool = new FixedClusterPool(
782 './tests/worker-files/cluster/testWorker.cjs'
784 for (const workerNode of pool.workerNodes) {
785 expect(workerNode).toBeInstanceOf(WorkerNode)
786 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
787 expect(workerNode.tasksQueue.size).toBe(0)
788 expect(workerNode.tasksQueue.maxSize).toBe(0)
791 pool = new DynamicThreadPool(
792 Math.floor(numberOfWorkers / 2),
794 './tests/worker-files/thread/testWorker.mjs'
796 for (const workerNode of pool.workerNodes) {
797 expect(workerNode).toBeInstanceOf(WorkerNode)
798 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
799 expect(workerNode.tasksQueue.size).toBe(0)
800 expect(workerNode.tasksQueue.maxSize).toBe(0)
805 it('Verify that pool worker info are initialized', async () => {
806 let pool = new FixedClusterPool(
808 './tests/worker-files/cluster/testWorker.cjs'
810 for (const workerNode of pool.workerNodes) {
811 expect(workerNode).toBeInstanceOf(WorkerNode)
812 expect(workerNode.info).toStrictEqual({
813 id: expect.any(Number),
814 type: WorkerTypes.cluster,
821 pool = new DynamicThreadPool(
822 Math.floor(numberOfWorkers / 2),
824 './tests/worker-files/thread/testWorker.mjs'
826 for (const workerNode of pool.workerNodes) {
827 expect(workerNode).toBeInstanceOf(WorkerNode)
828 expect(workerNode.info).toStrictEqual({
829 id: expect.any(Number),
830 type: WorkerTypes.thread,
839 it('Verify that pool statuses are checked at start or destroy', async () => {
840 const pool = new FixedThreadPool(
842 './tests/worker-files/thread/testWorker.mjs'
844 expect(pool.info.started).toBe(true)
845 expect(pool.info.ready).toBe(true)
846 expect(() => pool.start()).toThrow(
847 new Error('Cannot start an already started pool')
850 expect(pool.info.started).toBe(false)
851 expect(pool.info.ready).toBe(false)
852 await expect(pool.destroy()).rejects.toThrow(
853 new Error('Cannot destroy an already destroyed pool')
857 it('Verify that pool can be started after initialization', async () => {
858 const pool = new FixedClusterPool(
860 './tests/worker-files/cluster/testWorker.cjs',
865 expect(pool.info.started).toBe(false)
866 expect(pool.info.ready).toBe(false)
867 expect(pool.workerNodes).toStrictEqual([])
868 expect(pool.readyEventEmitted).toBe(false)
869 await expect(pool.execute()).rejects.toThrow(
870 new Error('Cannot execute a task on not started pool')
873 expect(pool.info.started).toBe(true)
874 expect(pool.info.ready).toBe(true)
875 await waitPoolEvents(pool, PoolEvents.ready, 1)
876 expect(pool.readyEventEmitted).toBe(true)
877 expect(pool.workerNodes.length).toBe(numberOfWorkers)
878 for (const workerNode of pool.workerNodes) {
879 expect(workerNode).toBeInstanceOf(WorkerNode)
884 it('Verify that pool execute() arguments are checked', async () => {
885 const pool = new FixedClusterPool(
887 './tests/worker-files/cluster/testWorker.cjs'
889 await expect(pool.execute(undefined, 0)).rejects.toThrow(
890 new TypeError('name argument must be a string')
892 await expect(pool.execute(undefined, '')).rejects.toThrow(
893 new TypeError('name argument must not be an empty string')
895 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
896 new TypeError('transferList argument must be an array')
898 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
899 "Task function 'unknown' not found"
902 await expect(pool.execute()).rejects.toThrow(
903 new Error('Cannot execute a task on not started pool')
907 it('Verify that pool worker tasks usage are computed', async () => {
908 const pool = new FixedClusterPool(
910 './tests/worker-files/cluster/testWorker.cjs'
912 const promises = new Set()
913 const maxMultiplier = 2
914 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
915 promises.add(pool.execute())
917 for (const workerNode of pool.workerNodes) {
918 expect(workerNode.usage).toStrictEqual({
921 executing: maxMultiplier,
924 sequentiallyStolen: 0,
929 history: expect.any(CircularArray)
932 history: expect.any(CircularArray)
936 history: expect.any(CircularArray)
939 history: expect.any(CircularArray)
944 await Promise.all(promises)
945 for (const workerNode of pool.workerNodes) {
946 expect(workerNode.usage).toStrictEqual({
948 executed: maxMultiplier,
952 sequentiallyStolen: 0,
957 history: expect.any(CircularArray)
960 history: expect.any(CircularArray)
964 history: expect.any(CircularArray)
967 history: expect.any(CircularArray)
975 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
976 const pool = new DynamicThreadPool(
977 Math.floor(numberOfWorkers / 2),
979 './tests/worker-files/thread/testWorker.mjs'
981 const promises = new Set()
982 const maxMultiplier = 2
983 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
984 promises.add(pool.execute())
986 await Promise.all(promises)
987 for (const workerNode of pool.workerNodes) {
988 expect(workerNode.usage).toStrictEqual({
990 executed: expect.any(Number),
994 sequentiallyStolen: 0,
999 history: expect.any(CircularArray)
1002 history: expect.any(CircularArray)
1006 history: expect.any(CircularArray)
1009 history: expect.any(CircularArray)
1013 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1014 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1015 numberOfWorkers * maxMultiplier
1017 expect(workerNode.usage.runTime.history.length).toBe(0)
1018 expect(workerNode.usage.waitTime.history.length).toBe(0)
1019 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1020 expect(workerNode.usage.elu.active.history.length).toBe(0)
1022 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1023 for (const workerNode of pool.workerNodes) {
1024 expect(workerNode.usage).toStrictEqual({
1030 sequentiallyStolen: 0,
1035 history: expect.any(CircularArray)
1038 history: expect.any(CircularArray)
1042 history: expect.any(CircularArray)
1045 history: expect.any(CircularArray)
1049 expect(workerNode.usage.runTime.history.length).toBe(0)
1050 expect(workerNode.usage.waitTime.history.length).toBe(0)
1051 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1052 expect(workerNode.usage.elu.active.history.length).toBe(0)
1054 await pool.destroy()
1057 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1058 const pool = new DynamicClusterPool(
1059 Math.floor(numberOfWorkers / 2),
1061 './tests/worker-files/cluster/testWorker.cjs'
1063 expect(pool.emitter.eventNames()).toStrictEqual([])
1066 pool.emitter.on(PoolEvents.ready, info => {
1070 await waitPoolEvents(pool, PoolEvents.ready, 1)
1071 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1072 expect(poolReady).toBe(1)
1073 expect(poolInfo).toStrictEqual({
1075 type: PoolTypes.dynamic,
1076 worker: WorkerTypes.cluster,
1079 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1080 strategyRetries: expect.any(Number),
1081 minSize: expect.any(Number),
1082 maxSize: expect.any(Number),
1083 workerNodes: expect.any(Number),
1084 idleWorkerNodes: expect.any(Number),
1085 busyWorkerNodes: expect.any(Number),
1086 executedTasks: expect.any(Number),
1087 executingTasks: expect.any(Number),
1088 failedTasks: expect.any(Number)
1090 await pool.destroy()
1093 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1094 const pool = new FixedThreadPool(
1096 './tests/worker-files/thread/testWorker.mjs'
1098 expect(pool.emitter.eventNames()).toStrictEqual([])
1099 const promises = new Set()
1102 pool.emitter.on(PoolEvents.busy, info => {
1106 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1107 for (let i = 0; i < numberOfWorkers * 2; i++) {
1108 promises.add(pool.execute())
1110 await Promise.all(promises)
1111 // The `busy` event is triggered each time the number of tasks submitted at once reaches the number of fixed pool workers.
1112 // So it is emitted numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1113 expect(poolBusy).toBe(numberOfWorkers + 1)
1114 expect(poolInfo).toStrictEqual({
1116 type: PoolTypes.fixed,
1117 worker: WorkerTypes.thread,
1120 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1121 strategyRetries: expect.any(Number),
1122 minSize: expect.any(Number),
1123 maxSize: expect.any(Number),
1124 workerNodes: expect.any(Number),
1125 idleWorkerNodes: expect.any(Number),
1126 busyWorkerNodes: expect.any(Number),
1127 executedTasks: expect.any(Number),
1128 executingTasks: expect.any(Number),
1129 failedTasks: expect.any(Number)
1131 await pool.destroy()
1134 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1135 const pool = new DynamicThreadPool(
1136 Math.floor(numberOfWorkers / 2),
1138 './tests/worker-files/thread/testWorker.mjs'
1140 expect(pool.emitter.eventNames()).toStrictEqual([])
1141 const promises = new Set()
1144 pool.emitter.on(PoolEvents.full, info => {
1148 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1149 for (let i = 0; i < numberOfWorkers * 2; i++) {
1150 promises.add(pool.execute())
1152 await Promise.all(promises)
1153 expect(poolFull).toBe(1)
1154 expect(poolInfo).toStrictEqual({
1156 type: PoolTypes.dynamic,
1157 worker: WorkerTypes.thread,
1160 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1161 strategyRetries: expect.any(Number),
1162 minSize: expect.any(Number),
1163 maxSize: expect.any(Number),
1164 workerNodes: expect.any(Number),
1165 idleWorkerNodes: expect.any(Number),
1166 busyWorkerNodes: expect.any(Number),
1167 executedTasks: expect.any(Number),
1168 executingTasks: expect.any(Number),
1169 failedTasks: expect.any(Number)
1171 await pool.destroy()
1174 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1175 const pool = new FixedThreadPool(
1177 './tests/worker-files/thread/testWorker.mjs',
1179 enableTasksQueue: true
1182 stub(pool, 'hasBackPressure').returns(true)
1183 expect(pool.emitter.eventNames()).toStrictEqual([])
1184 const promises = new Set()
1185 let poolBackPressure = 0
1187 pool.emitter.on(PoolEvents.backPressure, info => {
1191 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1192 for (let i = 0; i < numberOfWorkers + 1; i++) {
1193 promises.add(pool.execute())
1195 await Promise.all(promises)
1196 expect(poolBackPressure).toBe(1)
1197 expect(poolInfo).toStrictEqual({
1199 type: PoolTypes.fixed,
1200 worker: WorkerTypes.thread,
1203 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1204 strategyRetries: expect.any(Number),
1205 minSize: expect.any(Number),
1206 maxSize: expect.any(Number),
1207 workerNodes: expect.any(Number),
1208 idleWorkerNodes: expect.any(Number),
1209 stealingWorkerNodes: expect.any(Number),
1210 busyWorkerNodes: expect.any(Number),
1211 executedTasks: expect.any(Number),
1212 executingTasks: expect.any(Number),
1213 maxQueuedTasks: expect.any(Number),
1214 queuedTasks: expect.any(Number),
1216 stolenTasks: expect.any(Number),
1217 failedTasks: expect.any(Number)
1219 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1220 await pool.destroy()
1223 it('Verify that destroy() waits for queued tasks to finish', async () => {
1224 const tasksFinishedTimeout = 2500
1225 const pool = new FixedThreadPool(
1227 './tests/worker-files/thread/asyncWorker.mjs',
1229 enableTasksQueue: true,
1230 tasksQueueOptions: { tasksFinishedTimeout }
1233 const maxMultiplier = 4
1234 let tasksFinished = 0
1235 for (const workerNode of pool.workerNodes) {
1236 workerNode.on('taskFinished', () => {
1240 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1243 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1244 const startTime = performance.now()
1245 await pool.destroy()
1246 const elapsedTime = performance.now() - startTime
1247 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1248 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1249 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1252 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1253 const tasksFinishedTimeout = 1000
1254 const pool = new FixedThreadPool(
1256 './tests/worker-files/thread/asyncWorker.mjs',
1258 enableTasksQueue: true,
1259 tasksQueueOptions: { tasksFinishedTimeout }
1262 const maxMultiplier = 4
1263 let tasksFinished = 0
1264 for (const workerNode of pool.workerNodes) {
1265 workerNode.on('taskFinished', () => {
1269 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1272 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1273 const startTime = performance.now()
1274 await pool.destroy()
1275 const elapsedTime = performance.now() - startTime
1276 expect(tasksFinished).toBe(0)
1277 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1280 it('Verify that pool asynchronous resource track tasks execution', async () => {
1285 let resolveCalls = 0
1286 const hook = createHook({
1287 init (asyncId, type) {
1288 if (type === 'poolifier:task') {
1290 taskAsyncId = asyncId
1294 if (asyncId === taskAsyncId) beforeCalls++
1297 if (asyncId === taskAsyncId) afterCalls++
1300 if (executionAsyncId() === taskAsyncId) resolveCalls++
1303 const pool = new FixedThreadPool(
1305 './tests/worker-files/thread/testWorker.mjs'
1308 await pool.execute()
1310 expect(initCalls).toBe(1)
1311 expect(beforeCalls).toBe(1)
1312 expect(afterCalls).toBe(1)
1313 expect(resolveCalls).toBe(1)
1314 await pool.destroy()
1317 it('Verify that hasTaskFunction() is working', async () => {
1318 const dynamicThreadPool = new DynamicThreadPool(
1319 Math.floor(numberOfWorkers / 2),
1321 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1323 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1324 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1325 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1328 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1329 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1330 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1331 await dynamicThreadPool.destroy()
1332 const fixedClusterPool = new FixedClusterPool(
1334 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1336 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1337 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1338 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1341 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1342 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1343 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1344 await fixedClusterPool.destroy()
1347 it('Verify that addTaskFunction() is working', async () => {
1348 const dynamicThreadPool = new DynamicThreadPool(
1349 Math.floor(numberOfWorkers / 2),
1351 './tests/worker-files/thread/testWorker.mjs'
1353 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1355 dynamicThreadPool.addTaskFunction(0, () => {})
1356 ).rejects.toThrow(new TypeError('name argument must be a string'))
1358 dynamicThreadPool.addTaskFunction('', () => {})
1360 new TypeError('name argument must not be an empty string')
1362 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1363 new TypeError('fn argument must be a function')
1365 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1366 new TypeError('fn argument must be a function')
1368 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1372 const echoTaskFunction = data => {
1376 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1377 ).resolves.toBe(true)
1378 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1379 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1382 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1387 const taskFunctionData = { test: 'test' }
1388 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1389 expect(echoResult).toStrictEqual(taskFunctionData)
1390 for (const workerNode of dynamicThreadPool.workerNodes) {
1391 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1393 executed: expect.any(Number),
1396 sequentiallyStolen: 0,
1401 history: new CircularArray()
1404 history: new CircularArray()
1408 history: new CircularArray()
1411 history: new CircularArray()
1416 await dynamicThreadPool.destroy()
// Test: removeTaskFunction() on a DynamicThreadPool.
// - Removing 'test' must reject with
//   'Cannot remove a task function not handled on the pool side'.
// - After adding 'echo' through the pool, removing it must resolve and leave
//   pool.taskFunctions empty.
1419 it('Verify that removeTaskFunction() is working', async () => {
1420 const dynamicThreadPool = new DynamicThreadPool(
1421 Math.floor(numberOfWorkers / 2),
1423 './tests/worker-files/thread/testWorker.mjs'
1425 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Baseline: task function names listed by the pool before any add/remove.
1426 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1430 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1431 new Error('Cannot remove a task function not handled on the pool side')
1433 const echoTaskFunction = data => {
1436 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
// 'echo' was added through the pool, so it is tracked in pool.taskFunctions.
1437 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1438 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1441 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1446 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1449 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1450 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
// The name listing is re-checked once 'echo' is gone from the registry.
1451 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1455 await dynamicThreadPool.destroy()
// Test: listTaskFunctionNames() against workers that declare several task
// functions, once with a DynamicThreadPool (thread worker file) and once with
// a FixedClusterPool (cluster worker file). Both expected lists include
// 'jsonIntegerSerialization'. Each pool is destroyed after its assertion.
1458 it('Verify that listTaskFunctionNames() is working', async () => {
1459 const dynamicThreadPool = new DynamicThreadPool(
1460 Math.floor(numberOfWorkers / 2),
1462 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1464 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1465 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1467 'jsonIntegerSerialization',
1471 await dynamicThreadPool.destroy()
// Same check with the cluster flavour of the multiple task functions worker.
1472 const fixedClusterPool = new FixedClusterPool(
1474 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1476 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1477 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1479 'jsonIntegerSerialization',
1483 await fixedClusterPool.destroy()
// Test: setDefaultTaskFunction() on a DynamicThreadPool using the multiple
// task functions worker.
// - A non-string name (0), the reserved DEFAULT_TASK_NAME, and an unknown name
//   must each reject with a "Task function operation 'default' failed on
//   worker <id> ..." message carrying the worker-side error text.
// - 'factorial' and then 'fibonacci' must each resolve to true, and
//   listTaskFunctionNames() is re-checked after each change.
1486 it('Verify that setDefaultTaskFunction() is working', async () => {
1487 const dynamicThreadPool = new DynamicThreadPool(
1488 Math.floor(numberOfWorkers / 2),
1490 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1492 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Id of the first worker node; it is interpolated into the expected error
// messages below.
1493 const workerId = dynamicThreadPool.workerNodes[0].info.id
1494 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1496 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1500 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1503 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1507 dynamicThreadPool.setDefaultTaskFunction('unknown')
1510 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1513 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1515 'jsonIntegerSerialization',
1520 dynamicThreadPool.setDefaultTaskFunction('factorial')
1521 ).resolves.toBe(true)
1522 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1525 'jsonIntegerSerialization',
1529 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1530 ).resolves.toBe(true)
1531 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1534 'jsonIntegerSerialization',
1537 await dynamicThreadPool.destroy()
// Test: a DynamicClusterPool backed by the multiple task functions worker.
// Runs the same { n: 10 } payload through the default task function (no name
// given), 'jsonIntegerSerialization', 'factorial' and 'fibonacci', then checks
// the pool-level task counters and the per-worker-node task function usage.
1540 it('Verify that multiple task functions worker is working', async () => {
1541 const pool = new DynamicClusterPool(
1542 Math.floor(numberOfWorkers / 2),
1544 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1546 const data = { n: 10 }
1547 const result0 = await pool.execute(data)
1548 expect(result0).toStrictEqual({ ok: 1 })
1549 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1550 expect(result1).toStrictEqual({ ok: 1 })
1551 const result2 = await pool.execute(data, 'factorial')
1552 expect(result2).toBe(3628800)
1553 const result3 = await pool.execute(data, 'fibonacci')
1554 expect(result3).toBe(55)
// Four execute() calls were awaited above: none may still be executing and
// all four must be counted as executed.
1555 expect(pool.info.executingTasks).toBe(0)
1556 expect(pool.info.executedTasks).toBe(4)
// Per-worker-node checks: advertised task function names, number of usage
// entries, and the usage record of every name listed by the pool.
1557 for (const workerNode of pool.workerNodes) {
1558 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1560 'jsonIntegerSerialization',
1564 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1565 for (const name of pool.listTaskFunctionNames()) {
1566 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1568 executed: expect.any(Number),
1572 sequentiallyStolen: 0,
1576 history: expect.any(CircularArray)
1579 history: expect.any(CircularArray)
1583 history: expect.any(CircularArray)
1586 history: expect.any(CircularArray)
1591 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1592 ).toBeGreaterThan(0)
1595 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1597 workerNode.getTaskFunctionWorkerUsage(
1598 workerNode.info.taskFunctionNames[1]
1602 await pool.destroy()
// Test: sendKillMessageToWorker() on a DynamicClusterPool. The promise returned
// for worker node key 0 must resolve to undefined; the pool is then destroyed.
1605 it('Verify sendKillMessageToWorker()', async () => {
1606 const pool = new DynamicClusterPool(
1607 Math.floor(numberOfWorkers / 2),
1609 './tests/worker-files/cluster/testWorker.cjs'
1611 const workerNodeKey = 0
1613 pool.sendKillMessageToWorker(workerNodeKey)
1614 ).resolves.toBeUndefined()
1615 await pool.destroy()
// Test: sendTaskFunctionOperationToWorker() on a DynamicClusterPool. Sends an
// 'add' operation for a task function named 'empty' to worker node key 0; the
// function is passed as its source text, i.e. (() => {}).toString(). The call
// must resolve to true, and that worker node's info.taskFunctionNames must
// then equal [DEFAULT_TASK_NAME, 'test', 'empty'].
1618 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1619 const pool = new DynamicClusterPool(
1620 Math.floor(numberOfWorkers / 2),
1622 './tests/worker-files/cluster/testWorker.cjs'
1624 const workerNodeKey = 0
1626 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1627 taskFunctionOperation: 'add',
1628 taskFunctionName: 'empty',
1629 taskFunction: (() => {}).toString()
1631 ).resolves.toBe(true)
1633 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1634 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1635 await pool.destroy()
// Test: sendTaskFunctionOperationToWorkers() on a DynamicClusterPool. Sends an
// 'add' operation for a task function named 'empty' (passed as source text via
// (() => {}).toString()) without targeting a specific worker node key. The
// call must resolve to true, after which info.taskFunctionNames is asserted on
// every worker node of the pool.
1638 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1639 const pool = new DynamicClusterPool(
1640 Math.floor(numberOfWorkers / 2),
1642 './tests/worker-files/cluster/testWorker.cjs'
1645 pool.sendTaskFunctionOperationToWorkers({
1646 taskFunctionOperation: 'add',
1647 taskFunctionName: 'empty',
1648 taskFunction: (() => {}).toString()
1650 ).resolves.toBe(true)
1651 for (const workerNode of pool.workerNodes) {
1652 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1658 await pool.destroy()