1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.js'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.js'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.js'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
245 expect(workerChoiceStrategy.opts).toStrictEqual(
246 expect.objectContaining({
249 Object.keys(workerChoiceStrategy.opts.weights).length,
250 runTime: { median: false },
251 waitTime: { median: false },
252 elu: { median: false }
257 const testHandler = () => console.info('test handler executed')
258 pool = new FixedThreadPool(
260 './tests/worker-files/thread/testWorker.mjs',
262 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
263 workerChoiceStrategyOptions: {
264 runTime: { median: true },
265 weights: { 0: 300, 1: 200 }
268 restartWorkerOnError: false,
269 enableTasksQueue: true,
270 tasksQueueOptions: { concurrency: 2 },
271 messageHandler: testHandler,
272 errorHandler: testHandler,
273 onlineHandler: testHandler,
274 exitHandler: testHandler
277 expect(pool.emitter).toBeUndefined()
278 expect(pool.opts).toStrictEqual({
281 restartWorkerOnError: false,
282 enableTasksQueue: true,
285 size: Math.pow(numberOfWorkers, 2),
287 tasksStealingOnBackPressure: true,
288 tasksFinishedTimeout: 2000
290 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
291 workerChoiceStrategyOptions: {
292 runTime: { median: true },
293 weights: { 0: 300, 1: 200 }
295 onlineHandler: testHandler,
296 messageHandler: testHandler,
297 errorHandler: testHandler,
298 exitHandler: testHandler
300 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
303 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
304 runTime: { median: true },
305 waitTime: { median: false },
306 elu: { median: false },
307 weights: { 0: 300, 1: 200 }
309 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
310 .workerChoiceStrategies) {
311 expect(workerChoiceStrategy.opts).toStrictEqual({
314 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
315 runTime: { median: true },
316 waitTime: { median: false },
317 elu: { median: false },
318 weights: { 0: 300, 1: 200 }
324 it('Verify that pool options are validated', () => {
329 './tests/worker-files/thread/testWorker.mjs',
331 workerChoiceStrategy: 'invalidStrategy'
334 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
339 './tests/worker-files/thread/testWorker.mjs',
341 workerChoiceStrategyOptions: { weights: {} }
346 'Invalid worker choice strategy options: must have a weight for each worker node'
353 './tests/worker-files/thread/testWorker.mjs',
355 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
360 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
367 './tests/worker-files/thread/testWorker.mjs',
369 enableTasksQueue: true,
370 tasksQueueOptions: 'invalidTasksQueueOptions'
374 new TypeError('Invalid tasks queue options: must be a plain object')
380 './tests/worker-files/thread/testWorker.mjs',
382 enableTasksQueue: true,
383 tasksQueueOptions: { concurrency: 0 }
388 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
395 './tests/worker-files/thread/testWorker.mjs',
397 enableTasksQueue: true,
398 tasksQueueOptions: { concurrency: -1 }
403 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
410 './tests/worker-files/thread/testWorker.mjs',
412 enableTasksQueue: true,
413 tasksQueueOptions: { concurrency: 0.2 }
417 new TypeError('Invalid worker node tasks concurrency: must be an integer')
423 './tests/worker-files/thread/testWorker.mjs',
425 enableTasksQueue: true,
426 tasksQueueOptions: { size: 0 }
431 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
438 './tests/worker-files/thread/testWorker.mjs',
440 enableTasksQueue: true,
441 tasksQueueOptions: { size: -1 }
446 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
453 './tests/worker-files/thread/testWorker.mjs',
455 enableTasksQueue: true,
456 tasksQueueOptions: { size: 0.2 }
460 new TypeError('Invalid worker node tasks queue size: must be an integer')
464 it('Verify that pool worker choice strategy options can be set', async () => {
465 const pool = new FixedThreadPool(
467 './tests/worker-files/thread/testWorker.mjs',
468 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
470 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
471 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
474 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
475 runTime: { median: false },
476 waitTime: { median: false },
477 elu: { median: false },
478 weights: expect.objectContaining({
479 0: expect.any(Number),
480 [pool.info.maxSize - 1]: expect.any(Number)
483 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
484 .workerChoiceStrategies) {
485 expect(workerChoiceStrategy.opts).toStrictEqual(
486 expect.objectContaining({
489 Object.keys(workerChoiceStrategy.opts.weights).length,
490 runTime: { median: false },
491 waitTime: { median: false },
492 elu: { median: false }
497 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
515 pool.setWorkerChoiceStrategyOptions({
516 runTime: { median: true },
517 elu: { median: true }
519 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
520 runTime: { median: true },
521 elu: { median: true }
523 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
526 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
527 runTime: { median: true },
528 waitTime: { median: false },
529 elu: { median: true },
530 weights: expect.objectContaining({
531 0: expect.any(Number),
532 [pool.info.maxSize - 1]: expect.any(Number)
535 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
536 .workerChoiceStrategies) {
537 expect(workerChoiceStrategy.opts).toStrictEqual(
538 expect.objectContaining({
541 Object.keys(workerChoiceStrategy.opts.weights).length,
542 runTime: { median: true },
543 waitTime: { median: false },
544 elu: { median: true }
549 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
567 pool.setWorkerChoiceStrategyOptions({
568 runTime: { median: false },
569 elu: { median: false }
571 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
572 runTime: { median: false },
573 elu: { median: false }
575 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
578 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
579 runTime: { median: false },
580 waitTime: { median: false },
581 elu: { median: false },
582 weights: expect.objectContaining({
583 0: expect.any(Number),
584 [pool.info.maxSize - 1]: expect.any(Number)
587 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
588 .workerChoiceStrategies) {
589 expect(workerChoiceStrategy.opts).toStrictEqual(
590 expect.objectContaining({
593 Object.keys(workerChoiceStrategy.opts.weights).length,
594 runTime: { median: false },
595 waitTime: { median: false },
596 elu: { median: false }
601 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
620 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
623 'Invalid worker choice strategy options: must be a plain object'
626 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
628 'Invalid worker choice strategy options: must have a weight for each worker node'
632 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
635 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
641 it('Verify that pool tasks queue can be enabled/disabled', async () => {
642 const pool = new FixedThreadPool(
644 './tests/worker-files/thread/testWorker.mjs'
646 expect(pool.opts.enableTasksQueue).toBe(false)
647 expect(pool.opts.tasksQueueOptions).toBeUndefined()
648 pool.enableTasksQueue(true)
649 expect(pool.opts.enableTasksQueue).toBe(true)
650 expect(pool.opts.tasksQueueOptions).toStrictEqual({
652 size: Math.pow(numberOfWorkers, 2),
654 tasksStealingOnBackPressure: true,
655 tasksFinishedTimeout: 2000
657 pool.enableTasksQueue(true, { concurrency: 2 })
658 expect(pool.opts.enableTasksQueue).toBe(true)
659 expect(pool.opts.tasksQueueOptions).toStrictEqual({
661 size: Math.pow(numberOfWorkers, 2),
663 tasksStealingOnBackPressure: true,
664 tasksFinishedTimeout: 2000
666 pool.enableTasksQueue(false)
667 expect(pool.opts.enableTasksQueue).toBe(false)
668 expect(pool.opts.tasksQueueOptions).toBeUndefined()
672 it('Verify that pool tasks queue options can be set', async () => {
673 const pool = new FixedThreadPool(
675 './tests/worker-files/thread/testWorker.mjs',
676 { enableTasksQueue: true }
678 expect(pool.opts.tasksQueueOptions).toStrictEqual({
680 size: Math.pow(numberOfWorkers, 2),
682 tasksStealingOnBackPressure: true,
683 tasksFinishedTimeout: 2000
685 for (const workerNode of pool.workerNodes) {
686 expect(workerNode.tasksQueueBackPressureSize).toBe(
687 pool.opts.tasksQueueOptions.size
690 pool.setTasksQueueOptions({
694 tasksStealingOnBackPressure: false,
695 tasksFinishedTimeout: 3000
697 expect(pool.opts.tasksQueueOptions).toStrictEqual({
701 tasksStealingOnBackPressure: false,
702 tasksFinishedTimeout: 3000
704 for (const workerNode of pool.workerNodes) {
705 expect(workerNode.tasksQueueBackPressureSize).toBe(
706 pool.opts.tasksQueueOptions.size
709 pool.setTasksQueueOptions({
712 tasksStealingOnBackPressure: true
714 expect(pool.opts.tasksQueueOptions).toStrictEqual({
716 size: Math.pow(numberOfWorkers, 2),
718 tasksStealingOnBackPressure: true,
719 tasksFinishedTimeout: 2000
721 for (const workerNode of pool.workerNodes) {
722 expect(workerNode.tasksQueueBackPressureSize).toBe(
723 pool.opts.tasksQueueOptions.size
726 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
727 new TypeError('Invalid tasks queue options: must be a plain object')
729 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
731 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
734 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
736 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
739 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
740 new TypeError('Invalid worker node tasks concurrency: must be an integer')
742 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
744 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
747 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
749 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
752 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
753 new TypeError('Invalid worker node tasks queue size: must be an integer')
758 it('Verify that pool info is set', async () => {
759 let pool = new FixedThreadPool(
761 './tests/worker-files/thread/testWorker.mjs'
763 expect(pool.info).toStrictEqual({
765 type: PoolTypes.fixed,
766 worker: WorkerTypes.thread,
769 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
770 minSize: numberOfWorkers,
771 maxSize: numberOfWorkers,
772 workerNodes: numberOfWorkers,
773 idleWorkerNodes: numberOfWorkers,
780 pool = new DynamicClusterPool(
781 Math.floor(numberOfWorkers / 2),
783 './tests/worker-files/cluster/testWorker.js'
785 expect(pool.info).toStrictEqual({
787 type: PoolTypes.dynamic,
788 worker: WorkerTypes.cluster,
791 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
792 minSize: Math.floor(numberOfWorkers / 2),
793 maxSize: numberOfWorkers,
794 workerNodes: Math.floor(numberOfWorkers / 2),
795 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
804 it('Verify that pool worker tasks usage are initialized', async () => {
805 const pool = new FixedClusterPool(
807 './tests/worker-files/cluster/testWorker.js'
809 for (const workerNode of pool.workerNodes) {
810 expect(workerNode).toBeInstanceOf(WorkerNode)
811 expect(workerNode.usage).toStrictEqual({
817 sequentiallyStolen: 0,
822 history: new CircularArray()
825 history: new CircularArray()
829 history: new CircularArray()
832 history: new CircularArray()
840 it('Verify that pool worker tasks queue are initialized', async () => {
841 let pool = new FixedClusterPool(
843 './tests/worker-files/cluster/testWorker.js'
845 for (const workerNode of pool.workerNodes) {
846 expect(workerNode).toBeInstanceOf(WorkerNode)
847 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
848 expect(workerNode.tasksQueue.size).toBe(0)
849 expect(workerNode.tasksQueue.maxSize).toBe(0)
852 pool = new DynamicThreadPool(
853 Math.floor(numberOfWorkers / 2),
855 './tests/worker-files/thread/testWorker.mjs'
857 for (const workerNode of pool.workerNodes) {
858 expect(workerNode).toBeInstanceOf(WorkerNode)
859 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
860 expect(workerNode.tasksQueue.size).toBe(0)
861 expect(workerNode.tasksQueue.maxSize).toBe(0)
866 it('Verify that pool worker info are initialized', async () => {
867 let pool = new FixedClusterPool(
869 './tests/worker-files/cluster/testWorker.js'
871 for (const workerNode of pool.workerNodes) {
872 expect(workerNode).toBeInstanceOf(WorkerNode)
873 expect(workerNode.info).toStrictEqual({
874 id: expect.any(Number),
875 type: WorkerTypes.cluster,
881 pool = new DynamicThreadPool(
882 Math.floor(numberOfWorkers / 2),
884 './tests/worker-files/thread/testWorker.mjs'
886 for (const workerNode of pool.workerNodes) {
887 expect(workerNode).toBeInstanceOf(WorkerNode)
888 expect(workerNode.info).toStrictEqual({
889 id: expect.any(Number),
890 type: WorkerTypes.thread,
898 it('Verify that pool statuses are checked at start or destroy', async () => {
899 const pool = new FixedThreadPool(
901 './tests/worker-files/thread/testWorker.mjs'
903 expect(pool.info.started).toBe(true)
904 expect(pool.info.ready).toBe(true)
905 expect(() => pool.start()).toThrow(
906 new Error('Cannot start an already started pool')
909 expect(pool.info.started).toBe(false)
910 expect(pool.info.ready).toBe(false)
911 await expect(pool.destroy()).rejects.toThrow(
912 new Error('Cannot destroy an already destroyed pool')
916 it('Verify that pool can be started after initialization', async () => {
917 const pool = new FixedClusterPool(
919 './tests/worker-files/cluster/testWorker.js',
924 expect(pool.info.started).toBe(false)
925 expect(pool.info.ready).toBe(false)
926 expect(pool.readyEventEmitted).toBe(false)
927 expect(pool.workerNodes).toStrictEqual([])
928 await expect(pool.execute()).rejects.toThrow(
929 new Error('Cannot execute a task on not started pool')
932 expect(pool.info.started).toBe(true)
933 expect(pool.info.ready).toBe(true)
934 await waitPoolEvents(pool, PoolEvents.ready, 1)
935 expect(pool.readyEventEmitted).toBe(true)
936 expect(pool.workerNodes.length).toBe(numberOfWorkers)
937 for (const workerNode of pool.workerNodes) {
938 expect(workerNode).toBeInstanceOf(WorkerNode)
943 it('Verify that pool execute() arguments are checked', async () => {
944 const pool = new FixedClusterPool(
946 './tests/worker-files/cluster/testWorker.js'
948 await expect(pool.execute(undefined, 0)).rejects.toThrow(
949 new TypeError('name argument must be a string')
951 await expect(pool.execute(undefined, '')).rejects.toThrow(
952 new TypeError('name argument must not be an empty string')
954 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
955 new TypeError('transferList argument must be an array')
957 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
958 "Task function 'unknown' not found"
961 await expect(pool.execute()).rejects.toThrow(
962 new Error('Cannot execute a task on not started pool')
966 it('Verify that pool worker tasks usage are computed', async () => {
967 const pool = new FixedClusterPool(
969 './tests/worker-files/cluster/testWorker.js'
971 const promises = new Set()
972 const maxMultiplier = 2
973 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
974 promises.add(pool.execute())
976 for (const workerNode of pool.workerNodes) {
977 expect(workerNode.usage).toStrictEqual({
980 executing: maxMultiplier,
983 sequentiallyStolen: 0,
988 history: expect.any(CircularArray)
991 history: expect.any(CircularArray)
995 history: expect.any(CircularArray)
998 history: expect.any(CircularArray)
1003 await Promise.all(promises)
1004 for (const workerNode of pool.workerNodes) {
1005 expect(workerNode.usage).toStrictEqual({
1007 executed: maxMultiplier,
1011 sequentiallyStolen: 0,
1016 history: expect.any(CircularArray)
1019 history: expect.any(CircularArray)
1023 history: expect.any(CircularArray)
1026 history: expect.any(CircularArray)
1031 await pool.destroy()
1034 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1035 const pool = new DynamicThreadPool(
1036 Math.floor(numberOfWorkers / 2),
1038 './tests/worker-files/thread/testWorker.mjs'
1040 const promises = new Set()
1041 const maxMultiplier = 2
1042 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1043 promises.add(pool.execute())
1045 await Promise.all(promises)
1046 for (const workerNode of pool.workerNodes) {
1047 expect(workerNode.usage).toStrictEqual({
1049 executed: expect.any(Number),
1053 sequentiallyStolen: 0,
1058 history: expect.any(CircularArray)
1061 history: expect.any(CircularArray)
1065 history: expect.any(CircularArray)
1068 history: expect.any(CircularArray)
1072 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1073 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1074 numberOfWorkers * maxMultiplier
1076 expect(workerNode.usage.runTime.history.length).toBe(0)
1077 expect(workerNode.usage.waitTime.history.length).toBe(0)
1078 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1079 expect(workerNode.usage.elu.active.history.length).toBe(0)
1081 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1082 for (const workerNode of pool.workerNodes) {
1083 expect(workerNode.usage).toStrictEqual({
1089 sequentiallyStolen: 0,
1094 history: expect.any(CircularArray)
1097 history: expect.any(CircularArray)
1101 history: expect.any(CircularArray)
1104 history: expect.any(CircularArray)
1108 expect(workerNode.usage.runTime.history.length).toBe(0)
1109 expect(workerNode.usage.waitTime.history.length).toBe(0)
1110 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1111 expect(workerNode.usage.elu.active.history.length).toBe(0)
1113 await pool.destroy()
1116 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1117 const pool = new DynamicClusterPool(
1118 Math.floor(numberOfWorkers / 2),
1120 './tests/worker-files/cluster/testWorker.js'
1122 expect(pool.emitter.eventNames()).toStrictEqual([])
1125 pool.emitter.on(PoolEvents.ready, info => {
1129 await waitPoolEvents(pool, PoolEvents.ready, 1)
1130 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1131 expect(poolReady).toBe(1)
1132 expect(poolInfo).toStrictEqual({
1134 type: PoolTypes.dynamic,
1135 worker: WorkerTypes.cluster,
1138 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1139 minSize: expect.any(Number),
1140 maxSize: expect.any(Number),
1141 workerNodes: expect.any(Number),
1142 idleWorkerNodes: expect.any(Number),
1143 busyWorkerNodes: expect.any(Number),
1144 executedTasks: expect.any(Number),
1145 executingTasks: expect.any(Number),
1146 failedTasks: expect.any(Number)
1148 await pool.destroy()
1151 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1152 const pool = new FixedThreadPool(
1154 './tests/worker-files/thread/testWorker.mjs'
1156 expect(pool.emitter.eventNames()).toStrictEqual([])
1157 const promises = new Set()
1160 pool.emitter.on(PoolEvents.busy, info => {
1164 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1165 for (let i = 0; i < numberOfWorkers * 2; i++) {
1166 promises.add(pool.execute())
1168 await Promise.all(promises)
1169 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1170 // So it is triggered numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1171 expect(poolBusy).toBe(numberOfWorkers + 1)
1172 expect(poolInfo).toStrictEqual({
1174 type: PoolTypes.fixed,
1175 worker: WorkerTypes.thread,
1178 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1179 minSize: expect.any(Number),
1180 maxSize: expect.any(Number),
1181 workerNodes: expect.any(Number),
1182 idleWorkerNodes: expect.any(Number),
1183 busyWorkerNodes: expect.any(Number),
1184 executedTasks: expect.any(Number),
1185 executingTasks: expect.any(Number),
1186 failedTasks: expect.any(Number)
1188 await pool.destroy()
1191 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1192 const pool = new DynamicThreadPool(
1193 Math.floor(numberOfWorkers / 2),
1195 './tests/worker-files/thread/testWorker.mjs'
1197 expect(pool.emitter.eventNames()).toStrictEqual([])
1198 const promises = new Set()
1201 pool.emitter.on(PoolEvents.full, info => {
1205 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1206 for (let i = 0; i < numberOfWorkers * 2; i++) {
1207 promises.add(pool.execute())
1209 await Promise.all(promises)
1210 expect(poolFull).toBe(1)
1211 expect(poolInfo).toStrictEqual({
1213 type: PoolTypes.dynamic,
1214 worker: WorkerTypes.thread,
1217 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1218 minSize: expect.any(Number),
1219 maxSize: expect.any(Number),
1220 workerNodes: expect.any(Number),
1221 idleWorkerNodes: expect.any(Number),
1222 busyWorkerNodes: expect.any(Number),
1223 executedTasks: expect.any(Number),
1224 executingTasks: expect.any(Number),
1225 failedTasks: expect.any(Number)
1227 await pool.destroy()
1230 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1231 const pool = new FixedThreadPool(
1233 './tests/worker-files/thread/testWorker.mjs',
1235 enableTasksQueue: true
1238 stub(pool, 'hasBackPressure').returns(true)
1239 expect(pool.emitter.eventNames()).toStrictEqual([])
1240 const promises = new Set()
1241 let poolBackPressure = 0
1243 pool.emitter.on(PoolEvents.backPressure, info => {
1247 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1248 for (let i = 0; i < numberOfWorkers + 1; i++) {
1249 promises.add(pool.execute())
1251 await Promise.all(promises)
1252 expect(poolBackPressure).toBe(1)
1253 expect(poolInfo).toStrictEqual({
1255 type: PoolTypes.fixed,
1256 worker: WorkerTypes.thread,
1259 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1260 minSize: expect.any(Number),
1261 maxSize: expect.any(Number),
1262 workerNodes: expect.any(Number),
1263 idleWorkerNodes: expect.any(Number),
1264 busyWorkerNodes: expect.any(Number),
1265 executedTasks: expect.any(Number),
1266 executingTasks: expect.any(Number),
1267 maxQueuedTasks: expect.any(Number),
1268 queuedTasks: expect.any(Number),
1270 stolenTasks: expect.any(Number),
1271 failedTasks: expect.any(Number)
1273 expect(pool.hasBackPressure.callCount).toBe(5)
1274 await pool.destroy()
1277 it('Verify that destroy() waits for queued tasks to finish', async () => {
1278 const tasksFinishedTimeout = 2500
1279 const pool = new FixedThreadPool(
1281 './tests/worker-files/thread/asyncWorker.mjs',
1283 enableTasksQueue: true,
1284 tasksQueueOptions: { tasksFinishedTimeout }
1287 const maxMultiplier = 4
1288 let tasksFinished = 0
1289 for (const workerNode of pool.workerNodes) {
1290 workerNode.on('taskFinished', () => {
1294 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1297 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1298 const startTime = performance.now()
1299 await pool.destroy()
1300 const elapsedTime = performance.now() - startTime
1301 expect(tasksFinished).toBeGreaterThanOrEqual(numberOfWorkers)
1302 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1303 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1304 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
1307 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1308 const tasksFinishedTimeout = 1000
1309 const pool = new FixedThreadPool(
1311 './tests/worker-files/thread/asyncWorker.mjs',
1313 enableTasksQueue: true,
1314 tasksQueueOptions: { tasksFinishedTimeout }
1317 const maxMultiplier = 4
1318 let tasksFinished = 0
1319 for (const workerNode of pool.workerNodes) {
1320 workerNode.on('taskFinished', () => {
1324 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1327 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1328 const startTime = performance.now()
1329 await pool.destroy()
1330 const elapsedTime = performance.now() - startTime
1331 expect(tasksFinished).toBe(0)
1332 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1335 it('Verify that pool asynchronous resource track tasks execution', async () => {
1340 let resolveCalls = 0
1341 const hook = createHook({
1342 init (asyncId, type) {
1343 if (type === 'poolifier:task') {
1345 taskAsyncId = asyncId
1349 if (asyncId === taskAsyncId) beforeCalls++
1352 if (asyncId === taskAsyncId) afterCalls++
1355 if (executionAsyncId() === taskAsyncId) resolveCalls++
1358 const pool = new FixedThreadPool(
1360 './tests/worker-files/thread/testWorker.mjs'
1363 await pool.execute()
1365 expect(initCalls).toBe(1)
1366 expect(beforeCalls).toBe(1)
1367 expect(afterCalls).toBe(1)
1368 expect(resolveCalls).toBe(1)
1369 await pool.destroy()
1372 it('Verify that hasTaskFunction() is working', async () => {
1373 const dynamicThreadPool = new DynamicThreadPool(
1374 Math.floor(numberOfWorkers / 2),
1376 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1378 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1379 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1380 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1383 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1384 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1385 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1386 await dynamicThreadPool.destroy()
1387 const fixedClusterPool = new FixedClusterPool(
1389 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1391 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1392 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1393 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1396 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1397 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1398 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1399 await fixedClusterPool.destroy()
// Checks addTaskFunction() on a DynamicThreadPool: argument validation
// first, then registering an 'echo' task function, executing it, and
// inspecting the per-worker usage recorded under that name.
1402 it('Verify that addTaskFunction() is working', async () => {
1403 const dynamicThreadPool = new DynamicThreadPool(
1404 Math.floor(numberOfWorkers / 2),
1406 './tests/worker-files/thread/testWorker.mjs'
// Wait for one `ready` pool event before using the pool.
1408 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Invalid arguments: a non-string name, an empty name, and a non-function
// `fn` (0 and '') are each paired with a specific TypeError message.
1410 dynamicThreadPool.addTaskFunction(0, () => {})
1411 ).rejects.toThrow(new TypeError('name argument must be a string'))
1413 dynamicThreadPool.addTaskFunction('', () => {})
1415 new TypeError('name argument must not be an empty string')
1417 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1418 new TypeError('fn argument must be a function')
1420 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1421 new TypeError('fn argument must be a function')
1423 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// Register the 'echo' task function: the promise must resolve to true and
// the pool-side `taskFunctions` map must then hold exactly one entry.
1427 const echoTaskFunction = data => {
1431 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1432 ).resolves.toBe(true)
1433 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1434 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1437 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// Executing 'echo' is expected to return a value strictly equal to the
// payload that was sent.
1442 const taskFunctionData = { test: 'test' }
1443 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1444 expect(echoResult).toStrictEqual(taskFunctionData)
// Every worker node must expose a usage record for 'echo'; the visible
// expectations are a numeric `executed`, `sequentiallyStolen` of 0 and
// fresh CircularArray histories.
1445 for (const workerNode of dynamicThreadPool.workerNodes) {
1446 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1448 executed: expect.any(Number),
1451 sequentiallyStolen: 0,
1456 history: new CircularArray()
1459 history: new CircularArray()
1463 history: new CircularArray()
1466 history: new CircularArray()
1471 await dynamicThreadPool.destroy()
// Checks removeTaskFunction() on a DynamicThreadPool: removing a name that
// was never added through the pool must reject, while removing a task
// function previously added with addTaskFunction() must clear it from the
// pool-side `taskFunctions` map.
1474 it('Verify that removeTaskFunction() is working', async () => {
1475 const dynamicThreadPool = new DynamicThreadPool(
1476 Math.floor(numberOfWorkers / 2),
1478 './tests/worker-files/thread/testWorker.mjs'
// Wait for one `ready` pool event before using the pool.
1480 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1481 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// 'test' has not been added via addTaskFunction() in this test, so the
// removal is expected to reject with the error below.
1485 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1486 new Error('Cannot remove a task function not handled on the pool side')
// Add 'echo' so there is a pool-side task function to remove.
1488 const echoTaskFunction = data => {
1491 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1492 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1493 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1496 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// After removing 'echo', the map must be empty and the lookup undefined.
1501 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1504 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1505 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1506 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1510 await dynamicThreadPool.destroy()
// Checks listTaskFunctionNames() on a DynamicThreadPool and then on a
// FixedClusterPool, both started from the multiple-task-functions worker
// files. The expected list is compared with toStrictEqual, so order matters.
1513 it('Verify that listTaskFunctionNames() is working', async () => {
1514 const dynamicThreadPool = new DynamicThreadPool(
1515 Math.floor(numberOfWorkers / 2),
1517 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// Wait for one `ready` pool event before listing the names.
1519 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1520 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1522 'jsonIntegerSerialization',
1526 await dynamicThreadPool.destroy()
// Same check against a cluster-based pool.
1527 const fixedClusterPool = new FixedClusterPool(
1529 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1531 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1532 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1534 'jsonIntegerSerialization',
1538 await fixedClusterPool.destroy()
// Checks setDefaultTaskFunction() on a DynamicThreadPool started from the
// multiple-task-functions worker file: three rejected calls whose error
// messages embed the worker id, then two successful changes of the default
// task function, with the listed names re-checked after each change.
1541 it('Verify that setDefaultTaskFunction() is working', async () => {
1542 const dynamicThreadPool = new DynamicThreadPool(
1543 Math.floor(numberOfWorkers / 2),
1545 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// Wait for one `ready` pool event before using the pool.
1547 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// The id of the first worker node is interpolated into the expected
// error messages below.
1548 const workerId = dynamicThreadPool.workerNodes[0].info.id
// Failure cases: a non-string name (0), the reserved DEFAULT_TASK_NAME,
// and a name that does not exist ('unknown').
1549 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1551 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1555 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1558 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1562 dynamicThreadPool.setDefaultTaskFunction('unknown')
1565 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1568 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1570 'jsonIntegerSerialization',
// Success cases: 'factorial' and then 'fibonacci' become the default;
// each call must resolve to true.
1575 dynamicThreadPool.setDefaultTaskFunction('factorial')
1576 ).resolves.toBe(true)
1577 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1580 'jsonIntegerSerialization',
1584 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1585 ).resolves.toBe(true)
1586 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1589 'jsonIntegerSerialization',
1592 await dynamicThreadPool.destroy()
// End-to-end check of a DynamicClusterPool whose worker file exposes several
// task functions: the same payload is executed through the default task and
// through three named tasks, then pool-level counters and per-worker,
// per-task-function usage records are inspected.
1595 it('Verify that multiple task functions worker is working', async () => {
1596 const pool = new DynamicClusterPool(
1597 Math.floor(numberOfWorkers / 2),
1599 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
// One payload, four executions: default task, then each named task.
1601 const data = { n: 10 }
1602 const result0 = await pool.execute(data)
1603 expect(result0).toStrictEqual({ ok: 1 })
1604 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1605 expect(result1).toStrictEqual({ ok: 1 })
1606 const result2 = await pool.execute(data, 'factorial')
1607 expect(result2).toBe(3628800)
1608 const result3 = await pool.execute(data, 'fibonacci')
1609 expect(result3).toBe(55)
// All four tasks were awaited, so none is still executing and the pool
// must report exactly four executed tasks.
1610 expect(pool.info.executingTasks).toBe(0)
1611 expect(pool.info.executedTasks).toBe(4)
1612 for (const workerNode of pool.workerNodes) {
1613 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1615 'jsonIntegerSerialization',
1619 expect(workerNode.taskFunctionsUsage.size).toBe(3)
// For each task function name known to the pool, the worker node's usage
// record must match the expected shape and show at least one executed task.
1620 for (const name of pool.listTaskFunctionNames()) {
1621 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1623 executed: expect.any(Number),
1627 sequentiallyStolen: 0,
1631 history: expect.any(CircularArray)
1634 history: expect.any(CircularArray)
1638 history: expect.any(CircularArray)
1641 history: expect.any(CircularArray)
1646 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1647 ).toBeGreaterThan(0)
// The usage of DEFAULT_TASK_NAME and of the second entry of
// `taskFunctionNames` are both looked up here.
// NOTE(review): the matcher relating them is not visible in this excerpt.
1650 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1652 workerNode.getTaskFunctionWorkerUsage(
1653 workerNode.info.taskFunctionNames[1]
1657 await pool.destroy()
// Checks that sendKillMessageToWorker(), called with worker node key 0 on a
// DynamicClusterPool, returns a promise that resolves to undefined.
1660 it('Verify sendKillMessageToWorker()', async () => {
1661 const pool = new DynamicClusterPool(
1662 Math.floor(numberOfWorkers / 2),
1664 './tests/worker-files/cluster/testWorker.js'
// Key (index) of the worker node the kill message is sent to.
1666 const workerNodeKey = 0
1668 pool.sendKillMessageToWorker(workerNodeKey)
1669 ).resolves.toBeUndefined()
1670 await pool.destroy()
// Checks sendTaskFunctionOperationToWorker() on a DynamicClusterPool: an
// 'add' operation for a task function named 'empty' is sent to worker node
// key 0 and must resolve to true; that worker node's `taskFunctionNames`
// must then list 'empty' after DEFAULT_TASK_NAME and 'test'.
1673 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1674 const pool = new DynamicClusterPool(
1675 Math.floor(numberOfWorkers / 2),
1677 './tests/worker-files/cluster/testWorker.js'
// Key (index) of the worker node the operation is sent to.
1679 const workerNodeKey = 0
// The task function is passed as source text (Function.prototype.toString()).
1681 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1682 taskFunctionOperation: 'add',
1683 taskFunctionName: 'empty',
1684 taskFunction: (() => {}).toString()
1686 ).resolves.toBe(true)
1688 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1689 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1690 await pool.destroy()
// Checks sendTaskFunctionOperationToWorkers() on a DynamicClusterPool: an
// 'add' operation for a task function named 'empty' is sent without
// targeting a specific worker node and must resolve to true; the
// `taskFunctionNames` of every worker node are then checked.
1693 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1694 const pool = new DynamicClusterPool(
1695 Math.floor(numberOfWorkers / 2),
1697 './tests/worker-files/cluster/testWorker.js'
// The task function is passed as source text (Function.prototype.toString()).
1700 pool.sendTaskFunctionOperationToWorkers({
1701 taskFunctionOperation: 'add',
1702 taskFunctionName: 'empty',
1703 taskFunction: (() => {}).toString()
1705 ).resolves.toBe(true)
// Inspect each worker node's advertised task function names.
1706 for (const workerNode of pool.workerNodes) {
1707 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1713 await pool.destroy()