1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.js'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.js'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.js'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
245 expect(workerChoiceStrategy.opts).toStrictEqual(
246 expect.objectContaining({
249 Object.keys(workerChoiceStrategy.opts.weights).length,
250 runTime: { median: false },
251 waitTime: { median: false },
252 elu: { median: false }
257 const testHandler = () => console.info('test handler executed')
258 pool = new FixedThreadPool(
260 './tests/worker-files/thread/testWorker.mjs',
262 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
263 workerChoiceStrategyOptions: {
264 runTime: { median: true },
265 weights: { 0: 300, 1: 200 }
268 restartWorkerOnError: false,
269 enableTasksQueue: true,
270 tasksQueueOptions: { concurrency: 2 },
271 messageHandler: testHandler,
272 errorHandler: testHandler,
273 onlineHandler: testHandler,
274 exitHandler: testHandler
277 expect(pool.emitter).toBeUndefined()
278 expect(pool.opts).toStrictEqual({
281 restartWorkerOnError: false,
282 enableTasksQueue: true,
285 size: Math.pow(numberOfWorkers, 2),
287 tasksStealingOnBackPressure: true,
288 tasksFinishedTimeout: 2000
290 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
291 workerChoiceStrategyOptions: {
292 runTime: { median: true },
293 weights: { 0: 300, 1: 200 }
295 onlineHandler: testHandler,
296 messageHandler: testHandler,
297 errorHandler: testHandler,
298 exitHandler: testHandler
300 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
303 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
304 runTime: { median: true },
305 waitTime: { median: false },
306 elu: { median: false },
307 weights: { 0: 300, 1: 200 }
309 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
310 .workerChoiceStrategies) {
311 expect(workerChoiceStrategy.opts).toStrictEqual({
314 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
315 runTime: { median: true },
316 waitTime: { median: false },
317 elu: { median: false },
318 weights: { 0: 300, 1: 200 }
324 it('Verify that pool options are validated', () => {
329 './tests/worker-files/thread/testWorker.mjs',
331 workerChoiceStrategy: 'invalidStrategy'
334 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
339 './tests/worker-files/thread/testWorker.mjs',
341 workerChoiceStrategyOptions: { weights: {} }
346 'Invalid worker choice strategy options: must have a weight for each worker node'
353 './tests/worker-files/thread/testWorker.mjs',
355 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
360 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
367 './tests/worker-files/thread/testWorker.mjs',
369 enableTasksQueue: true,
370 tasksQueueOptions: 'invalidTasksQueueOptions'
374 new TypeError('Invalid tasks queue options: must be a plain object')
380 './tests/worker-files/thread/testWorker.mjs',
382 enableTasksQueue: true,
383 tasksQueueOptions: { concurrency: 0 }
388 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
395 './tests/worker-files/thread/testWorker.mjs',
397 enableTasksQueue: true,
398 tasksQueueOptions: { concurrency: -1 }
403 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
410 './tests/worker-files/thread/testWorker.mjs',
412 enableTasksQueue: true,
413 tasksQueueOptions: { concurrency: 0.2 }
417 new TypeError('Invalid worker node tasks concurrency: must be an integer')
423 './tests/worker-files/thread/testWorker.mjs',
425 enableTasksQueue: true,
426 tasksQueueOptions: { size: 0 }
431 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
438 './tests/worker-files/thread/testWorker.mjs',
440 enableTasksQueue: true,
441 tasksQueueOptions: { size: -1 }
446 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
453 './tests/worker-files/thread/testWorker.mjs',
455 enableTasksQueue: true,
456 tasksQueueOptions: { size: 0.2 }
460 new TypeError('Invalid worker node tasks queue size: must be an integer')
464 it('Verify that pool worker choice strategy options can be set', async () => {
465 const pool = new FixedThreadPool(
467 './tests/worker-files/thread/testWorker.mjs',
468 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
470 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
471 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
474 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
475 runTime: { median: false },
476 waitTime: { median: false },
477 elu: { median: false },
478 weights: expect.objectContaining({
479 0: expect.any(Number),
480 [pool.info.maxSize - 1]: expect.any(Number)
483 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
484 .workerChoiceStrategies) {
485 expect(workerChoiceStrategy.opts).toStrictEqual(
486 expect.objectContaining({
489 Object.keys(workerChoiceStrategy.opts.weights).length,
490 runTime: { median: false },
491 waitTime: { median: false },
492 elu: { median: false }
497 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
515 pool.setWorkerChoiceStrategyOptions({
516 runTime: { median: true },
517 elu: { median: true }
519 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
520 runTime: { median: true },
521 elu: { median: true }
523 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
526 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
527 runTime: { median: true },
528 waitTime: { median: false },
529 elu: { median: true },
530 weights: expect.objectContaining({
531 0: expect.any(Number),
532 [pool.info.maxSize - 1]: expect.any(Number)
535 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
536 .workerChoiceStrategies) {
537 expect(workerChoiceStrategy.opts).toStrictEqual(
538 expect.objectContaining({
541 Object.keys(workerChoiceStrategy.opts.weights).length,
542 runTime: { median: true },
543 waitTime: { median: false },
544 elu: { median: true }
549 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
567 pool.setWorkerChoiceStrategyOptions({
568 runTime: { median: false },
569 elu: { median: false }
571 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
572 runTime: { median: false },
573 elu: { median: false }
575 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
578 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
579 runTime: { median: false },
580 waitTime: { median: false },
581 elu: { median: false },
582 weights: expect.objectContaining({
583 0: expect.any(Number),
584 [pool.info.maxSize - 1]: expect.any(Number)
587 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
588 .workerChoiceStrategies) {
589 expect(workerChoiceStrategy.opts).toStrictEqual(
590 expect.objectContaining({
593 Object.keys(workerChoiceStrategy.opts.weights).length,
594 runTime: { median: false },
595 waitTime: { median: false },
596 elu: { median: false }
601 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
620 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
623 'Invalid worker choice strategy options: must be a plain object'
626 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
628 'Invalid worker choice strategy options: must have a weight for each worker node'
632 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
635 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
641 it('Verify that pool tasks queue can be enabled/disabled', async () => {
642 const pool = new FixedThreadPool(
644 './tests/worker-files/thread/testWorker.mjs'
646 expect(pool.opts.enableTasksQueue).toBe(false)
647 expect(pool.opts.tasksQueueOptions).toBeUndefined()
648 pool.enableTasksQueue(true)
649 expect(pool.opts.enableTasksQueue).toBe(true)
650 expect(pool.opts.tasksQueueOptions).toStrictEqual({
652 size: Math.pow(numberOfWorkers, 2),
654 tasksStealingOnBackPressure: true,
655 tasksFinishedTimeout: 2000
657 pool.enableTasksQueue(true, { concurrency: 2 })
658 expect(pool.opts.enableTasksQueue).toBe(true)
659 expect(pool.opts.tasksQueueOptions).toStrictEqual({
661 size: Math.pow(numberOfWorkers, 2),
663 tasksStealingOnBackPressure: true,
664 tasksFinishedTimeout: 2000
666 pool.enableTasksQueue(false)
667 expect(pool.opts.enableTasksQueue).toBe(false)
668 expect(pool.opts.tasksQueueOptions).toBeUndefined()
672 it('Verify that pool tasks queue options can be set', async () => {
673 const pool = new FixedThreadPool(
675 './tests/worker-files/thread/testWorker.mjs',
676 { enableTasksQueue: true }
678 expect(pool.opts.tasksQueueOptions).toStrictEqual({
680 size: Math.pow(numberOfWorkers, 2),
682 tasksStealingOnBackPressure: true,
683 tasksFinishedTimeout: 2000
685 for (const workerNode of pool.workerNodes) {
686 expect(workerNode.tasksQueueBackPressureSize).toBe(
687 pool.opts.tasksQueueOptions.size
690 pool.setTasksQueueOptions({
694 tasksStealingOnBackPressure: false,
695 tasksFinishedTimeout: 3000
697 expect(pool.opts.tasksQueueOptions).toStrictEqual({
701 tasksStealingOnBackPressure: false,
702 tasksFinishedTimeout: 3000
704 for (const workerNode of pool.workerNodes) {
705 expect(workerNode.tasksQueueBackPressureSize).toBe(
706 pool.opts.tasksQueueOptions.size
709 pool.setTasksQueueOptions({
712 tasksStealingOnBackPressure: true
714 expect(pool.opts.tasksQueueOptions).toStrictEqual({
716 size: Math.pow(numberOfWorkers, 2),
718 tasksStealingOnBackPressure: true,
719 tasksFinishedTimeout: 2000
721 for (const workerNode of pool.workerNodes) {
722 expect(workerNode.tasksQueueBackPressureSize).toBe(
723 pool.opts.tasksQueueOptions.size
726 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
727 new TypeError('Invalid tasks queue options: must be a plain object')
729 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
731 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
734 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
736 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
739 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
740 new TypeError('Invalid worker node tasks concurrency: must be an integer')
742 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
744 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
747 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
749 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
752 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
753 new TypeError('Invalid worker node tasks queue size: must be an integer')
758 it('Verify that pool info is set', async () => {
759 let pool = new FixedThreadPool(
761 './tests/worker-files/thread/testWorker.mjs'
763 expect(pool.info).toStrictEqual({
765 type: PoolTypes.fixed,
766 worker: WorkerTypes.thread,
769 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
770 minSize: numberOfWorkers,
771 maxSize: numberOfWorkers,
772 workerNodes: numberOfWorkers,
773 idleWorkerNodes: numberOfWorkers,
780 pool = new DynamicClusterPool(
781 Math.floor(numberOfWorkers / 2),
783 './tests/worker-files/cluster/testWorker.js'
785 expect(pool.info).toStrictEqual({
787 type: PoolTypes.dynamic,
788 worker: WorkerTypes.cluster,
791 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
792 minSize: Math.floor(numberOfWorkers / 2),
793 maxSize: numberOfWorkers,
794 workerNodes: Math.floor(numberOfWorkers / 2),
795 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
804 it('Verify that pool worker tasks usage are initialized', async () => {
805 const pool = new FixedClusterPool(
807 './tests/worker-files/cluster/testWorker.js'
809 for (const workerNode of pool.workerNodes) {
810 expect(workerNode).toBeInstanceOf(WorkerNode)
811 expect(workerNode.usage).toStrictEqual({
817 sequentiallyStolen: 0,
822 history: new CircularArray()
825 history: new CircularArray()
829 history: new CircularArray()
832 history: new CircularArray()
840 it('Verify that pool worker tasks queue are initialized', async () => {
841 let pool = new FixedClusterPool(
843 './tests/worker-files/cluster/testWorker.js'
845 for (const workerNode of pool.workerNodes) {
846 expect(workerNode).toBeInstanceOf(WorkerNode)
847 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
848 expect(workerNode.tasksQueue.size).toBe(0)
849 expect(workerNode.tasksQueue.maxSize).toBe(0)
852 pool = new DynamicThreadPool(
853 Math.floor(numberOfWorkers / 2),
855 './tests/worker-files/thread/testWorker.mjs'
857 for (const workerNode of pool.workerNodes) {
858 expect(workerNode).toBeInstanceOf(WorkerNode)
859 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
860 expect(workerNode.tasksQueue.size).toBe(0)
861 expect(workerNode.tasksQueue.maxSize).toBe(0)
866 it('Verify that pool worker info are initialized', async () => {
867 let pool = new FixedClusterPool(
869 './tests/worker-files/cluster/testWorker.js'
871 for (const workerNode of pool.workerNodes) {
872 expect(workerNode).toBeInstanceOf(WorkerNode)
873 expect(workerNode.info).toStrictEqual({
874 id: expect.any(Number),
875 type: WorkerTypes.cluster,
881 pool = new DynamicThreadPool(
882 Math.floor(numberOfWorkers / 2),
884 './tests/worker-files/thread/testWorker.mjs'
886 for (const workerNode of pool.workerNodes) {
887 expect(workerNode).toBeInstanceOf(WorkerNode)
888 expect(workerNode.info).toStrictEqual({
889 id: expect.any(Number),
890 type: WorkerTypes.thread,
898 it('Verify that pool statuses are checked at start or destroy', async () => {
899 const pool = new FixedThreadPool(
901 './tests/worker-files/thread/testWorker.mjs'
903 expect(pool.info.started).toBe(true)
904 expect(pool.info.ready).toBe(true)
905 expect(() => pool.start()).toThrow(
906 new Error('Cannot start an already started pool')
909 expect(pool.info.started).toBe(false)
910 expect(pool.info.ready).toBe(false)
911 await expect(pool.destroy()).rejects.toThrow(
912 new Error('Cannot destroy an already destroyed pool')
916 it('Verify that pool can be started after initialization', async () => {
917 const pool = new FixedClusterPool(
919 './tests/worker-files/cluster/testWorker.js',
924 expect(pool.info.started).toBe(false)
925 expect(pool.info.ready).toBe(false)
926 expect(pool.readyEventEmitted).toBe(false)
927 expect(pool.workerNodes).toStrictEqual([])
928 await expect(pool.execute()).rejects.toThrow(
929 new Error('Cannot execute a task on not started pool')
932 expect(pool.info.started).toBe(true)
933 expect(pool.info.ready).toBe(true)
934 await waitPoolEvents(pool, PoolEvents.ready, 1)
935 expect(pool.readyEventEmitted).toBe(true)
936 expect(pool.workerNodes.length).toBe(numberOfWorkers)
937 for (const workerNode of pool.workerNodes) {
938 expect(workerNode).toBeInstanceOf(WorkerNode)
943 it('Verify that pool execute() arguments are checked', async () => {
944 const pool = new FixedClusterPool(
946 './tests/worker-files/cluster/testWorker.js'
948 await expect(pool.execute(undefined, 0)).rejects.toThrow(
949 new TypeError('name argument must be a string')
951 await expect(pool.execute(undefined, '')).rejects.toThrow(
952 new TypeError('name argument must not be an empty string')
954 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
955 new TypeError('transferList argument must be an array')
957 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
958 "Task function 'unknown' not found"
961 await expect(pool.execute()).rejects.toThrow(
962 new Error('Cannot execute a task on not started pool')
966 it('Verify that pool worker tasks usage are computed', async () => {
967 const pool = new FixedClusterPool(
969 './tests/worker-files/cluster/testWorker.js'
971 const promises = new Set()
972 const maxMultiplier = 2
973 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
974 promises.add(pool.execute())
976 for (const workerNode of pool.workerNodes) {
977 expect(workerNode.usage).toStrictEqual({
980 executing: maxMultiplier,
983 sequentiallyStolen: 0,
988 history: expect.any(CircularArray)
991 history: expect.any(CircularArray)
995 history: expect.any(CircularArray)
998 history: expect.any(CircularArray)
1003 await Promise.all(promises)
1004 for (const workerNode of pool.workerNodes) {
1005 expect(workerNode.usage).toStrictEqual({
1007 executed: maxMultiplier,
1011 sequentiallyStolen: 0,
1016 history: expect.any(CircularArray)
1019 history: expect.any(CircularArray)
1023 history: expect.any(CircularArray)
1026 history: expect.any(CircularArray)
1031 await pool.destroy()
1034 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1035 const pool = new DynamicThreadPool(
1036 Math.floor(numberOfWorkers / 2),
1038 './tests/worker-files/thread/testWorker.mjs'
1040 const promises = new Set()
1041 const maxMultiplier = 2
1042 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1043 promises.add(pool.execute())
1045 await Promise.all(promises)
1046 for (const workerNode of pool.workerNodes) {
1047 expect(workerNode.usage).toStrictEqual({
1049 executed: expect.any(Number),
1053 sequentiallyStolen: 0,
1058 history: expect.any(CircularArray)
1061 history: expect.any(CircularArray)
1065 history: expect.any(CircularArray)
1068 history: expect.any(CircularArray)
1072 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1073 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1074 numberOfWorkers * maxMultiplier
1076 expect(workerNode.usage.runTime.history.length).toBe(0)
1077 expect(workerNode.usage.waitTime.history.length).toBe(0)
1078 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1079 expect(workerNode.usage.elu.active.history.length).toBe(0)
1081 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1082 for (const workerNode of pool.workerNodes) {
1083 expect(workerNode.usage).toStrictEqual({
1089 sequentiallyStolen: 0,
1094 history: expect.any(CircularArray)
1097 history: expect.any(CircularArray)
1101 history: expect.any(CircularArray)
1104 history: expect.any(CircularArray)
1108 expect(workerNode.usage.runTime.history.length).toBe(0)
1109 expect(workerNode.usage.waitTime.history.length).toBe(0)
1110 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1111 expect(workerNode.usage.elu.active.history.length).toBe(0)
1113 await pool.destroy()
1116 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1117 const pool = new DynamicClusterPool(
1118 Math.floor(numberOfWorkers / 2),
1120 './tests/worker-files/cluster/testWorker.js'
1122 expect(pool.emitter.eventNames()).toStrictEqual([])
1125 pool.emitter.on(PoolEvents.ready, info => {
1129 await waitPoolEvents(pool, PoolEvents.ready, 1)
1130 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1131 expect(poolReady).toBe(1)
1132 expect(poolInfo).toStrictEqual({
1134 type: PoolTypes.dynamic,
1135 worker: WorkerTypes.cluster,
1138 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1139 minSize: expect.any(Number),
1140 maxSize: expect.any(Number),
1141 workerNodes: expect.any(Number),
1142 idleWorkerNodes: expect.any(Number),
1143 busyWorkerNodes: expect.any(Number),
1144 executedTasks: expect.any(Number),
1145 executingTasks: expect.any(Number),
1146 failedTasks: expect.any(Number)
1148 await pool.destroy()
1151 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1152 const pool = new FixedThreadPool(
1154 './tests/worker-files/thread/testWorker.mjs'
1156 expect(pool.emitter.eventNames()).toStrictEqual([])
1157 const promises = new Set()
1160 pool.emitter.on(PoolEvents.busy, info => {
1164 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1165 for (let i = 0; i < numberOfWorkers * 2; i++) {
1166 promises.add(pool.execute())
1168 await Promise.all(promises)
1169 // The `busy` event is triggered each time the number of tasks submitted at once reaches the number of fixed pool workers.
1170 // So in total, it is triggered numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1171 expect(poolBusy).toBe(numberOfWorkers + 1)
1172 expect(poolInfo).toStrictEqual({
1174 type: PoolTypes.fixed,
1175 worker: WorkerTypes.thread,
1178 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1179 minSize: expect.any(Number),
1180 maxSize: expect.any(Number),
1181 workerNodes: expect.any(Number),
1182 idleWorkerNodes: expect.any(Number),
1183 busyWorkerNodes: expect.any(Number),
1184 executedTasks: expect.any(Number),
1185 executingTasks: expect.any(Number),
1186 failedTasks: expect.any(Number)
1188 await pool.destroy()
1191 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1192 const pool = new DynamicThreadPool(
1193 Math.floor(numberOfWorkers / 2),
1195 './tests/worker-files/thread/testWorker.mjs'
1197 expect(pool.emitter.eventNames()).toStrictEqual([])
1198 const promises = new Set()
1201 pool.emitter.on(PoolEvents.full, info => {
1205 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1206 for (let i = 0; i < numberOfWorkers * 2; i++) {
1207 promises.add(pool.execute())
1209 await Promise.all(promises)
1210 expect(poolFull).toBe(1)
1211 expect(poolInfo).toStrictEqual({
1213 type: PoolTypes.dynamic,
1214 worker: WorkerTypes.thread,
1217 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1218 minSize: expect.any(Number),
1219 maxSize: expect.any(Number),
1220 workerNodes: expect.any(Number),
1221 idleWorkerNodes: expect.any(Number),
1222 busyWorkerNodes: expect.any(Number),
1223 executedTasks: expect.any(Number),
1224 executingTasks: expect.any(Number),
1225 failedTasks: expect.any(Number)
1227 await pool.destroy()
1230 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1231 const pool = new FixedThreadPool(
1233 './tests/worker-files/thread/testWorker.mjs',
1235 enableTasksQueue: true
1238 stub(pool, 'hasBackPressure').returns(true)
1239 expect(pool.emitter.eventNames()).toStrictEqual([])
1240 const promises = new Set()
1241 let poolBackPressure = 0
1243 pool.emitter.on(PoolEvents.backPressure, info => {
1247 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1248 for (let i = 0; i < numberOfWorkers + 1; i++) {
1249 promises.add(pool.execute())
1251 await Promise.all(promises)
1252 expect(poolBackPressure).toBe(1)
1253 expect(poolInfo).toStrictEqual({
1255 type: PoolTypes.fixed,
1256 worker: WorkerTypes.thread,
1259 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1260 minSize: expect.any(Number),
1261 maxSize: expect.any(Number),
1262 workerNodes: expect.any(Number),
1263 idleWorkerNodes: expect.any(Number),
1264 busyWorkerNodes: expect.any(Number),
1265 executedTasks: expect.any(Number),
1266 executingTasks: expect.any(Number),
1267 maxQueuedTasks: expect.any(Number),
1268 queuedTasks: expect.any(Number),
1270 stolenTasks: expect.any(Number),
1271 failedTasks: expect.any(Number)
1273 expect(pool.hasBackPressure.callCount).toBe(5)
1274 await pool.destroy()
1277 it('Verify that destroy() waits for queued tasks to finish', async () => {
1278 const tasksFinishedTimeout = 2500
1279 const pool = new FixedThreadPool(
1281 './tests/worker-files/thread/asyncWorker.mjs',
1283 enableTasksQueue: true,
1284 tasksQueueOptions: { tasksFinishedTimeout }
1287 const maxMultiplier = 4
1288 let tasksFinished = 0
1289 for (const workerNode of pool.workerNodes) {
1290 workerNode.on('taskFinished', () => {
1294 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1297 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1298 const startTime = performance.now()
1299 await pool.destroy()
1300 const elapsedTime = performance.now() - startTime
1301 expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
1302 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1303 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
1306 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1307 const tasksFinishedTimeout = 1000
1308 const pool = new FixedThreadPool(
1310 './tests/worker-files/thread/asyncWorker.mjs',
1312 enableTasksQueue: true,
1313 tasksQueueOptions: { tasksFinishedTimeout }
1316 const maxMultiplier = 4
1317 let tasksFinished = 0
1318 for (const workerNode of pool.workerNodes) {
1319 workerNode.on('taskFinished', () => {
1323 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1326 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1327 const startTime = performance.now()
1328 await pool.destroy()
1329 const elapsedTime = performance.now() - startTime
1330 expect(tasksFinished).toBe(0)
1331 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1334 it('Verify that pool asynchronous resource track tasks execution', async () => {
1339 let resolveCalls = 0
1340 const hook = createHook({
1341 init (asyncId, type) {
1342 if (type === 'poolifier:task') {
1344 taskAsyncId = asyncId
1348 if (asyncId === taskAsyncId) beforeCalls++
1351 if (asyncId === taskAsyncId) afterCalls++
1354 if (executionAsyncId() === taskAsyncId) resolveCalls++
1357 const pool = new FixedThreadPool(
1359 './tests/worker-files/thread/testWorker.mjs'
1362 await pool.execute()
1364 expect(initCalls).toBe(1)
1365 expect(beforeCalls).toBe(1)
1366 expect(afterCalls).toBe(1)
1367 expect(resolveCalls).toBe(1)
1368 await pool.destroy()
1371 it('Verify that hasTaskFunction() is working', async () => {
1372 const dynamicThreadPool = new DynamicThreadPool(
1373 Math.floor(numberOfWorkers / 2),
1375 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1377 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1378 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1379 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1382 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1383 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1384 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1385 await dynamicThreadPool.destroy()
1386 const fixedClusterPool = new FixedClusterPool(
1388 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1390 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1391 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1392 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1395 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1396 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1397 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1398 await fixedClusterPool.destroy()
// Test case: exercises addTaskFunction() on a DynamicThreadPool built from the
// thread testWorker.mjs file. It covers argument validation, a successful
// registration of an 'echo' task function, execution of that function, and the
// per-worker usage record created for it.
// NOTE(review): several original lines (opening `await expect(` calls, the
// echo function body, expected name lists) are not visible in this extract.
1401 it('Verify that addTaskFunction() is working', async () => {
1402 const dynamicThreadPool = new DynamicThreadPool(
1403 Math.floor(numberOfWorkers / 2),
1405 './tests/worker-files/thread/testWorker.mjs'
1407 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Invalid name arguments: a non-string (0) and an empty string.
1409 dynamicThreadPool.addTaskFunction(0, () => {})
1410 ).rejects.toThrow(new TypeError('name argument must be a string'))
1412 dynamicThreadPool.addTaskFunction('', () => {})
1414 new TypeError('name argument must not be an empty string')
// Invalid fn arguments: a number and a string are both rejected.
1416 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1417 new TypeError('fn argument must be a function')
1419 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1420 new TypeError('fn argument must be a function')
1422 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1426 const echoTaskFunction = data => {
// Successful registration resolves to true and leaves exactly one entry in
// the pool's taskFunctions map.
1430 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1431 ).resolves.toBe(true)
1432 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1433 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1436 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// Executing the new 'echo' task function must return a value strictly equal
// to the input data.
1441 const taskFunctionData = { test: 'test' }
1442 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1443 expect(echoResult).toStrictEqual(taskFunctionData)
// Every worker node must expose a usage record for 'echo'; the visible
// expectations are a numeric executed count, zero sequentiallyStolen and
// history fields equal to a fresh CircularArray.
1444 for (const workerNode of dynamicThreadPool.workerNodes) {
1445 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1447 executed: expect.any(Number),
1450 sequentiallyStolen: 0,
1455 history: new CircularArray()
1458 history: new CircularArray()
1462 history: new CircularArray()
1465 history: new CircularArray()
1470 await dynamicThreadPool.destroy()
// Test case: exercises removeTaskFunction() on a DynamicThreadPool. It first
// tries to remove a name that was never added on the pool side, then adds an
// 'echo' task function and removes it again, checking the pool's taskFunctions
// map and listTaskFunctionNames() along the way.
// NOTE(review): the expected name lists and the resolved value of the
// successful removal are on lines not visible in this extract.
1473 it('Verify that removeTaskFunction() is working', async () => {
1474 const dynamicThreadPool = new DynamicThreadPool(
1475 Math.floor(numberOfWorkers / 2),
1477 './tests/worker-files/thread/testWorker.mjs'
1479 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1480 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// 'test' was not registered through the pool, so removing it must reject.
1484 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1485 new Error('Cannot remove a task function not handled on the pool side')
1487 const echoTaskFunction = data => {
// After adding 'echo' the pool-side map holds exactly one entry.
1490 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1491 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1492 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1495 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// After removing 'echo' the map is empty and the entry is gone.
1500 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1503 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1504 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1505 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1509 await dynamicThreadPool.destroy()
// Test case: checks the array returned by listTaskFunctionNames() with a
// strict-equality comparison, first for a DynamicThreadPool and then for a
// FixedClusterPool, both using a testMultipleTaskFunctionsWorker worker file.
// NOTE(review): only the 'jsonIntegerSerialization' entry of each expected
// list is visible in this extract; the other entries are on missing lines.
1512 it('Verify that listTaskFunctionNames() is working', async () => {
1513 const dynamicThreadPool = new DynamicThreadPool(
1514 Math.floor(numberOfWorkers / 2),
1516 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1518 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1519 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1521 'jsonIntegerSerialization',
1525 await dynamicThreadPool.destroy()
// Same check against a cluster-based pool.
1526 const fixedClusterPool = new FixedClusterPool(
1528 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1530 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1531 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1533 'jsonIntegerSerialization',
1537 await fixedClusterPool.destroy()
// Test case: exercises setDefaultTaskFunction() on a DynamicThreadPool using
// the multiple-task-functions thread worker. Three invalid names are expected
// to reject with an error message that embeds the id of the first worker node;
// two valid names are expected to resolve to true, with the task function name
// list re-checked after each change.
// NOTE(review): the expected name lists and the error constructors wrapping
// the messages are on lines not visible in this extract.
1540 it('Verify that setDefaultTaskFunction() is working', async () => {
1541 const dynamicThreadPool = new DynamicThreadPool(
1542 Math.floor(numberOfWorkers / 2),
1544 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1546 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// The id of the first worker node is interpolated into the expected messages.
1547 const workerId = dynamicThreadPool.workerNodes[0].info.id
// Failure case 1: non-string name.
1548 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1550 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
// Failure case 2: the reserved default task function name.
1554 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1557 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
// Failure case 3: a name that is not registered.
1561 dynamicThreadPool.setDefaultTaskFunction('unknown')
1564 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1567 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1569 'jsonIntegerSerialization',
// Success cases: switching the default to 'factorial', then to 'fibonacci'.
1574 dynamicThreadPool.setDefaultTaskFunction('factorial')
1575 ).resolves.toBe(true)
1576 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1579 'jsonIntegerSerialization',
1583 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1584 ).resolves.toBe(true)
1585 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1588 'jsonIntegerSerialization',
1591 await dynamicThreadPool.destroy()
// Test case: runs four tasks on a DynamicClusterPool backed by the cluster
// testMultipleTaskFunctionsWorker.js file (one without a name, three by name)
// and then inspects pool-level counters and per-worker task function usage.
// NOTE(review): parts of the expected usage object and the final comparison
// matcher are on lines not visible in this extract.
1594 it('Verify that multiple task functions worker is working', async () => {
1595 const pool = new DynamicClusterPool(
1596 Math.floor(numberOfWorkers / 2),
1598 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
// The same input is sent to each task function; with n = 10 the expected
// factorial result is 3628800 and the expected fibonacci result is 55.
1600 const data = { n: 10 }
1601 const result0 = await pool.execute(data)
1602 expect(result0).toStrictEqual({ ok: 1 })
1603 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1604 expect(result1).toStrictEqual({ ok: 1 })
1605 const result2 = await pool.execute(data, 'factorial')
1606 expect(result2).toBe(3628800)
1607 const result3 = await pool.execute(data, 'fibonacci')
1608 expect(result3).toBe(55)
// All four tasks have completed: none executing, four executed.
1609 expect(pool.info.executingTasks).toBe(0)
1610 expect(pool.info.executedTasks).toBe(4)
1611 for (const workerNode of pool.workerNodes) {
1612 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1614 'jsonIntegerSerialization',
1618 expect(workerNode.taskFunctionsUsage.size).toBe(3)
// Each listed task function must have a usage record on this worker node with
// a numeric executed count, zero sequentiallyStolen and CircularArray history
// fields, and an executed count greater than zero.
1619 for (const name of pool.listTaskFunctionNames()) {
1620 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1622 executed: expect.any(Number),
1626 sequentiallyStolen: 0,
1630 history: expect.any(CircularArray)
1633 history: expect.any(CircularArray)
1637 history: expect.any(CircularArray)
1640 history: expect.any(CircularArray)
1645 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1646 ).toBeGreaterThan(0)
// Relates the usage record of the default task name to the usage record of
// the second entry in taskFunctionNames.
// NOTE(review): the matcher is not visible here; confirm the intended relation.
1649 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1651 workerNode.getTaskFunctionWorkerUsage(
1652 workerNode.info.taskFunctionNames[1]
1656 await pool.destroy()
// Test case: calls sendKillMessageToWorker() on a DynamicClusterPool built
// from the cluster testWorker.js file, once with worker node key 0 and once
// with a key equal to numberOfWorkers.
// NOTE(review): the opening `await expect(` lines are not visible here.
1659 it('Verify sendKillMessageToWorker()', async () => {
1660 const pool = new DynamicClusterPool(
1661 Math.floor(numberOfWorkers / 2),
1663 './tests/worker-files/cluster/testWorker.js'
1665 const workerNodeKey = 0
// Key 0: the returned promise must resolve to undefined.
1667 pool.sendKillMessageToWorker(workerNodeKey)
1668 ).resolves.toBeUndefined()
// Key numberOfWorkers: the promise must reject with an invalid-key error.
1670 pool.sendKillMessageToWorker(numberOfWorkers)
1671 ).rejects.toStrictEqual(
1672 new Error(`Invalid worker node key '${numberOfWorkers}'`)
1674 await pool.destroy()
// Test case: sends an 'add' task function operation to the worker node at
// key 0 of a DynamicClusterPool and checks that the operation resolves to true
// and that the worker node's taskFunctionNames then ends with 'empty'.
// NOTE(review): the opening `await expect(` / `expect(` lines are not visible.
1677 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1678 const pool = new DynamicClusterPool(
1679 Math.floor(numberOfWorkers / 2),
1681 './tests/worker-files/cluster/testWorker.js'
1683 const workerNodeKey = 0
// The task function is passed as source text produced by Function#toString().
1685 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1686 taskFunctionOperation: 'add',
1687 taskFunctionName: 'empty',
1688 taskFunction: (() => {}).toString()
1690 ).resolves.toBe(true)
// The worker node's name list must now be the default name, 'test', 'empty'.
1692 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1693 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1694 await pool.destroy()
// Test case: sends an 'add' task function operation through
// sendTaskFunctionOperationToWorkers() on a DynamicClusterPool, expects it to
// resolve to true, then checks taskFunctionNames on every worker node.
// NOTE(review): the expected name list and the closing lines of this test are
// not visible in this extract.
1697 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1698 const pool = new DynamicClusterPool(
1699 Math.floor(numberOfWorkers / 2),
1701 './tests/worker-files/cluster/testWorker.js'
// The task function is passed as source text produced by Function#toString().
1704 pool.sendTaskFunctionOperationToWorkers({
1705 taskFunctionOperation: 'add',
1706 taskFunctionName: 'empty',
1707 taskFunction: (() => {}).toString()
1709 ).resolves.toBe(true)
1710 for (const workerNode of pool.workerNodes) {
1711 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1717 await pool.destroy()