1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.js'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.js'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.js'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
245 expect(workerChoiceStrategy.opts).toStrictEqual({
248 Object.keys(workerChoiceStrategy.opts.weights).length,
249 runTime: { median: false },
250 waitTime: { median: false },
251 elu: { median: false },
252 weights: expect.objectContaining({
253 0: expect.any(Number),
254 [pool.info.maxSize - 1]: expect.any(Number)
259 const testHandler = () => console.info('test handler executed')
260 pool = new FixedThreadPool(
262 './tests/worker-files/thread/testWorker.mjs',
264 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
265 workerChoiceStrategyOptions: {
266 runTime: { median: true },
267 weights: { 0: 300, 1: 200 }
270 restartWorkerOnError: false,
271 enableTasksQueue: true,
272 tasksQueueOptions: { concurrency: 2 },
273 messageHandler: testHandler,
274 errorHandler: testHandler,
275 onlineHandler: testHandler,
276 exitHandler: testHandler
279 expect(pool.emitter).toBeUndefined()
280 expect(pool.opts).toStrictEqual({
283 restartWorkerOnError: false,
284 enableTasksQueue: true,
287 size: Math.pow(numberOfWorkers, 2),
289 tasksStealingOnBackPressure: true,
290 tasksFinishedTimeout: 2000
292 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
293 workerChoiceStrategyOptions: {
294 runTime: { median: true },
295 weights: { 0: 300, 1: 200 }
297 onlineHandler: testHandler,
298 messageHandler: testHandler,
299 errorHandler: testHandler,
300 exitHandler: testHandler
302 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
305 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
306 runTime: { median: true },
307 waitTime: { median: false },
308 elu: { median: false },
309 weights: { 0: 300, 1: 200 }
311 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
312 .workerChoiceStrategies) {
313 expect(workerChoiceStrategy.opts).toStrictEqual({
316 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
317 runTime: { median: true },
318 waitTime: { median: false },
319 elu: { median: false },
320 weights: { 0: 300, 1: 200 }
326 it('Verify that pool options are validated', () => {
331 './tests/worker-files/thread/testWorker.mjs',
333 workerChoiceStrategy: 'invalidStrategy'
336 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
341 './tests/worker-files/thread/testWorker.mjs',
343 workerChoiceStrategyOptions: { weights: {} }
348 'Invalid worker choice strategy options: must have a weight for each worker node'
355 './tests/worker-files/thread/testWorker.mjs',
357 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
362 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
369 './tests/worker-files/thread/testWorker.mjs',
371 enableTasksQueue: true,
372 tasksQueueOptions: 'invalidTasksQueueOptions'
376 new TypeError('Invalid tasks queue options: must be a plain object')
382 './tests/worker-files/thread/testWorker.mjs',
384 enableTasksQueue: true,
385 tasksQueueOptions: { concurrency: 0 }
390 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
397 './tests/worker-files/thread/testWorker.mjs',
399 enableTasksQueue: true,
400 tasksQueueOptions: { concurrency: -1 }
405 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
412 './tests/worker-files/thread/testWorker.mjs',
414 enableTasksQueue: true,
415 tasksQueueOptions: { concurrency: 0.2 }
419 new TypeError('Invalid worker node tasks concurrency: must be an integer')
425 './tests/worker-files/thread/testWorker.mjs',
427 enableTasksQueue: true,
428 tasksQueueOptions: { size: 0 }
433 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
440 './tests/worker-files/thread/testWorker.mjs',
442 enableTasksQueue: true,
443 tasksQueueOptions: { size: -1 }
448 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
455 './tests/worker-files/thread/testWorker.mjs',
457 enableTasksQueue: true,
458 tasksQueueOptions: { size: 0.2 }
462 new TypeError('Invalid worker node tasks queue size: must be an integer')
466 it('Verify that pool worker choice strategy options can be set', async () => {
467 const pool = new FixedThreadPool(
469 './tests/worker-files/thread/testWorker.mjs',
470 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
472 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
473 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
476 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
477 runTime: { median: false },
478 waitTime: { median: false },
479 elu: { median: false },
480 weights: expect.objectContaining({
481 0: expect.any(Number),
482 [pool.info.maxSize - 1]: expect.any(Number)
485 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
486 .workerChoiceStrategies) {
487 expect(workerChoiceStrategy.opts).toStrictEqual({
490 Object.keys(workerChoiceStrategy.opts.weights).length,
491 runTime: { median: false },
492 waitTime: { median: false },
493 elu: { median: false },
494 weights: expect.objectContaining({
495 0: expect.any(Number),
496 [pool.info.maxSize - 1]: expect.any(Number)
501 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
519 pool.setWorkerChoiceStrategyOptions({
520 runTime: { median: true },
521 elu: { median: true }
523 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
524 runTime: { median: true },
525 elu: { median: true }
527 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
530 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
531 runTime: { median: true },
532 waitTime: { median: false },
533 elu: { median: true },
534 weights: expect.objectContaining({
535 0: expect.any(Number),
536 [pool.info.maxSize - 1]: expect.any(Number)
539 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
540 .workerChoiceStrategies) {
541 expect(workerChoiceStrategy.opts).toStrictEqual({
544 Object.keys(workerChoiceStrategy.opts.weights).length,
545 runTime: { median: true },
546 waitTime: { median: false },
547 elu: { median: true },
548 weights: expect.objectContaining({
549 0: expect.any(Number),
550 [pool.info.maxSize - 1]: expect.any(Number)
555 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
573 pool.setWorkerChoiceStrategyOptions({
574 runTime: { median: false },
575 elu: { median: false }
577 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
578 runTime: { median: false },
579 elu: { median: false }
581 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
584 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
585 runTime: { median: false },
586 waitTime: { median: false },
587 elu: { median: false },
588 weights: expect.objectContaining({
589 0: expect.any(Number),
590 [pool.info.maxSize - 1]: expect.any(Number)
593 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
594 .workerChoiceStrategies) {
595 expect(workerChoiceStrategy.opts).toStrictEqual({
598 Object.keys(workerChoiceStrategy.opts.weights).length,
599 runTime: { median: false },
600 waitTime: { median: false },
601 elu: { median: false },
602 weights: expect.objectContaining({
603 0: expect.any(Number),
604 [pool.info.maxSize - 1]: expect.any(Number)
609 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
628 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
631 'Invalid worker choice strategy options: must be a plain object'
634 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
636 'Invalid worker choice strategy options: must have a weight for each worker node'
640 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
643 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
649 it('Verify that pool tasks queue can be enabled/disabled', async () => {
650 const pool = new FixedThreadPool(
652 './tests/worker-files/thread/testWorker.mjs'
654 expect(pool.opts.enableTasksQueue).toBe(false)
655 expect(pool.opts.tasksQueueOptions).toBeUndefined()
656 pool.enableTasksQueue(true)
657 expect(pool.opts.enableTasksQueue).toBe(true)
658 expect(pool.opts.tasksQueueOptions).toStrictEqual({
660 size: Math.pow(numberOfWorkers, 2),
662 tasksStealingOnBackPressure: true,
663 tasksFinishedTimeout: 2000
665 pool.enableTasksQueue(true, { concurrency: 2 })
666 expect(pool.opts.enableTasksQueue).toBe(true)
667 expect(pool.opts.tasksQueueOptions).toStrictEqual({
669 size: Math.pow(numberOfWorkers, 2),
671 tasksStealingOnBackPressure: true,
672 tasksFinishedTimeout: 2000
674 pool.enableTasksQueue(false)
675 expect(pool.opts.enableTasksQueue).toBe(false)
676 expect(pool.opts.tasksQueueOptions).toBeUndefined()
680 it('Verify that pool tasks queue options can be set', async () => {
681 const pool = new FixedThreadPool(
683 './tests/worker-files/thread/testWorker.mjs',
684 { enableTasksQueue: true }
686 expect(pool.opts.tasksQueueOptions).toStrictEqual({
688 size: Math.pow(numberOfWorkers, 2),
690 tasksStealingOnBackPressure: true,
691 tasksFinishedTimeout: 2000
693 for (const workerNode of pool.workerNodes) {
694 expect(workerNode.tasksQueueBackPressureSize).toBe(
695 pool.opts.tasksQueueOptions.size
698 pool.setTasksQueueOptions({
702 tasksStealingOnBackPressure: false,
703 tasksFinishedTimeout: 3000
705 expect(pool.opts.tasksQueueOptions).toStrictEqual({
709 tasksStealingOnBackPressure: false,
710 tasksFinishedTimeout: 3000
712 for (const workerNode of pool.workerNodes) {
713 expect(workerNode.tasksQueueBackPressureSize).toBe(
714 pool.opts.tasksQueueOptions.size
717 pool.setTasksQueueOptions({
720 tasksStealingOnBackPressure: true
722 expect(pool.opts.tasksQueueOptions).toStrictEqual({
724 size: Math.pow(numberOfWorkers, 2),
726 tasksStealingOnBackPressure: true,
727 tasksFinishedTimeout: 2000
729 for (const workerNode of pool.workerNodes) {
730 expect(workerNode.tasksQueueBackPressureSize).toBe(
731 pool.opts.tasksQueueOptions.size
734 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
735 new TypeError('Invalid tasks queue options: must be a plain object')
737 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
739 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
742 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
744 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
747 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
748 new TypeError('Invalid worker node tasks concurrency: must be an integer')
750 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
752 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
755 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
757 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
760 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
761 new TypeError('Invalid worker node tasks queue size: must be an integer')
766 it('Verify that pool info is set', async () => {
767 let pool = new FixedThreadPool(
769 './tests/worker-files/thread/testWorker.mjs'
771 expect(pool.info).toStrictEqual({
773 type: PoolTypes.fixed,
774 worker: WorkerTypes.thread,
777 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
778 minSize: numberOfWorkers,
779 maxSize: numberOfWorkers,
780 workerNodes: numberOfWorkers,
781 idleWorkerNodes: numberOfWorkers,
788 pool = new DynamicClusterPool(
789 Math.floor(numberOfWorkers / 2),
791 './tests/worker-files/cluster/testWorker.js'
793 expect(pool.info).toStrictEqual({
795 type: PoolTypes.dynamic,
796 worker: WorkerTypes.cluster,
799 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
800 minSize: Math.floor(numberOfWorkers / 2),
801 maxSize: numberOfWorkers,
802 workerNodes: Math.floor(numberOfWorkers / 2),
803 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
812 it('Verify that pool worker tasks usage are initialized', async () => {
813 const pool = new FixedClusterPool(
815 './tests/worker-files/cluster/testWorker.js'
817 for (const workerNode of pool.workerNodes) {
818 expect(workerNode).toBeInstanceOf(WorkerNode)
819 expect(workerNode.usage).toStrictEqual({
825 sequentiallyStolen: 0,
830 history: new CircularArray()
833 history: new CircularArray()
837 history: new CircularArray()
840 history: new CircularArray()
848 it('Verify that pool worker tasks queue are initialized', async () => {
849 let pool = new FixedClusterPool(
851 './tests/worker-files/cluster/testWorker.js'
853 for (const workerNode of pool.workerNodes) {
854 expect(workerNode).toBeInstanceOf(WorkerNode)
855 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
856 expect(workerNode.tasksQueue.size).toBe(0)
857 expect(workerNode.tasksQueue.maxSize).toBe(0)
860 pool = new DynamicThreadPool(
861 Math.floor(numberOfWorkers / 2),
863 './tests/worker-files/thread/testWorker.mjs'
865 for (const workerNode of pool.workerNodes) {
866 expect(workerNode).toBeInstanceOf(WorkerNode)
867 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
868 expect(workerNode.tasksQueue.size).toBe(0)
869 expect(workerNode.tasksQueue.maxSize).toBe(0)
874 it('Verify that pool worker info are initialized', async () => {
875 let pool = new FixedClusterPool(
877 './tests/worker-files/cluster/testWorker.js'
879 for (const workerNode of pool.workerNodes) {
880 expect(workerNode).toBeInstanceOf(WorkerNode)
881 expect(workerNode.info).toStrictEqual({
882 id: expect.any(Number),
883 type: WorkerTypes.cluster,
889 pool = new DynamicThreadPool(
890 Math.floor(numberOfWorkers / 2),
892 './tests/worker-files/thread/testWorker.mjs'
894 for (const workerNode of pool.workerNodes) {
895 expect(workerNode).toBeInstanceOf(WorkerNode)
896 expect(workerNode.info).toStrictEqual({
897 id: expect.any(Number),
898 type: WorkerTypes.thread,
906 it('Verify that pool statuses are checked at start or destroy', async () => {
907 const pool = new FixedThreadPool(
909 './tests/worker-files/thread/testWorker.mjs'
911 expect(pool.info.started).toBe(true)
912 expect(pool.info.ready).toBe(true)
913 expect(() => pool.start()).toThrow(
914 new Error('Cannot start an already started pool')
917 expect(pool.info.started).toBe(false)
918 expect(pool.info.ready).toBe(false)
919 await expect(pool.destroy()).rejects.toThrow(
920 new Error('Cannot destroy an already destroyed pool')
924 it('Verify that pool can be started after initialization', async () => {
925 const pool = new FixedClusterPool(
927 './tests/worker-files/cluster/testWorker.js',
932 expect(pool.info.started).toBe(false)
933 expect(pool.info.ready).toBe(false)
934 expect(pool.readyEventEmitted).toBe(false)
935 expect(pool.workerNodes).toStrictEqual([])
936 await expect(pool.execute()).rejects.toThrow(
937 new Error('Cannot execute a task on not started pool')
940 expect(pool.info.started).toBe(true)
941 expect(pool.info.ready).toBe(true)
942 await waitPoolEvents(pool, PoolEvents.ready, 1)
943 expect(pool.readyEventEmitted).toBe(true)
944 expect(pool.workerNodes.length).toBe(numberOfWorkers)
945 for (const workerNode of pool.workerNodes) {
946 expect(workerNode).toBeInstanceOf(WorkerNode)
951 it('Verify that pool execute() arguments are checked', async () => {
952 const pool = new FixedClusterPool(
954 './tests/worker-files/cluster/testWorker.js'
956 await expect(pool.execute(undefined, 0)).rejects.toThrow(
957 new TypeError('name argument must be a string')
959 await expect(pool.execute(undefined, '')).rejects.toThrow(
960 new TypeError('name argument must not be an empty string')
962 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
963 new TypeError('transferList argument must be an array')
965 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
966 "Task function 'unknown' not found"
969 await expect(pool.execute()).rejects.toThrow(
970 new Error('Cannot execute a task on not started pool')
974 it('Verify that pool worker tasks usage are computed', async () => {
975 const pool = new FixedClusterPool(
977 './tests/worker-files/cluster/testWorker.js'
979 const promises = new Set()
980 const maxMultiplier = 2
981 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
982 promises.add(pool.execute())
984 for (const workerNode of pool.workerNodes) {
985 expect(workerNode.usage).toStrictEqual({
988 executing: maxMultiplier,
991 sequentiallyStolen: 0,
996 history: expect.any(CircularArray)
999 history: expect.any(CircularArray)
1003 history: expect.any(CircularArray)
1006 history: expect.any(CircularArray)
1011 await Promise.all(promises)
1012 for (const workerNode of pool.workerNodes) {
1013 expect(workerNode.usage).toStrictEqual({
1015 executed: maxMultiplier,
1019 sequentiallyStolen: 0,
1024 history: expect.any(CircularArray)
1027 history: expect.any(CircularArray)
1031 history: expect.any(CircularArray)
1034 history: expect.any(CircularArray)
1039 await pool.destroy()
1042 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1043 const pool = new DynamicThreadPool(
1044 Math.floor(numberOfWorkers / 2),
1046 './tests/worker-files/thread/testWorker.mjs'
1048 const promises = new Set()
1049 const maxMultiplier = 2
1050 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1051 promises.add(pool.execute())
1053 await Promise.all(promises)
1054 for (const workerNode of pool.workerNodes) {
1055 expect(workerNode.usage).toStrictEqual({
1057 executed: expect.any(Number),
1061 sequentiallyStolen: 0,
1066 history: expect.any(CircularArray)
1069 history: expect.any(CircularArray)
1073 history: expect.any(CircularArray)
1076 history: expect.any(CircularArray)
1080 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1081 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1082 numberOfWorkers * maxMultiplier
1084 expect(workerNode.usage.runTime.history.length).toBe(0)
1085 expect(workerNode.usage.waitTime.history.length).toBe(0)
1086 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1087 expect(workerNode.usage.elu.active.history.length).toBe(0)
1089 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1090 for (const workerNode of pool.workerNodes) {
1091 expect(workerNode.usage).toStrictEqual({
1097 sequentiallyStolen: 0,
1102 history: expect.any(CircularArray)
1105 history: expect.any(CircularArray)
1109 history: expect.any(CircularArray)
1112 history: expect.any(CircularArray)
1116 expect(workerNode.usage.runTime.history.length).toBe(0)
1117 expect(workerNode.usage.waitTime.history.length).toBe(0)
1118 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1119 expect(workerNode.usage.elu.active.history.length).toBe(0)
1121 await pool.destroy()
1124 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1125 const pool = new DynamicClusterPool(
1126 Math.floor(numberOfWorkers / 2),
1128 './tests/worker-files/cluster/testWorker.js'
1130 expect(pool.emitter.eventNames()).toStrictEqual([])
1133 pool.emitter.on(PoolEvents.ready, info => {
1137 await waitPoolEvents(pool, PoolEvents.ready, 1)
1138 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1139 expect(poolReady).toBe(1)
1140 expect(poolInfo).toStrictEqual({
1142 type: PoolTypes.dynamic,
1143 worker: WorkerTypes.cluster,
1146 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1147 minSize: expect.any(Number),
1148 maxSize: expect.any(Number),
1149 workerNodes: expect.any(Number),
1150 idleWorkerNodes: expect.any(Number),
1151 busyWorkerNodes: expect.any(Number),
1152 executedTasks: expect.any(Number),
1153 executingTasks: expect.any(Number),
1154 failedTasks: expect.any(Number)
1156 await pool.destroy()
1159 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1160 const pool = new FixedThreadPool(
1162 './tests/worker-files/thread/testWorker.mjs'
1164 expect(pool.emitter.eventNames()).toStrictEqual([])
1165 const promises = new Set()
1168 pool.emitter.on(PoolEvents.busy, info => {
1172 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1173 for (let i = 0; i < numberOfWorkers * 2; i++) {
1174 promises.add(pool.execute())
1176 await Promise.all(promises)
1177 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1178 // So, in total, it is emitted numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1179 expect(poolBusy).toBe(numberOfWorkers + 1)
1180 expect(poolInfo).toStrictEqual({
1182 type: PoolTypes.fixed,
1183 worker: WorkerTypes.thread,
1186 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1187 minSize: expect.any(Number),
1188 maxSize: expect.any(Number),
1189 workerNodes: expect.any(Number),
1190 idleWorkerNodes: expect.any(Number),
1191 busyWorkerNodes: expect.any(Number),
1192 executedTasks: expect.any(Number),
1193 executingTasks: expect.any(Number),
1194 failedTasks: expect.any(Number)
1196 await pool.destroy()
1199 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1200 const pool = new DynamicThreadPool(
1201 Math.floor(numberOfWorkers / 2),
1203 './tests/worker-files/thread/testWorker.mjs'
1205 expect(pool.emitter.eventNames()).toStrictEqual([])
1206 const promises = new Set()
1209 pool.emitter.on(PoolEvents.full, info => {
1213 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1214 for (let i = 0; i < numberOfWorkers * 2; i++) {
1215 promises.add(pool.execute())
1217 await Promise.all(promises)
1218 expect(poolFull).toBe(1)
1219 expect(poolInfo).toStrictEqual({
1221 type: PoolTypes.dynamic,
1222 worker: WorkerTypes.thread,
1225 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1226 minSize: expect.any(Number),
1227 maxSize: expect.any(Number),
1228 workerNodes: expect.any(Number),
1229 idleWorkerNodes: expect.any(Number),
1230 busyWorkerNodes: expect.any(Number),
1231 executedTasks: expect.any(Number),
1232 executingTasks: expect.any(Number),
1233 failedTasks: expect.any(Number)
1235 await pool.destroy()
1238 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1239 const pool = new FixedThreadPool(
1241 './tests/worker-files/thread/testWorker.mjs',
1243 enableTasksQueue: true
1246 stub(pool, 'hasBackPressure').returns(true)
1247 expect(pool.emitter.eventNames()).toStrictEqual([])
1248 const promises = new Set()
1249 let poolBackPressure = 0
1251 pool.emitter.on(PoolEvents.backPressure, info => {
1255 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1256 for (let i = 0; i < numberOfWorkers + 1; i++) {
1257 promises.add(pool.execute())
1259 await Promise.all(promises)
1260 expect(poolBackPressure).toBe(1)
1261 expect(poolInfo).toStrictEqual({
1263 type: PoolTypes.fixed,
1264 worker: WorkerTypes.thread,
1267 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1268 minSize: expect.any(Number),
1269 maxSize: expect.any(Number),
1270 workerNodes: expect.any(Number),
1271 idleWorkerNodes: expect.any(Number),
1272 busyWorkerNodes: expect.any(Number),
1273 executedTasks: expect.any(Number),
1274 executingTasks: expect.any(Number),
1275 maxQueuedTasks: expect.any(Number),
1276 queuedTasks: expect.any(Number),
1278 stolenTasks: expect.any(Number),
1279 failedTasks: expect.any(Number)
1281 expect(pool.hasBackPressure.callCount).toBe(5)
1282 await pool.destroy()
1285 it('Verify that destroy() waits for queued tasks to finish', async () => {
1286 const tasksFinishedTimeout = 2500
1287 const pool = new FixedThreadPool(
1289 './tests/worker-files/thread/asyncWorker.mjs',
1291 enableTasksQueue: true,
1292 tasksQueueOptions: { tasksFinishedTimeout }
1295 const maxMultiplier = 4
1296 let tasksFinished = 0
1297 for (const workerNode of pool.workerNodes) {
1298 workerNode.on('taskFinished', () => {
1302 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1305 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1306 const startTime = performance.now()
1307 await pool.destroy()
1308 const elapsedTime = performance.now() - startTime
1309 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1310 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1311 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 400)
1314 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1315 const tasksFinishedTimeout = 1000
1316 const pool = new FixedThreadPool(
1318 './tests/worker-files/thread/asyncWorker.mjs',
1320 enableTasksQueue: true,
1321 tasksQueueOptions: { tasksFinishedTimeout }
1324 const maxMultiplier = 4
1325 let tasksFinished = 0
1326 for (const workerNode of pool.workerNodes) {
1327 workerNode.on('taskFinished', () => {
1331 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1334 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1335 const startTime = performance.now()
1336 await pool.destroy()
1337 const elapsedTime = performance.now() - startTime
1338 expect(tasksFinished).toBe(0)
1339 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1342 it('Verify that pool asynchronous resource track tasks execution', async () => {
1347 let resolveCalls = 0
1348 const hook = createHook({
1349 init (asyncId, type) {
1350 if (type === 'poolifier:task') {
1352 taskAsyncId = asyncId
1356 if (asyncId === taskAsyncId) beforeCalls++
1359 if (asyncId === taskAsyncId) afterCalls++
1362 if (executionAsyncId() === taskAsyncId) resolveCalls++
1365 const pool = new FixedThreadPool(
1367 './tests/worker-files/thread/testWorker.mjs'
1370 await pool.execute()
1372 expect(initCalls).toBe(1)
1373 expect(beforeCalls).toBe(1)
1374 expect(afterCalls).toBe(1)
1375 expect(resolveCalls).toBe(1)
1376 await pool.destroy()
1379 it('Verify that hasTaskFunction() is working', async () => {
1380 const dynamicThreadPool = new DynamicThreadPool(
1381 Math.floor(numberOfWorkers / 2),
1383 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1385 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1386 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1387 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1390 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1391 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1392 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1393 await dynamicThreadPool.destroy()
1394 const fixedClusterPool = new FixedClusterPool(
1396 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1398 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1399 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1400 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1403 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1404 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1405 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1406 await fixedClusterPool.destroy()
// Exercises `addTaskFunction()` on a dynamic thread pool: argument validation
// first, then a successful registration of an 'echo' function that is executed
// and whose per-worker usage statistics are inspected.
1409 it('Verify that addTaskFunction() is working', async () => {
1410 const dynamicThreadPool = new DynamicThreadPool(
1411 Math.floor(numberOfWorkers / 2),
1413 './tests/worker-files/thread/testWorker.mjs'
1415 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Invalid `name` arguments: a non-string and an empty string both reject with a TypeError.
1417 dynamicThreadPool.addTaskFunction(0, () => {})
1418 ).rejects.toThrow(new TypeError('name argument must be a string'))
1420 dynamicThreadPool.addTaskFunction('', () => {})
1422 new TypeError('name argument must not be an empty string')
// Invalid `fn` arguments: a number and a string both reject with the same TypeError.
1424 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1425 new TypeError('fn argument must be a function')
1427 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1428 new TypeError('fn argument must be a function')
// Listed task function names are checked before the 'echo' function is added.
1430 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1434 const echoTaskFunction = data => {
1438 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1439 ).resolves.toBe(true)
// After a successful add, the pool-side `taskFunctions` collection holds exactly one entry, keyed 'echo'.
1440 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1441 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1444 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// Run the newly added function by name; the result is expected to equal the input data.
1449 const taskFunctionData = { test: 'test' }
1450 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1451 expect(echoResult).toStrictEqual(taskFunctionData)
// Every worker node must expose a usage record for 'echo'; the executed count is
// matched loosely with `expect.any(Number)`.
1452 for (const workerNode of dynamicThreadPool.workerNodes) {
1453 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1455 executed: expect.any(Number),
1458 sequentiallyStolen: 0,
1463 history: new CircularArray()
1466 history: new CircularArray()
1470 history: new CircularArray()
1473 history: new CircularArray()
1478 await dynamicThreadPool.destroy()
// Exercises `removeTaskFunction()` on a dynamic thread pool: removal of a name
// the pool does not handle is rejected, then an 'echo' function is added and
// removed again while the pool-side bookkeeping is checked at each step.
1481 it('Verify that removeTaskFunction() is working', async () => {
1482 const dynamicThreadPool = new DynamicThreadPool(
1483 Math.floor(numberOfWorkers / 2),
1485 './tests/worker-files/thread/testWorker.mjs'
1487 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1488 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// 'test' has not been added through the pool in this test, so removing it must reject.
1492 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1493 new Error('Cannot remove a task function not handled on the pool side')
1495 const echoTaskFunction = data => {
// Add 'echo' and confirm it is tracked before attempting the removal.
1498 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1499 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1500 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1503 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1508 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
// After removal the pool-side collection is empty again and 'echo' is no longer retrievable.
1511 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1512 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1513 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1517 await dynamicThreadPool.destroy()
// Checks the exact list returned by `listTaskFunctionNames()` for a dynamic
// thread pool and a fixed cluster pool, each started from a "multiple task
// functions" worker file.
1520 it('Verify that listTaskFunctionNames() is working', async () => {
1521 const dynamicThreadPool = new DynamicThreadPool(
1522 Math.floor(numberOfWorkers / 2),
1524 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// The names are only read after the pool `ready` event has been awaited.
1526 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1527 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1529 'jsonIntegerSerialization',
1533 await dynamicThreadPool.destroy()
// Same check repeated against the cluster variant of the worker file.
1534 const fixedClusterPool = new FixedClusterPool(
1536 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1538 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1539 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1541 'jsonIntegerSerialization',
1545 await fixedClusterPool.destroy()
// Exercises `setDefaultTaskFunction()` on a dynamic thread pool: three
// rejected calls (non-string name, reserved default name, unknown name)
// followed by two successful switches of the default task function.
1548 it('Verify that setDefaultTaskFunction() is working', async () => {
1549 const dynamicThreadPool = new DynamicThreadPool(
1550 Math.floor(numberOfWorkers / 2),
1552 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1554 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// The expected error messages below embed the id of the first worker node.
1555 const workerId = dynamicThreadPool.workerNodes[0].info.id
// Failure case 1: the name is not a string.
1556 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1558 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
// Failure case 2: the reserved default task name cannot itself be set as the default.
1562 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1565 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
// Failure case 3: the name does not match any existing task function.
1569 dynamicThreadPool.setDefaultTaskFunction('unknown')
1572 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
// The listed names are checked once before the default is changed.
1575 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1577 'jsonIntegerSerialization',
// Success cases: switch the default to 'factorial', then to 'fibonacci'; the
// listed names are re-checked after each switch.
1582 dynamicThreadPool.setDefaultTaskFunction('factorial')
1583 ).resolves.toBe(true)
1584 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1587 'jsonIntegerSerialization',
1591 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1592 ).resolves.toBe(true)
1593 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1596 'jsonIntegerSerialization',
1599 await dynamicThreadPool.destroy()
// Runs each task function of the cluster "multiple task functions" worker once
// through a dynamic cluster pool, then checks the pool counters and the
// per-worker, per-task-function usage statistics.
1602 it('Verify that multiple task functions worker is working', async () => {
1603 const pool = new DynamicClusterPool(
1604 Math.floor(numberOfWorkers / 2),
1606 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
// The same input is sent to every task function: first without a name (default
// task function), then by explicit name.
1608 const data = { n: 10 }
1609 const result0 = await pool.execute(data)
1610 expect(result0).toStrictEqual({ ok: 1 })
1611 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1612 expect(result1).toStrictEqual({ ok: 1 })
// 3628800 is 10! and 55 is the 10th Fibonacci number, matching n = 10 above.
1613 const result2 = await pool.execute(data, 'factorial')
1614 expect(result2).toBe(3628800)
1615 const result3 = await pool.execute(data, 'fibonacci')
1616 expect(result3).toBe(55)
// Four tasks were awaited above, so none should still be executing and four should be counted as executed.
1617 expect(pool.info.executingTasks).toBe(0)
1618 expect(pool.info.executedTasks).toBe(4)
1619 for (const workerNode of pool.workerNodes) {
1620 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1622 'jsonIntegerSerialization',
1626 expect(workerNode.taskFunctionsUsage.size).toBe(3)
// Each name listed by the pool must have a usage record on this worker node;
// counters and histories are matched loosely by type.
1627 for (const name of pool.listTaskFunctionNames()) {
1628 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1630 executed: expect.any(Number),
1634 sequentiallyStolen: 0,
1638 history: expect.any(CircularArray)
1641 history: expect.any(CircularArray)
1645 history: expect.any(CircularArray)
1648 history: expect.any(CircularArray)
1653 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1654 ).toBeGreaterThan(0)
// The usage looked up under the default task name is checked against the usage
// of the second entry of `taskFunctionNames` (index 1).
1657 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1659 workerNode.getTaskFunctionWorkerUsage(
1660 workerNode.info.taskFunctionNames[1]
1664 await pool.destroy()
// Checks that `sendKillMessageToWorker()` on a dynamic cluster pool returns a
// promise that resolves to `undefined` for the worker node at key 0.
1667 it('Verify sendKillMessageToWorker()', async () => {
1668 const pool = new DynamicClusterPool(
1669 Math.floor(numberOfWorkers / 2),
1671 './tests/worker-files/cluster/testWorker.js'
// Key 0 addresses the first entry of the pool's worker nodes.
1673 const workerNodeKey = 0
1675 pool.sendKillMessageToWorker(workerNodeKey)
1676 ).resolves.toBeUndefined()
1677 await pool.destroy()
// Sends an 'add' task function operation to a single worker node of a dynamic
// cluster pool and checks that the node's task function names then include the
// added function.
1680 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1681 const pool = new DynamicClusterPool(
1682 Math.floor(numberOfWorkers / 2),
1684 './tests/worker-files/cluster/testWorker.js'
1686 const workerNodeKey = 0
// The task function is passed as source text (`toString()`), not as a function object.
1688 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1689 taskFunctionOperation: 'add',
1690 taskFunctionName: 'empty',
1691 taskFunction: (() => {}).toString()
1693 ).resolves.toBe(true)
// 'empty' is expected last, after the default task name and 'test'.
1695 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1696 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1697 await pool.destroy()
// Sends an 'add' task function operation through
// `sendTaskFunctionOperationToWorkers()` on a dynamic cluster pool and checks
// the task function names reported by every worker node afterwards.
1700 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1701 const pool = new DynamicClusterPool(
1702 Math.floor(numberOfWorkers / 2),
1704 './tests/worker-files/cluster/testWorker.js'
// The task function is passed as source text (`toString()`), not as a function object.
1707 pool.sendTaskFunctionOperationToWorkers({
1708 taskFunctionOperation: 'add',
1709 taskFunctionName: 'empty',
1710 taskFunction: (() => {}).toString()
1712 ).resolves.toBe(true)
// Unlike the single-worker variant, the expectation is applied to each node in `pool.workerNodes`.
1713 for (const workerNode of pool.workerNodes) {
1714 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1720 await pool.destroy()