1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.js'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.js'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.js'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
245 expect(workerChoiceStrategy.opts).toStrictEqual(
246 expect.objectContaining({
249 Object.keys(workerChoiceStrategy.opts.weights).length,
250 runTime: { median: false },
251 waitTime: { median: false },
252 elu: { median: false }
253 // weights: expect.objectContaining({
254 // 0: expect.any(Number),
255 // [pool.info.maxSize - 1]: expect.any(Number)
261 const testHandler = () => console.info('test handler executed')
262 pool = new FixedThreadPool(
264 './tests/worker-files/thread/testWorker.mjs',
266 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
267 workerChoiceStrategyOptions: {
268 runTime: { median: true },
269 weights: { 0: 300, 1: 200 }
272 restartWorkerOnError: false,
273 enableTasksQueue: true,
274 tasksQueueOptions: { concurrency: 2 },
275 messageHandler: testHandler,
276 errorHandler: testHandler,
277 onlineHandler: testHandler,
278 exitHandler: testHandler
281 expect(pool.emitter).toBeUndefined()
282 expect(pool.opts).toStrictEqual({
285 restartWorkerOnError: false,
286 enableTasksQueue: true,
289 size: Math.pow(numberOfWorkers, 2),
291 tasksStealingOnBackPressure: true,
292 tasksFinishedTimeout: 2000
294 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
295 workerChoiceStrategyOptions: {
296 runTime: { median: true },
297 weights: { 0: 300, 1: 200 }
299 onlineHandler: testHandler,
300 messageHandler: testHandler,
301 errorHandler: testHandler,
302 exitHandler: testHandler
304 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
307 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
308 runTime: { median: true },
309 waitTime: { median: false },
310 elu: { median: false },
311 weights: { 0: 300, 1: 200 }
313 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
314 .workerChoiceStrategies) {
315 expect(workerChoiceStrategy.opts).toStrictEqual({
318 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
319 runTime: { median: true },
320 waitTime: { median: false },
321 elu: { median: false },
322 weights: { 0: 300, 1: 200 }
328 it('Verify that pool options are validated', () => {
333 './tests/worker-files/thread/testWorker.mjs',
335 workerChoiceStrategy: 'invalidStrategy'
338 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
343 './tests/worker-files/thread/testWorker.mjs',
345 workerChoiceStrategyOptions: { weights: {} }
350 'Invalid worker choice strategy options: must have a weight for each worker node'
357 './tests/worker-files/thread/testWorker.mjs',
359 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
364 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
371 './tests/worker-files/thread/testWorker.mjs',
373 enableTasksQueue: true,
374 tasksQueueOptions: 'invalidTasksQueueOptions'
378 new TypeError('Invalid tasks queue options: must be a plain object')
384 './tests/worker-files/thread/testWorker.mjs',
386 enableTasksQueue: true,
387 tasksQueueOptions: { concurrency: 0 }
392 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
399 './tests/worker-files/thread/testWorker.mjs',
401 enableTasksQueue: true,
402 tasksQueueOptions: { concurrency: -1 }
407 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
414 './tests/worker-files/thread/testWorker.mjs',
416 enableTasksQueue: true,
417 tasksQueueOptions: { concurrency: 0.2 }
421 new TypeError('Invalid worker node tasks concurrency: must be an integer')
427 './tests/worker-files/thread/testWorker.mjs',
429 enableTasksQueue: true,
430 tasksQueueOptions: { size: 0 }
435 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
442 './tests/worker-files/thread/testWorker.mjs',
444 enableTasksQueue: true,
445 tasksQueueOptions: { size: -1 }
450 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
457 './tests/worker-files/thread/testWorker.mjs',
459 enableTasksQueue: true,
460 tasksQueueOptions: { size: 0.2 }
464 new TypeError('Invalid worker node tasks queue size: must be an integer')
468 it('Verify that pool worker choice strategy options can be set', async () => {
469 const pool = new FixedThreadPool(
471 './tests/worker-files/thread/testWorker.mjs',
472 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
474 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
475 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
478 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
479 runTime: { median: false },
480 waitTime: { median: false },
481 elu: { median: false },
482 weights: expect.objectContaining({
483 0: expect.any(Number),
484 [pool.info.maxSize - 1]: expect.any(Number)
487 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
488 .workerChoiceStrategies) {
489 expect(workerChoiceStrategy.opts).toStrictEqual(
490 expect.objectContaining({
493 Object.keys(workerChoiceStrategy.opts.weights).length,
494 runTime: { median: false },
495 waitTime: { median: false },
496 elu: { median: false }
497 // weights: expect.objectContaining({
498 // 0: expect.any(Number),
499 // [pool.info.maxSize - 1]: expect.any(Number)
505 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
523 pool.setWorkerChoiceStrategyOptions({
524 runTime: { median: true },
525 elu: { median: true }
527 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
528 runTime: { median: true },
529 elu: { median: true }
531 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
534 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
535 runTime: { median: true },
536 waitTime: { median: false },
537 elu: { median: true },
538 weights: expect.objectContaining({
539 0: expect.any(Number),
540 [pool.info.maxSize - 1]: expect.any(Number)
543 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
544 .workerChoiceStrategies) {
545 expect(workerChoiceStrategy.opts).toStrictEqual({
548 Object.keys(workerChoiceStrategy.opts.weights).length,
549 runTime: { median: true },
550 waitTime: { median: false },
551 elu: { median: true },
552 weights: expect.objectContaining({
553 0: expect.any(Number),
554 [pool.info.maxSize - 1]: expect.any(Number)
559 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
577 pool.setWorkerChoiceStrategyOptions({
578 runTime: { median: false },
579 elu: { median: false }
581 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
582 runTime: { median: false },
583 elu: { median: false }
585 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
588 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
589 runTime: { median: false },
590 waitTime: { median: false },
591 elu: { median: false },
592 weights: expect.objectContaining({
593 0: expect.any(Number),
594 [pool.info.maxSize - 1]: expect.any(Number)
597 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
598 .workerChoiceStrategies) {
599 expect(workerChoiceStrategy.opts).toStrictEqual({
602 Object.keys(workerChoiceStrategy.opts.weights).length,
603 runTime: { median: false },
604 waitTime: { median: false },
605 elu: { median: false },
606 weights: expect.objectContaining({
607 0: expect.any(Number),
608 [pool.info.maxSize - 1]: expect.any(Number)
613 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
632 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
635 'Invalid worker choice strategy options: must be a plain object'
638 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
640 'Invalid worker choice strategy options: must have a weight for each worker node'
644 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
647 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
653 it('Verify that pool tasks queue can be enabled/disabled', async () => {
654 const pool = new FixedThreadPool(
656 './tests/worker-files/thread/testWorker.mjs'
658 expect(pool.opts.enableTasksQueue).toBe(false)
659 expect(pool.opts.tasksQueueOptions).toBeUndefined()
660 pool.enableTasksQueue(true)
661 expect(pool.opts.enableTasksQueue).toBe(true)
662 expect(pool.opts.tasksQueueOptions).toStrictEqual({
664 size: Math.pow(numberOfWorkers, 2),
666 tasksStealingOnBackPressure: true,
667 tasksFinishedTimeout: 2000
669 pool.enableTasksQueue(true, { concurrency: 2 })
670 expect(pool.opts.enableTasksQueue).toBe(true)
671 expect(pool.opts.tasksQueueOptions).toStrictEqual({
673 size: Math.pow(numberOfWorkers, 2),
675 tasksStealingOnBackPressure: true,
676 tasksFinishedTimeout: 2000
678 pool.enableTasksQueue(false)
679 expect(pool.opts.enableTasksQueue).toBe(false)
680 expect(pool.opts.tasksQueueOptions).toBeUndefined()
684 it('Verify that pool tasks queue options can be set', async () => {
685 const pool = new FixedThreadPool(
687 './tests/worker-files/thread/testWorker.mjs',
688 { enableTasksQueue: true }
690 expect(pool.opts.tasksQueueOptions).toStrictEqual({
692 size: Math.pow(numberOfWorkers, 2),
694 tasksStealingOnBackPressure: true,
695 tasksFinishedTimeout: 2000
697 for (const workerNode of pool.workerNodes) {
698 expect(workerNode.tasksQueueBackPressureSize).toBe(
699 pool.opts.tasksQueueOptions.size
702 pool.setTasksQueueOptions({
706 tasksStealingOnBackPressure: false,
707 tasksFinishedTimeout: 3000
709 expect(pool.opts.tasksQueueOptions).toStrictEqual({
713 tasksStealingOnBackPressure: false,
714 tasksFinishedTimeout: 3000
716 for (const workerNode of pool.workerNodes) {
717 expect(workerNode.tasksQueueBackPressureSize).toBe(
718 pool.opts.tasksQueueOptions.size
721 pool.setTasksQueueOptions({
724 tasksStealingOnBackPressure: true
726 expect(pool.opts.tasksQueueOptions).toStrictEqual({
728 size: Math.pow(numberOfWorkers, 2),
730 tasksStealingOnBackPressure: true,
731 tasksFinishedTimeout: 2000
733 for (const workerNode of pool.workerNodes) {
734 expect(workerNode.tasksQueueBackPressureSize).toBe(
735 pool.opts.tasksQueueOptions.size
738 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
739 new TypeError('Invalid tasks queue options: must be a plain object')
741 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
743 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
746 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
748 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
751 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
752 new TypeError('Invalid worker node tasks concurrency: must be an integer')
754 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
756 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
759 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
761 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
764 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
765 new TypeError('Invalid worker node tasks queue size: must be an integer')
770 it('Verify that pool info is set', async () => {
771 let pool = new FixedThreadPool(
773 './tests/worker-files/thread/testWorker.mjs'
775 expect(pool.info).toStrictEqual({
777 type: PoolTypes.fixed,
778 worker: WorkerTypes.thread,
781 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
782 minSize: numberOfWorkers,
783 maxSize: numberOfWorkers,
784 workerNodes: numberOfWorkers,
785 idleWorkerNodes: numberOfWorkers,
792 pool = new DynamicClusterPool(
793 Math.floor(numberOfWorkers / 2),
795 './tests/worker-files/cluster/testWorker.js'
797 expect(pool.info).toStrictEqual({
799 type: PoolTypes.dynamic,
800 worker: WorkerTypes.cluster,
803 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
804 minSize: Math.floor(numberOfWorkers / 2),
805 maxSize: numberOfWorkers,
806 workerNodes: Math.floor(numberOfWorkers / 2),
807 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
816 it('Verify that pool worker tasks usage are initialized', async () => {
817 const pool = new FixedClusterPool(
819 './tests/worker-files/cluster/testWorker.js'
821 for (const workerNode of pool.workerNodes) {
822 expect(workerNode).toBeInstanceOf(WorkerNode)
823 expect(workerNode.usage).toStrictEqual({
829 sequentiallyStolen: 0,
834 history: new CircularArray()
837 history: new CircularArray()
841 history: new CircularArray()
844 history: new CircularArray()
852 it('Verify that pool worker tasks queue are initialized', async () => {
853 let pool = new FixedClusterPool(
855 './tests/worker-files/cluster/testWorker.js'
857 for (const workerNode of pool.workerNodes) {
858 expect(workerNode).toBeInstanceOf(WorkerNode)
859 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
860 expect(workerNode.tasksQueue.size).toBe(0)
861 expect(workerNode.tasksQueue.maxSize).toBe(0)
864 pool = new DynamicThreadPool(
865 Math.floor(numberOfWorkers / 2),
867 './tests/worker-files/thread/testWorker.mjs'
869 for (const workerNode of pool.workerNodes) {
870 expect(workerNode).toBeInstanceOf(WorkerNode)
871 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
872 expect(workerNode.tasksQueue.size).toBe(0)
873 expect(workerNode.tasksQueue.maxSize).toBe(0)
878 it('Verify that pool worker info are initialized', async () => {
879 let pool = new FixedClusterPool(
881 './tests/worker-files/cluster/testWorker.js'
883 for (const workerNode of pool.workerNodes) {
884 expect(workerNode).toBeInstanceOf(WorkerNode)
885 expect(workerNode.info).toStrictEqual({
886 id: expect.any(Number),
887 type: WorkerTypes.cluster,
893 pool = new DynamicThreadPool(
894 Math.floor(numberOfWorkers / 2),
896 './tests/worker-files/thread/testWorker.mjs'
898 for (const workerNode of pool.workerNodes) {
899 expect(workerNode).toBeInstanceOf(WorkerNode)
900 expect(workerNode.info).toStrictEqual({
901 id: expect.any(Number),
902 type: WorkerTypes.thread,
910 it('Verify that pool statuses are checked at start or destroy', async () => {
911 const pool = new FixedThreadPool(
913 './tests/worker-files/thread/testWorker.mjs'
915 expect(pool.info.started).toBe(true)
916 expect(pool.info.ready).toBe(true)
917 expect(() => pool.start()).toThrow(
918 new Error('Cannot start an already started pool')
921 expect(pool.info.started).toBe(false)
922 expect(pool.info.ready).toBe(false)
923 await expect(pool.destroy()).rejects.toThrow(
924 new Error('Cannot destroy an already destroyed pool')
928 it('Verify that pool can be started after initialization', async () => {
929 const pool = new FixedClusterPool(
931 './tests/worker-files/cluster/testWorker.js',
936 expect(pool.info.started).toBe(false)
937 expect(pool.info.ready).toBe(false)
938 expect(pool.readyEventEmitted).toBe(false)
939 expect(pool.workerNodes).toStrictEqual([])
940 await expect(pool.execute()).rejects.toThrow(
941 new Error('Cannot execute a task on not started pool')
944 expect(pool.info.started).toBe(true)
945 expect(pool.info.ready).toBe(true)
946 await waitPoolEvents(pool, PoolEvents.ready, 1)
947 expect(pool.readyEventEmitted).toBe(true)
948 expect(pool.workerNodes.length).toBe(numberOfWorkers)
949 for (const workerNode of pool.workerNodes) {
950 expect(workerNode).toBeInstanceOf(WorkerNode)
955 it('Verify that pool execute() arguments are checked', async () => {
956 const pool = new FixedClusterPool(
958 './tests/worker-files/cluster/testWorker.js'
960 await expect(pool.execute(undefined, 0)).rejects.toThrow(
961 new TypeError('name argument must be a string')
963 await expect(pool.execute(undefined, '')).rejects.toThrow(
964 new TypeError('name argument must not be an empty string')
966 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
967 new TypeError('transferList argument must be an array')
969 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
970 "Task function 'unknown' not found"
973 await expect(pool.execute()).rejects.toThrow(
974 new Error('Cannot execute a task on not started pool')
978 it('Verify that pool worker tasks usage are computed', async () => {
979 const pool = new FixedClusterPool(
981 './tests/worker-files/cluster/testWorker.js'
983 const promises = new Set()
984 const maxMultiplier = 2
985 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
986 promises.add(pool.execute())
988 for (const workerNode of pool.workerNodes) {
989 expect(workerNode.usage).toStrictEqual({
992 executing: maxMultiplier,
995 sequentiallyStolen: 0,
1000 history: expect.any(CircularArray)
1003 history: expect.any(CircularArray)
1007 history: expect.any(CircularArray)
1010 history: expect.any(CircularArray)
1015 await Promise.all(promises)
1016 for (const workerNode of pool.workerNodes) {
1017 expect(workerNode.usage).toStrictEqual({
1019 executed: maxMultiplier,
1023 sequentiallyStolen: 0,
1028 history: expect.any(CircularArray)
1031 history: expect.any(CircularArray)
1035 history: expect.any(CircularArray)
1038 history: expect.any(CircularArray)
1043 await pool.destroy()
1046 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1047 const pool = new DynamicThreadPool(
1048 Math.floor(numberOfWorkers / 2),
1050 './tests/worker-files/thread/testWorker.mjs'
1052 const promises = new Set()
1053 const maxMultiplier = 2
1054 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1055 promises.add(pool.execute())
1057 await Promise.all(promises)
1058 for (const workerNode of pool.workerNodes) {
1059 expect(workerNode.usage).toStrictEqual({
1061 executed: expect.any(Number),
1065 sequentiallyStolen: 0,
1070 history: expect.any(CircularArray)
1073 history: expect.any(CircularArray)
1077 history: expect.any(CircularArray)
1080 history: expect.any(CircularArray)
1084 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1085 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1086 numberOfWorkers * maxMultiplier
1088 expect(workerNode.usage.runTime.history.length).toBe(0)
1089 expect(workerNode.usage.waitTime.history.length).toBe(0)
1090 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1091 expect(workerNode.usage.elu.active.history.length).toBe(0)
1093 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1094 for (const workerNode of pool.workerNodes) {
1095 expect(workerNode.usage).toStrictEqual({
1101 sequentiallyStolen: 0,
1106 history: expect.any(CircularArray)
1109 history: expect.any(CircularArray)
1113 history: expect.any(CircularArray)
1116 history: expect.any(CircularArray)
1120 expect(workerNode.usage.runTime.history.length).toBe(0)
1121 expect(workerNode.usage.waitTime.history.length).toBe(0)
1122 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1123 expect(workerNode.usage.elu.active.history.length).toBe(0)
1125 await pool.destroy()
1128 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1129 const pool = new DynamicClusterPool(
1130 Math.floor(numberOfWorkers / 2),
1132 './tests/worker-files/cluster/testWorker.js'
1134 expect(pool.emitter.eventNames()).toStrictEqual([])
1137 pool.emitter.on(PoolEvents.ready, info => {
1141 await waitPoolEvents(pool, PoolEvents.ready, 1)
1142 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1143 expect(poolReady).toBe(1)
1144 expect(poolInfo).toStrictEqual({
1146 type: PoolTypes.dynamic,
1147 worker: WorkerTypes.cluster,
1150 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1151 minSize: expect.any(Number),
1152 maxSize: expect.any(Number),
1153 workerNodes: expect.any(Number),
1154 idleWorkerNodes: expect.any(Number),
1155 busyWorkerNodes: expect.any(Number),
1156 executedTasks: expect.any(Number),
1157 executingTasks: expect.any(Number),
1158 failedTasks: expect.any(Number)
1160 await pool.destroy()
1163 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1164 const pool = new FixedThreadPool(
1166 './tests/worker-files/thread/testWorker.mjs'
1168 expect(pool.emitter.eventNames()).toStrictEqual([])
1169 const promises = new Set()
1172 pool.emitter.on(PoolEvents.busy, info => {
1176 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1177 for (let i = 0; i < numberOfWorkers * 2; i++) {
1178 promises.add(pool.execute())
1180 await Promise.all(promises)
1181 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1182 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1183 expect(poolBusy).toBe(numberOfWorkers + 1)
1184 expect(poolInfo).toStrictEqual({
1186 type: PoolTypes.fixed,
1187 worker: WorkerTypes.thread,
1190 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1191 minSize: expect.any(Number),
1192 maxSize: expect.any(Number),
1193 workerNodes: expect.any(Number),
1194 idleWorkerNodes: expect.any(Number),
1195 busyWorkerNodes: expect.any(Number),
1196 executedTasks: expect.any(Number),
1197 executingTasks: expect.any(Number),
1198 failedTasks: expect.any(Number)
1200 await pool.destroy()
1203 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1204 const pool = new DynamicThreadPool(
1205 Math.floor(numberOfWorkers / 2),
1207 './tests/worker-files/thread/testWorker.mjs'
1209 expect(pool.emitter.eventNames()).toStrictEqual([])
1210 const promises = new Set()
1213 pool.emitter.on(PoolEvents.full, info => {
1217 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1218 for (let i = 0; i < numberOfWorkers * 2; i++) {
1219 promises.add(pool.execute())
1221 await Promise.all(promises)
1222 expect(poolFull).toBe(1)
1223 expect(poolInfo).toStrictEqual({
1225 type: PoolTypes.dynamic,
1226 worker: WorkerTypes.thread,
1229 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1230 minSize: expect.any(Number),
1231 maxSize: expect.any(Number),
1232 workerNodes: expect.any(Number),
1233 idleWorkerNodes: expect.any(Number),
1234 busyWorkerNodes: expect.any(Number),
1235 executedTasks: expect.any(Number),
1236 executingTasks: expect.any(Number),
1237 failedTasks: expect.any(Number)
1239 await pool.destroy()
1242 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1243 const pool = new FixedThreadPool(
1245 './tests/worker-files/thread/testWorker.mjs',
1247 enableTasksQueue: true
1250 stub(pool, 'hasBackPressure').returns(true)
1251 expect(pool.emitter.eventNames()).toStrictEqual([])
1252 const promises = new Set()
1253 let poolBackPressure = 0
1255 pool.emitter.on(PoolEvents.backPressure, info => {
1259 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1260 for (let i = 0; i < numberOfWorkers + 1; i++) {
1261 promises.add(pool.execute())
1263 await Promise.all(promises)
1264 expect(poolBackPressure).toBe(1)
1265 expect(poolInfo).toStrictEqual({
1267 type: PoolTypes.fixed,
1268 worker: WorkerTypes.thread,
1271 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1272 minSize: expect.any(Number),
1273 maxSize: expect.any(Number),
1274 workerNodes: expect.any(Number),
1275 idleWorkerNodes: expect.any(Number),
1276 busyWorkerNodes: expect.any(Number),
1277 executedTasks: expect.any(Number),
1278 executingTasks: expect.any(Number),
1279 maxQueuedTasks: expect.any(Number),
1280 queuedTasks: expect.any(Number),
1282 stolenTasks: expect.any(Number),
1283 failedTasks: expect.any(Number)
1285 expect(pool.hasBackPressure.callCount).toBe(5)
1286 await pool.destroy()
1289 it('Verify that destroy() waits for queued tasks to finish', async () => {
1290 const tasksFinishedTimeout = 2500
1291 const pool = new FixedThreadPool(
1293 './tests/worker-files/thread/asyncWorker.mjs',
1295 enableTasksQueue: true,
1296 tasksQueueOptions: { tasksFinishedTimeout }
1299 const maxMultiplier = 4
1300 let tasksFinished = 0
1301 for (const workerNode of pool.workerNodes) {
1302 workerNode.on('taskFinished', () => {
1306 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1309 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1310 const startTime = performance.now()
1311 await pool.destroy()
1312 const elapsedTime = performance.now() - startTime
1313 expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
1314 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1315 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
1318 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1319 const tasksFinishedTimeout = 1000
1320 const pool = new FixedThreadPool(
1322 './tests/worker-files/thread/asyncWorker.mjs',
1324 enableTasksQueue: true,
1325 tasksQueueOptions: { tasksFinishedTimeout }
1328 const maxMultiplier = 4
1329 let tasksFinished = 0
1330 for (const workerNode of pool.workerNodes) {
1331 workerNode.on('taskFinished', () => {
1335 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1338 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1339 const startTime = performance.now()
1340 await pool.destroy()
1341 const elapsedTime = performance.now() - startTime
1342 expect(tasksFinished).toBe(0)
1343 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
1346 it('Verify that pool asynchronous resource track tasks execution', async () => {
1351 let resolveCalls = 0
1352 const hook = createHook({
1353 init (asyncId, type) {
1354 if (type === 'poolifier:task') {
1356 taskAsyncId = asyncId
1360 if (asyncId === taskAsyncId) beforeCalls++
1363 if (asyncId === taskAsyncId) afterCalls++
1366 if (executionAsyncId() === taskAsyncId) resolveCalls++
1369 const pool = new FixedThreadPool(
1371 './tests/worker-files/thread/testWorker.mjs'
1374 await pool.execute()
1376 expect(initCalls).toBe(1)
1377 expect(beforeCalls).toBe(1)
1378 expect(afterCalls).toBe(1)
1379 expect(resolveCalls).toBe(1)
1380 await pool.destroy()
1383 it('Verify that hasTaskFunction() is working', async () => {
1384 const dynamicThreadPool = new DynamicThreadPool(
1385 Math.floor(numberOfWorkers / 2),
1387 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1389 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1390 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1391 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1394 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1395 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1396 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1397 await dynamicThreadPool.destroy()
1398 const fixedClusterPool = new FixedClusterPool(
1400 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1402 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1403 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1404 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1407 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1408 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1409 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1410 await fixedClusterPool.destroy()
1413 it('Verify that addTaskFunction() is working', async () => {
1414 const dynamicThreadPool = new DynamicThreadPool(
1415 Math.floor(numberOfWorkers / 2),
1417 './tests/worker-files/thread/testWorker.mjs'
1419 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1421 dynamicThreadPool.addTaskFunction(0, () => {})
1422 ).rejects.toThrow(new TypeError('name argument must be a string'))
1424 dynamicThreadPool.addTaskFunction('', () => {})
1426 new TypeError('name argument must not be an empty string')
1428 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1429 new TypeError('fn argument must be a function')
1431 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1432 new TypeError('fn argument must be a function')
1434 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1438 const echoTaskFunction = data => {
1442 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1443 ).resolves.toBe(true)
1444 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1445 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1448 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1453 const taskFunctionData = { test: 'test' }
1454 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1455 expect(echoResult).toStrictEqual(taskFunctionData)
1456 for (const workerNode of dynamicThreadPool.workerNodes) {
1457 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1459 executed: expect.any(Number),
1462 sequentiallyStolen: 0,
1467 history: new CircularArray()
1470 history: new CircularArray()
1474 history: new CircularArray()
1477 history: new CircularArray()
1482 await dynamicThreadPool.destroy()
1485 it('Verify that removeTaskFunction() is working', async () => {
1486 const dynamicThreadPool = new DynamicThreadPool(
1487 Math.floor(numberOfWorkers / 2),
1489 './tests/worker-files/thread/testWorker.mjs'
1491 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1492 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1496 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1497 new Error('Cannot remove a task function not handled on the pool side')
1499 const echoTaskFunction = data => {
1502 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1503 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1504 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1507 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1512 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1515 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1516 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1517 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1521 await dynamicThreadPool.destroy()
1524 it('Verify that listTaskFunctionNames() is working', async () => {
1525 const dynamicThreadPool = new DynamicThreadPool(
1526 Math.floor(numberOfWorkers / 2),
1528 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1530 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1531 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1533 'jsonIntegerSerialization',
1537 await dynamicThreadPool.destroy()
1538 const fixedClusterPool = new FixedClusterPool(
1540 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1542 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1543 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1545 'jsonIntegerSerialization',
1549 await fixedClusterPool.destroy()
1552 it('Verify that setDefaultTaskFunction() is working', async () => {
1553 const dynamicThreadPool = new DynamicThreadPool(
1554 Math.floor(numberOfWorkers / 2),
1556 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1558 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1559 const workerId = dynamicThreadPool.workerNodes[0].info.id
1560 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1562 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1566 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1569 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1573 dynamicThreadPool.setDefaultTaskFunction('unknown')
1576 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1579 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1581 'jsonIntegerSerialization',
1586 dynamicThreadPool.setDefaultTaskFunction('factorial')
1587 ).resolves.toBe(true)
1588 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1591 'jsonIntegerSerialization',
1595 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1596 ).resolves.toBe(true)
1597 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1600 'jsonIntegerSerialization',
1603 await dynamicThreadPool.destroy()
1606 it('Verify that multiple task functions worker is working', async () => {
1607 const pool = new DynamicClusterPool(
1608 Math.floor(numberOfWorkers / 2),
1610 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1612 const data = { n: 10 }
1613 const result0 = await pool.execute(data)
1614 expect(result0).toStrictEqual({ ok: 1 })
1615 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1616 expect(result1).toStrictEqual({ ok: 1 })
1617 const result2 = await pool.execute(data, 'factorial')
1618 expect(result2).toBe(3628800)
1619 const result3 = await pool.execute(data, 'fibonacci')
1620 expect(result3).toBe(55)
1621 expect(pool.info.executingTasks).toBe(0)
1622 expect(pool.info.executedTasks).toBe(4)
1623 for (const workerNode of pool.workerNodes) {
1624 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1626 'jsonIntegerSerialization',
1630 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1631 for (const name of pool.listTaskFunctionNames()) {
1632 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1634 executed: expect.any(Number),
1638 sequentiallyStolen: 0,
1642 history: expect.any(CircularArray)
1645 history: expect.any(CircularArray)
1649 history: expect.any(CircularArray)
1652 history: expect.any(CircularArray)
1657 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1658 ).toBeGreaterThan(0)
1661 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1663 workerNode.getTaskFunctionWorkerUsage(
1664 workerNode.info.taskFunctionNames[1]
1668 await pool.destroy()
1671 it('Verify sendKillMessageToWorker()', async () => {
1672 const pool = new DynamicClusterPool(
1673 Math.floor(numberOfWorkers / 2),
1675 './tests/worker-files/cluster/testWorker.js'
1677 const workerNodeKey = 0
1679 pool.sendKillMessageToWorker(workerNodeKey)
1680 ).resolves.toBeUndefined()
1682 pool.sendKillMessageToWorker(numberOfWorkers)
1683 ).rejects.toStrictEqual(
1684 new Error(`Invalid worker node key '${numberOfWorkers}'`)
1686 await pool.destroy()
1689 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1690 const pool = new DynamicClusterPool(
1691 Math.floor(numberOfWorkers / 2),
1693 './tests/worker-files/cluster/testWorker.js'
1695 const workerNodeKey = 0
1697 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1698 taskFunctionOperation: 'add',
1699 taskFunctionName: 'empty',
1700 taskFunction: (() => {}).toString()
1702 ).resolves.toBe(true)
1704 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1705 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1706 await pool.destroy()
1709 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1710 const pool = new DynamicClusterPool(
1711 Math.floor(numberOfWorkers / 2),
1713 './tests/worker-files/cluster/testWorker.js'
1716 pool.sendTaskFunctionOperationToWorkers({
1717 taskFunctionOperation: 'add',
1718 taskFunctionName: 'empty',
1719 taskFunction: (() => {}).toString()
1721 ).resolves.toBe(true)
1722 for (const workerNode of pool.workerNodes) {
1723 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1729 await pool.destroy()