1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.js'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.js'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.js'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
245 expect(workerChoiceStrategy.opts).toStrictEqual(
246 expect.objectContaining({
249 Object.keys(workerChoiceStrategy.opts.weights).length,
250 runTime: { median: false },
251 waitTime: { median: false },
252 elu: { median: false }
253 // weights: expect.objectContaining({
254 // 0: expect.any(Number),
255 // [pool.info.maxSize - 1]: expect.any(Number)
261 const testHandler = () => console.info('test handler executed')
262 pool = new FixedThreadPool(
264 './tests/worker-files/thread/testWorker.mjs',
266 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
267 workerChoiceStrategyOptions: {
268 runTime: { median: true },
269 weights: { 0: 300, 1: 200 }
272 restartWorkerOnError: false,
273 enableTasksQueue: true,
274 tasksQueueOptions: { concurrency: 2 },
275 messageHandler: testHandler,
276 errorHandler: testHandler,
277 onlineHandler: testHandler,
278 exitHandler: testHandler
281 expect(pool.emitter).toBeUndefined()
282 expect(pool.opts).toStrictEqual({
285 restartWorkerOnError: false,
286 enableTasksQueue: true,
289 size: Math.pow(numberOfWorkers, 2),
291 tasksStealingOnBackPressure: true,
292 tasksFinishedTimeout: 2000
294 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
295 workerChoiceStrategyOptions: {
296 runTime: { median: true },
297 weights: { 0: 300, 1: 200 }
299 onlineHandler: testHandler,
300 messageHandler: testHandler,
301 errorHandler: testHandler,
302 exitHandler: testHandler
304 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
307 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
308 runTime: { median: true },
309 waitTime: { median: false },
310 elu: { median: false },
311 weights: { 0: 300, 1: 200 }
313 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
314 .workerChoiceStrategies) {
315 expect(workerChoiceStrategy.opts).toStrictEqual({
318 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
319 runTime: { median: true },
320 waitTime: { median: false },
321 elu: { median: false },
322 weights: { 0: 300, 1: 200 }
328 it('Verify that pool options are validated', () => {
333 './tests/worker-files/thread/testWorker.mjs',
335 workerChoiceStrategy: 'invalidStrategy'
338 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
343 './tests/worker-files/thread/testWorker.mjs',
345 workerChoiceStrategyOptions: { weights: {} }
350 'Invalid worker choice strategy options: must have a weight for each worker node'
357 './tests/worker-files/thread/testWorker.mjs',
359 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
364 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
371 './tests/worker-files/thread/testWorker.mjs',
373 enableTasksQueue: true,
374 tasksQueueOptions: 'invalidTasksQueueOptions'
378 new TypeError('Invalid tasks queue options: must be a plain object')
384 './tests/worker-files/thread/testWorker.mjs',
386 enableTasksQueue: true,
387 tasksQueueOptions: { concurrency: 0 }
392 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
399 './tests/worker-files/thread/testWorker.mjs',
401 enableTasksQueue: true,
402 tasksQueueOptions: { concurrency: -1 }
407 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
414 './tests/worker-files/thread/testWorker.mjs',
416 enableTasksQueue: true,
417 tasksQueueOptions: { concurrency: 0.2 }
421 new TypeError('Invalid worker node tasks concurrency: must be an integer')
427 './tests/worker-files/thread/testWorker.mjs',
429 enableTasksQueue: true,
430 tasksQueueOptions: { size: 0 }
435 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
442 './tests/worker-files/thread/testWorker.mjs',
444 enableTasksQueue: true,
445 tasksQueueOptions: { size: -1 }
450 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
457 './tests/worker-files/thread/testWorker.mjs',
459 enableTasksQueue: true,
460 tasksQueueOptions: { size: 0.2 }
464 new TypeError('Invalid worker node tasks queue size: must be an integer')
468 it('Verify that pool worker choice strategy options can be set', async () => {
469 const pool = new FixedThreadPool(
471 './tests/worker-files/thread/testWorker.mjs',
472 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
474 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
475 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
478 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
479 runTime: { median: false },
480 waitTime: { median: false },
481 elu: { median: false },
482 weights: expect.objectContaining({
483 0: expect.any(Number),
484 [pool.info.maxSize - 1]: expect.any(Number)
487 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
488 .workerChoiceStrategies) {
489 expect(workerChoiceStrategy.opts).toStrictEqual(
490 expect.objectContaining({
493 Object.keys(workerChoiceStrategy.opts.weights).length,
494 runTime: { median: false },
495 waitTime: { median: false },
496 elu: { median: false }
497 // weights: expect.objectContaining({
498 // 0: expect.any(Number),
499 // [pool.info.maxSize - 1]: expect.any(Number)
505 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
523 pool.setWorkerChoiceStrategyOptions({
524 runTime: { median: true },
525 elu: { median: true }
527 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
528 runTime: { median: true },
529 elu: { median: true }
531 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
534 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
535 runTime: { median: true },
536 waitTime: { median: false },
537 elu: { median: true },
538 weights: expect.objectContaining({
539 0: expect.any(Number),
540 [pool.info.maxSize - 1]: expect.any(Number)
543 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
544 .workerChoiceStrategies) {
545 expect(workerChoiceStrategy.opts).toStrictEqual(
546 expect.objectContaining({
549 Object.keys(workerChoiceStrategy.opts.weights).length,
550 runTime: { median: true },
551 waitTime: { median: false },
552 elu: { median: true }
553 // weights: expect.objectContaining({
554 // 0: expect.any(Number),
555 // [pool.info.maxSize - 1]: expect.any(Number)
561 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
579 pool.setWorkerChoiceStrategyOptions({
580 runTime: { median: false },
581 elu: { median: false }
583 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
584 runTime: { median: false },
585 elu: { median: false }
587 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
590 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
591 runTime: { median: false },
592 waitTime: { median: false },
593 elu: { median: false },
594 weights: expect.objectContaining({
595 0: expect.any(Number),
596 [pool.info.maxSize - 1]: expect.any(Number)
599 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
600 .workerChoiceStrategies) {
601 expect(workerChoiceStrategy.opts).toStrictEqual({
604 Object.keys(workerChoiceStrategy.opts.weights).length,
605 runTime: { median: false },
606 waitTime: { median: false },
607 elu: { median: false },
608 weights: expect.objectContaining({
609 0: expect.any(Number),
610 [pool.info.maxSize - 1]: expect.any(Number)
615 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
634 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
637 'Invalid worker choice strategy options: must be a plain object'
640 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
642 'Invalid worker choice strategy options: must have a weight for each worker node'
646 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
649 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
655 it('Verify that pool tasks queue can be enabled/disabled', async () => {
656 const pool = new FixedThreadPool(
658 './tests/worker-files/thread/testWorker.mjs'
660 expect(pool.opts.enableTasksQueue).toBe(false)
661 expect(pool.opts.tasksQueueOptions).toBeUndefined()
662 pool.enableTasksQueue(true)
663 expect(pool.opts.enableTasksQueue).toBe(true)
664 expect(pool.opts.tasksQueueOptions).toStrictEqual({
666 size: Math.pow(numberOfWorkers, 2),
668 tasksStealingOnBackPressure: true,
669 tasksFinishedTimeout: 2000
671 pool.enableTasksQueue(true, { concurrency: 2 })
672 expect(pool.opts.enableTasksQueue).toBe(true)
673 expect(pool.opts.tasksQueueOptions).toStrictEqual({
675 size: Math.pow(numberOfWorkers, 2),
677 tasksStealingOnBackPressure: true,
678 tasksFinishedTimeout: 2000
680 pool.enableTasksQueue(false)
681 expect(pool.opts.enableTasksQueue).toBe(false)
682 expect(pool.opts.tasksQueueOptions).toBeUndefined()
686 it('Verify that pool tasks queue options can be set', async () => {
687 const pool = new FixedThreadPool(
689 './tests/worker-files/thread/testWorker.mjs',
690 { enableTasksQueue: true }
692 expect(pool.opts.tasksQueueOptions).toStrictEqual({
694 size: Math.pow(numberOfWorkers, 2),
696 tasksStealingOnBackPressure: true,
697 tasksFinishedTimeout: 2000
699 for (const workerNode of pool.workerNodes) {
700 expect(workerNode.tasksQueueBackPressureSize).toBe(
701 pool.opts.tasksQueueOptions.size
704 pool.setTasksQueueOptions({
708 tasksStealingOnBackPressure: false,
709 tasksFinishedTimeout: 3000
711 expect(pool.opts.tasksQueueOptions).toStrictEqual({
715 tasksStealingOnBackPressure: false,
716 tasksFinishedTimeout: 3000
718 for (const workerNode of pool.workerNodes) {
719 expect(workerNode.tasksQueueBackPressureSize).toBe(
720 pool.opts.tasksQueueOptions.size
723 pool.setTasksQueueOptions({
726 tasksStealingOnBackPressure: true
728 expect(pool.opts.tasksQueueOptions).toStrictEqual({
730 size: Math.pow(numberOfWorkers, 2),
732 tasksStealingOnBackPressure: true,
733 tasksFinishedTimeout: 2000
735 for (const workerNode of pool.workerNodes) {
736 expect(workerNode.tasksQueueBackPressureSize).toBe(
737 pool.opts.tasksQueueOptions.size
740 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
741 new TypeError('Invalid tasks queue options: must be a plain object')
743 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
745 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
748 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
750 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
753 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
754 new TypeError('Invalid worker node tasks concurrency: must be an integer')
756 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
758 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
761 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
763 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
766 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
767 new TypeError('Invalid worker node tasks queue size: must be an integer')
772 it('Verify that pool info is set', async () => {
773 let pool = new FixedThreadPool(
775 './tests/worker-files/thread/testWorker.mjs'
777 expect(pool.info).toStrictEqual({
779 type: PoolTypes.fixed,
780 worker: WorkerTypes.thread,
783 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
784 minSize: numberOfWorkers,
785 maxSize: numberOfWorkers,
786 workerNodes: numberOfWorkers,
787 idleWorkerNodes: numberOfWorkers,
794 pool = new DynamicClusterPool(
795 Math.floor(numberOfWorkers / 2),
797 './tests/worker-files/cluster/testWorker.js'
799 expect(pool.info).toStrictEqual({
801 type: PoolTypes.dynamic,
802 worker: WorkerTypes.cluster,
805 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
806 minSize: Math.floor(numberOfWorkers / 2),
807 maxSize: numberOfWorkers,
808 workerNodes: Math.floor(numberOfWorkers / 2),
809 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
818 it('Verify that pool worker tasks usage are initialized', async () => {
819 const pool = new FixedClusterPool(
821 './tests/worker-files/cluster/testWorker.js'
823 for (const workerNode of pool.workerNodes) {
824 expect(workerNode).toBeInstanceOf(WorkerNode)
825 expect(workerNode.usage).toStrictEqual({
831 sequentiallyStolen: 0,
836 history: new CircularArray()
839 history: new CircularArray()
843 history: new CircularArray()
846 history: new CircularArray()
854 it('Verify that pool worker tasks queue are initialized', async () => {
855 let pool = new FixedClusterPool(
857 './tests/worker-files/cluster/testWorker.js'
859 for (const workerNode of pool.workerNodes) {
860 expect(workerNode).toBeInstanceOf(WorkerNode)
861 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
862 expect(workerNode.tasksQueue.size).toBe(0)
863 expect(workerNode.tasksQueue.maxSize).toBe(0)
866 pool = new DynamicThreadPool(
867 Math.floor(numberOfWorkers / 2),
869 './tests/worker-files/thread/testWorker.mjs'
871 for (const workerNode of pool.workerNodes) {
872 expect(workerNode).toBeInstanceOf(WorkerNode)
873 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
874 expect(workerNode.tasksQueue.size).toBe(0)
875 expect(workerNode.tasksQueue.maxSize).toBe(0)
880 it('Verify that pool worker info are initialized', async () => {
881 let pool = new FixedClusterPool(
883 './tests/worker-files/cluster/testWorker.js'
885 for (const workerNode of pool.workerNodes) {
886 expect(workerNode).toBeInstanceOf(WorkerNode)
887 expect(workerNode.info).toStrictEqual({
888 id: expect.any(Number),
889 type: WorkerTypes.cluster,
895 pool = new DynamicThreadPool(
896 Math.floor(numberOfWorkers / 2),
898 './tests/worker-files/thread/testWorker.mjs'
900 for (const workerNode of pool.workerNodes) {
901 expect(workerNode).toBeInstanceOf(WorkerNode)
902 expect(workerNode.info).toStrictEqual({
903 id: expect.any(Number),
904 type: WorkerTypes.thread,
912 it('Verify that pool statuses are checked at start or destroy', async () => {
913 const pool = new FixedThreadPool(
915 './tests/worker-files/thread/testWorker.mjs'
917 expect(pool.info.started).toBe(true)
918 expect(pool.info.ready).toBe(true)
919 expect(() => pool.start()).toThrow(
920 new Error('Cannot start an already started pool')
923 expect(pool.info.started).toBe(false)
924 expect(pool.info.ready).toBe(false)
925 await expect(pool.destroy()).rejects.toThrow(
926 new Error('Cannot destroy an already destroyed pool')
930 it('Verify that pool can be started after initialization', async () => {
931 const pool = new FixedClusterPool(
933 './tests/worker-files/cluster/testWorker.js',
938 expect(pool.info.started).toBe(false)
939 expect(pool.info.ready).toBe(false)
940 expect(pool.readyEventEmitted).toBe(false)
941 expect(pool.workerNodes).toStrictEqual([])
942 await expect(pool.execute()).rejects.toThrow(
943 new Error('Cannot execute a task on not started pool')
946 expect(pool.info.started).toBe(true)
947 expect(pool.info.ready).toBe(true)
948 await waitPoolEvents(pool, PoolEvents.ready, 1)
949 expect(pool.readyEventEmitted).toBe(true)
950 expect(pool.workerNodes.length).toBe(numberOfWorkers)
951 for (const workerNode of pool.workerNodes) {
952 expect(workerNode).toBeInstanceOf(WorkerNode)
957 it('Verify that pool execute() arguments are checked', async () => {
958 const pool = new FixedClusterPool(
960 './tests/worker-files/cluster/testWorker.js'
962 await expect(pool.execute(undefined, 0)).rejects.toThrow(
963 new TypeError('name argument must be a string')
965 await expect(pool.execute(undefined, '')).rejects.toThrow(
966 new TypeError('name argument must not be an empty string')
968 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
969 new TypeError('transferList argument must be an array')
971 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
972 "Task function 'unknown' not found"
975 await expect(pool.execute()).rejects.toThrow(
976 new Error('Cannot execute a task on not started pool')
980 it('Verify that pool worker tasks usage are computed', async () => {
981 const pool = new FixedClusterPool(
983 './tests/worker-files/cluster/testWorker.js'
985 const promises = new Set()
986 const maxMultiplier = 2
987 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
988 promises.add(pool.execute())
990 for (const workerNode of pool.workerNodes) {
991 expect(workerNode.usage).toStrictEqual({
994 executing: maxMultiplier,
997 sequentiallyStolen: 0,
1002 history: expect.any(CircularArray)
1005 history: expect.any(CircularArray)
1009 history: expect.any(CircularArray)
1012 history: expect.any(CircularArray)
1017 await Promise.all(promises)
1018 for (const workerNode of pool.workerNodes) {
1019 expect(workerNode.usage).toStrictEqual({
1021 executed: maxMultiplier,
1025 sequentiallyStolen: 0,
1030 history: expect.any(CircularArray)
1033 history: expect.any(CircularArray)
1037 history: expect.any(CircularArray)
1040 history: expect.any(CircularArray)
1045 await pool.destroy()
1048 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1049 const pool = new DynamicThreadPool(
1050 Math.floor(numberOfWorkers / 2),
1052 './tests/worker-files/thread/testWorker.mjs'
1054 const promises = new Set()
1055 const maxMultiplier = 2
1056 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1057 promises.add(pool.execute())
1059 await Promise.all(promises)
1060 for (const workerNode of pool.workerNodes) {
1061 expect(workerNode.usage).toStrictEqual({
1063 executed: expect.any(Number),
1067 sequentiallyStolen: 0,
1072 history: expect.any(CircularArray)
1075 history: expect.any(CircularArray)
1079 history: expect.any(CircularArray)
1082 history: expect.any(CircularArray)
1086 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1087 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1088 numberOfWorkers * maxMultiplier
1090 expect(workerNode.usage.runTime.history.length).toBe(0)
1091 expect(workerNode.usage.waitTime.history.length).toBe(0)
1092 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1093 expect(workerNode.usage.elu.active.history.length).toBe(0)
1095 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1096 for (const workerNode of pool.workerNodes) {
1097 expect(workerNode.usage).toStrictEqual({
1103 sequentiallyStolen: 0,
1108 history: expect.any(CircularArray)
1111 history: expect.any(CircularArray)
1115 history: expect.any(CircularArray)
1118 history: expect.any(CircularArray)
1122 expect(workerNode.usage.runTime.history.length).toBe(0)
1123 expect(workerNode.usage.waitTime.history.length).toBe(0)
1124 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1125 expect(workerNode.usage.elu.active.history.length).toBe(0)
1127 await pool.destroy()
1130 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1131 const pool = new DynamicClusterPool(
1132 Math.floor(numberOfWorkers / 2),
1134 './tests/worker-files/cluster/testWorker.js'
1136 expect(pool.emitter.eventNames()).toStrictEqual([])
1139 pool.emitter.on(PoolEvents.ready, info => {
1143 await waitPoolEvents(pool, PoolEvents.ready, 1)
1144 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1145 expect(poolReady).toBe(1)
1146 expect(poolInfo).toStrictEqual({
1148 type: PoolTypes.dynamic,
1149 worker: WorkerTypes.cluster,
1152 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1153 minSize: expect.any(Number),
1154 maxSize: expect.any(Number),
1155 workerNodes: expect.any(Number),
1156 idleWorkerNodes: expect.any(Number),
1157 busyWorkerNodes: expect.any(Number),
1158 executedTasks: expect.any(Number),
1159 executingTasks: expect.any(Number),
1160 failedTasks: expect.any(Number)
1162 await pool.destroy()
1165 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1166 const pool = new FixedThreadPool(
1168 './tests/worker-files/thread/testWorker.mjs'
1170 expect(pool.emitter.eventNames()).toStrictEqual([])
1171 const promises = new Set()
1174 pool.emitter.on(PoolEvents.busy, info => {
1178 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1179 for (let i = 0; i < numberOfWorkers * 2; i++) {
1180 promises.add(pool.execute())
1182 await Promise.all(promises)
1183 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1184 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1185 expect(poolBusy).toBe(numberOfWorkers + 1)
1186 expect(poolInfo).toStrictEqual({
1188 type: PoolTypes.fixed,
1189 worker: WorkerTypes.thread,
1192 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1193 minSize: expect.any(Number),
1194 maxSize: expect.any(Number),
1195 workerNodes: expect.any(Number),
1196 idleWorkerNodes: expect.any(Number),
1197 busyWorkerNodes: expect.any(Number),
1198 executedTasks: expect.any(Number),
1199 executingTasks: expect.any(Number),
1200 failedTasks: expect.any(Number)
1202 await pool.destroy()
1205 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1206 const pool = new DynamicThreadPool(
1207 Math.floor(numberOfWorkers / 2),
1209 './tests/worker-files/thread/testWorker.mjs'
1211 expect(pool.emitter.eventNames()).toStrictEqual([])
1212 const promises = new Set()
1215 pool.emitter.on(PoolEvents.full, info => {
1219 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1220 for (let i = 0; i < numberOfWorkers * 2; i++) {
1221 promises.add(pool.execute())
1223 await Promise.all(promises)
1224 expect(poolFull).toBe(1)
1225 expect(poolInfo).toStrictEqual({
1227 type: PoolTypes.dynamic,
1228 worker: WorkerTypes.thread,
1231 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1232 minSize: expect.any(Number),
1233 maxSize: expect.any(Number),
1234 workerNodes: expect.any(Number),
1235 idleWorkerNodes: expect.any(Number),
1236 busyWorkerNodes: expect.any(Number),
1237 executedTasks: expect.any(Number),
1238 executingTasks: expect.any(Number),
1239 failedTasks: expect.any(Number)
1241 await pool.destroy()
1244 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1245 const pool = new FixedThreadPool(
1247 './tests/worker-files/thread/testWorker.mjs',
1249 enableTasksQueue: true
1252 stub(pool, 'hasBackPressure').returns(true)
1253 expect(pool.emitter.eventNames()).toStrictEqual([])
1254 const promises = new Set()
1255 let poolBackPressure = 0
1257 pool.emitter.on(PoolEvents.backPressure, info => {
1261 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1262 for (let i = 0; i < numberOfWorkers + 1; i++) {
1263 promises.add(pool.execute())
1265 await Promise.all(promises)
1266 expect(poolBackPressure).toBe(1)
1267 expect(poolInfo).toStrictEqual({
1269 type: PoolTypes.fixed,
1270 worker: WorkerTypes.thread,
1273 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1274 minSize: expect.any(Number),
1275 maxSize: expect.any(Number),
1276 workerNodes: expect.any(Number),
1277 idleWorkerNodes: expect.any(Number),
1278 busyWorkerNodes: expect.any(Number),
1279 executedTasks: expect.any(Number),
1280 executingTasks: expect.any(Number),
1281 maxQueuedTasks: expect.any(Number),
1282 queuedTasks: expect.any(Number),
1284 stolenTasks: expect.any(Number),
1285 failedTasks: expect.any(Number)
1287 expect(pool.hasBackPressure.callCount).toBe(5)
1288 await pool.destroy()
1291 it('Verify that destroy() waits for queued tasks to finish', async () => {
1292 const tasksFinishedTimeout = 2500
1293 const pool = new FixedThreadPool(
1295 './tests/worker-files/thread/asyncWorker.mjs',
1297 enableTasksQueue: true,
1298 tasksQueueOptions: { tasksFinishedTimeout }
1301 const maxMultiplier = 4
1302 let tasksFinished = 0
1303 for (const workerNode of pool.workerNodes) {
1304 workerNode.on('taskFinished', () => {
1308 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1311 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1312 const startTime = performance.now()
1313 await pool.destroy()
1314 const elapsedTime = performance.now() - startTime
1315 expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
1316 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1317 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
1320 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1321 const tasksFinishedTimeout = 1000
1322 const pool = new FixedThreadPool(
1324 './tests/worker-files/thread/asyncWorker.mjs',
1326 enableTasksQueue: true,
1327 tasksQueueOptions: { tasksFinishedTimeout }
1330 const maxMultiplier = 4
1331 let tasksFinished = 0
1332 for (const workerNode of pool.workerNodes) {
1333 workerNode.on('taskFinished', () => {
1337 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1340 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1341 const startTime = performance.now()
1342 await pool.destroy()
1343 const elapsedTime = performance.now() - startTime
1344 expect(tasksFinished).toBe(0)
1345 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
1348 it('Verify that pool asynchronous resource track tasks execution', async () => {
1353 let resolveCalls = 0
1354 const hook = createHook({
1355 init (asyncId, type) {
1356 if (type === 'poolifier:task') {
1358 taskAsyncId = asyncId
1362 if (asyncId === taskAsyncId) beforeCalls++
1365 if (asyncId === taskAsyncId) afterCalls++
1368 if (executionAsyncId() === taskAsyncId) resolveCalls++
1371 const pool = new FixedThreadPool(
1373 './tests/worker-files/thread/testWorker.mjs'
1376 await pool.execute()
1378 expect(initCalls).toBe(1)
1379 expect(beforeCalls).toBe(1)
1380 expect(afterCalls).toBe(1)
1381 expect(resolveCalls).toBe(1)
1382 await pool.destroy()
1385 it('Verify that hasTaskFunction() is working', async () => {
1386 const dynamicThreadPool = new DynamicThreadPool(
1387 Math.floor(numberOfWorkers / 2),
1389 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1391 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1392 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1393 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1396 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1397 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1398 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1399 await dynamicThreadPool.destroy()
1400 const fixedClusterPool = new FixedClusterPool(
1402 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1404 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1405 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1406 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1409 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1410 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1411 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1412 await fixedClusterPool.destroy()
1415 it('Verify that addTaskFunction() is working', async () => {
1416 const dynamicThreadPool = new DynamicThreadPool(
1417 Math.floor(numberOfWorkers / 2),
1419 './tests/worker-files/thread/testWorker.mjs'
1421 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1423 dynamicThreadPool.addTaskFunction(0, () => {})
1424 ).rejects.toThrow(new TypeError('name argument must be a string'))
1426 dynamicThreadPool.addTaskFunction('', () => {})
1428 new TypeError('name argument must not be an empty string')
1430 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1431 new TypeError('fn argument must be a function')
1433 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1434 new TypeError('fn argument must be a function')
1436 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1440 const echoTaskFunction = data => {
1444 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1445 ).resolves.toBe(true)
1446 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1447 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1450 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1455 const taskFunctionData = { test: 'test' }
1456 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1457 expect(echoResult).toStrictEqual(taskFunctionData)
1458 for (const workerNode of dynamicThreadPool.workerNodes) {
1459 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1461 executed: expect.any(Number),
1464 sequentiallyStolen: 0,
1469 history: new CircularArray()
1472 history: new CircularArray()
1476 history: new CircularArray()
1479 history: new CircularArray()
1484 await dynamicThreadPool.destroy()
1487 it('Verify that removeTaskFunction() is working', async () => {
1488 const dynamicThreadPool = new DynamicThreadPool(
1489 Math.floor(numberOfWorkers / 2),
1491 './tests/worker-files/thread/testWorker.mjs'
1493 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1494 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1498 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1499 new Error('Cannot remove a task function not handled on the pool side')
1501 const echoTaskFunction = data => {
1504 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1505 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1506 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1509 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1514 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1517 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1518 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1519 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1523 await dynamicThreadPool.destroy()
1526 it('Verify that listTaskFunctionNames() is working', async () => {
1527 const dynamicThreadPool = new DynamicThreadPool(
1528 Math.floor(numberOfWorkers / 2),
1530 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1532 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1533 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1535 'jsonIntegerSerialization',
1539 await dynamicThreadPool.destroy()
1540 const fixedClusterPool = new FixedClusterPool(
1542 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1544 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1545 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1547 'jsonIntegerSerialization',
1551 await fixedClusterPool.destroy()
1554 it('Verify that setDefaultTaskFunction() is working', async () => {
1555 const dynamicThreadPool = new DynamicThreadPool(
1556 Math.floor(numberOfWorkers / 2),
1558 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1560 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1561 const workerId = dynamicThreadPool.workerNodes[0].info.id
1562 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1564 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1568 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1571 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1575 dynamicThreadPool.setDefaultTaskFunction('unknown')
1578 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1581 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1583 'jsonIntegerSerialization',
1588 dynamicThreadPool.setDefaultTaskFunction('factorial')
1589 ).resolves.toBe(true)
1590 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1593 'jsonIntegerSerialization',
1597 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1598 ).resolves.toBe(true)
1599 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1602 'jsonIntegerSerialization',
1605 await dynamicThreadPool.destroy()
1608 it('Verify that multiple task functions worker is working', async () => {
1609 const pool = new DynamicClusterPool(
1610 Math.floor(numberOfWorkers / 2),
1612 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1614 const data = { n: 10 }
1615 const result0 = await pool.execute(data)
1616 expect(result0).toStrictEqual({ ok: 1 })
1617 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1618 expect(result1).toStrictEqual({ ok: 1 })
1619 const result2 = await pool.execute(data, 'factorial')
1620 expect(result2).toBe(3628800)
1621 const result3 = await pool.execute(data, 'fibonacci')
1622 expect(result3).toBe(55)
1623 expect(pool.info.executingTasks).toBe(0)
1624 expect(pool.info.executedTasks).toBe(4)
1625 for (const workerNode of pool.workerNodes) {
1626 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1628 'jsonIntegerSerialization',
1632 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1633 for (const name of pool.listTaskFunctionNames()) {
1634 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1636 executed: expect.any(Number),
1640 sequentiallyStolen: 0,
1644 history: expect.any(CircularArray)
1647 history: expect.any(CircularArray)
1651 history: expect.any(CircularArray)
1654 history: expect.any(CircularArray)
1659 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1660 ).toBeGreaterThan(0)
1663 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1665 workerNode.getTaskFunctionWorkerUsage(
1666 workerNode.info.taskFunctionNames[1]
1670 await pool.destroy()
1673 it('Verify sendKillMessageToWorker()', async () => {
1674 const pool = new DynamicClusterPool(
1675 Math.floor(numberOfWorkers / 2),
1677 './tests/worker-files/cluster/testWorker.js'
1679 const workerNodeKey = 0
1681 pool.sendKillMessageToWorker(workerNodeKey)
1682 ).resolves.toBeUndefined()
1684 pool.sendKillMessageToWorker(numberOfWorkers)
1685 ).rejects.toStrictEqual(
1686 new Error(`Invalid worker node key '${numberOfWorkers}'`)
1688 await pool.destroy()
1691 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1692 const pool = new DynamicClusterPool(
1693 Math.floor(numberOfWorkers / 2),
1695 './tests/worker-files/cluster/testWorker.js'
1697 const workerNodeKey = 0
1699 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1700 taskFunctionOperation: 'add',
1701 taskFunctionName: 'empty',
1702 taskFunction: (() => {}).toString()
1704 ).resolves.toBe(true)
1706 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1707 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1708 await pool.destroy()
1711 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1712 const pool = new DynamicClusterPool(
1713 Math.floor(numberOfWorkers / 2),
1715 './tests/worker-files/cluster/testWorker.js'
1718 pool.sendTaskFunctionOperationToWorkers({
1719 taskFunctionOperation: 'add',
1720 taskFunctionName: 'empty',
1721 taskFunction: (() => {}).toString()
1723 ).resolves.toBe(true)
1724 for (const workerNode of pool.workerNodes) {
1725 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1731 await pool.destroy()