1 const { EventEmitterAsyncResource
} = require('node:events')
2 const { expect
} = require('expect')
3 const sinon
= require('sinon')
11 WorkerChoiceStrategies
,
13 } = require('../../lib')
14 const { CircularArray
} = require('../../lib/circular-array')
15 const { Deque
} = require('../../lib/deque')
16 const { DEFAULT_TASK_NAME
} = require('../../lib/utils')
17 const { version
} = require('../../package.json')
18 const { waitPoolEvents
} = require('../test-utils')
19 const { WorkerNode
} = require('../../lib/pools/worker-node')
21 describe('Abstract pool test suite', () => {
22 const numberOfWorkers
= 2
23 class StubPoolWithIsMain
extends FixedThreadPool
{
33 it('Simulate pool creation from a non main thread/process', () => {
36 new StubPoolWithIsMain(
38 './tests/worker-files/thread/testWorker.js',
40 errorHandler
: e
=> console
.error(e
)
45 'Cannot start a pool from a worker with the same type as the pool'
50 it('Verify that pool statuses properties are set', async () => {
51 const pool
= new FixedThreadPool(
53 './tests/worker-files/thread/testWorker.js'
55 expect(pool
.starting
).toBe(false)
56 expect(pool
.started
).toBe(true)
60 it('Verify that filePath is checked', () => {
61 const expectedError
= new Error(
62 'Please specify a file with a worker implementation'
64 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
67 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
70 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
73 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
77 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
78 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
81 it('Verify that numberOfWorkers is checked', () => {
86 './tests/worker-files/thread/testWorker.js'
90 'Cannot instantiate a pool without specifying the number of workers'
95 it('Verify that a negative number of workers is checked', () => {
98 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
101 'Cannot instantiate a pool with a negative number of workers'
106 it('Verify that a non integer number of workers is checked', () => {
109 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
112 'Cannot instantiate a pool with a non safe integer number of workers'
117 it('Verify that dynamic pool sizing is checked', () => {
120 new DynamicClusterPool(
123 './tests/worker-files/cluster/testWorker.js'
127 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
132 new DynamicThreadPool(
135 './tests/worker-files/thread/testWorker.js'
139 'Cannot instantiate a pool with a non safe integer number of workers'
144 new DynamicClusterPool(
147 './tests/worker-files/cluster/testWorker.js'
151 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
156 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
159 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
164 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
167 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
172 new DynamicClusterPool(
175 './tests/worker-files/cluster/testWorker.js'
179 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
184 it('Verify that pool options are checked', async () => {
185 let pool
= new FixedThreadPool(
187 './tests/worker-files/thread/testWorker.js'
189 expect(pool
.emitter
).toBeInstanceOf(EventEmitterAsyncResource
)
190 expect(pool
.opts
).toStrictEqual({
193 restartWorkerOnError
: true,
194 enableTasksQueue
: false,
195 workerChoiceStrategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
196 workerChoiceStrategyOptions
: {
198 runTime
: { median
: false },
199 waitTime
: { median
: false },
200 elu
: { median
: false }
203 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
205 runTime
: { median
: false },
206 waitTime
: { median
: false },
207 elu
: { median
: false }
209 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
210 .workerChoiceStrategies
) {
211 expect(workerChoiceStrategy
.opts
).toStrictEqual({
213 runTime
: { median
: false },
214 waitTime
: { median
: false },
215 elu
: { median
: false }
219 const testHandler
= () => console
.info('test handler executed')
220 pool
= new FixedThreadPool(
222 './tests/worker-files/thread/testWorker.js',
224 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
225 workerChoiceStrategyOptions
: {
226 runTime
: { median
: true },
227 weights
: { 0: 300, 1: 200 }
230 restartWorkerOnError
: false,
231 enableTasksQueue
: true,
232 tasksQueueOptions
: { concurrency
: 2 },
233 messageHandler
: testHandler
,
234 errorHandler
: testHandler
,
235 onlineHandler
: testHandler
,
236 exitHandler
: testHandler
239 expect(pool
.emitter
).toBeUndefined()
240 expect(pool
.opts
).toStrictEqual({
243 restartWorkerOnError
: false,
244 enableTasksQueue
: true,
247 size
: Math
.pow(numberOfWorkers
, 2),
249 tasksStealingOnBackPressure
: true
251 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
252 workerChoiceStrategyOptions
: {
254 runTime
: { median
: true },
255 waitTime
: { median
: false },
256 elu
: { median
: false },
257 weights
: { 0: 300, 1: 200 }
259 onlineHandler
: testHandler
,
260 messageHandler
: testHandler
,
261 errorHandler
: testHandler
,
262 exitHandler
: testHandler
264 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
266 runTime
: { median
: true },
267 waitTime
: { median
: false },
268 elu
: { median
: false },
269 weights
: { 0: 300, 1: 200 }
271 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
272 .workerChoiceStrategies
) {
273 expect(workerChoiceStrategy
.opts
).toStrictEqual({
275 runTime
: { median
: true },
276 waitTime
: { median
: false },
277 elu
: { median
: false },
278 weights
: { 0: 300, 1: 200 }
284 it('Verify that pool options are validated', async () => {
289 './tests/worker-files/thread/testWorker.js',
291 workerChoiceStrategy
: 'invalidStrategy'
295 new Error("Invalid worker choice strategy 'invalidStrategy'")
301 './tests/worker-files/thread/testWorker.js',
303 workerChoiceStrategyOptions
: {
304 retries
: 'invalidChoiceRetries'
310 'Invalid worker choice strategy options: retries must be an integer'
317 './tests/worker-files/thread/testWorker.js',
319 workerChoiceStrategyOptions
: {
326 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
333 './tests/worker-files/thread/testWorker.js',
335 workerChoiceStrategyOptions
: { weights
: {} }
340 'Invalid worker choice strategy options: must have a weight for each worker node'
347 './tests/worker-files/thread/testWorker.js',
349 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
354 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
361 './tests/worker-files/thread/testWorker.js',
363 enableTasksQueue
: true,
364 tasksQueueOptions
: 'invalidTasksQueueOptions'
368 new TypeError('Invalid tasks queue options: must be a plain object')
374 './tests/worker-files/thread/testWorker.js',
376 enableTasksQueue
: true,
377 tasksQueueOptions
: { concurrency
: 0 }
382 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
389 './tests/worker-files/thread/testWorker.js',
391 enableTasksQueue
: true,
392 tasksQueueOptions
: { concurrency
: -1 }
397 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
404 './tests/worker-files/thread/testWorker.js',
406 enableTasksQueue
: true,
407 tasksQueueOptions
: { concurrency
: 0.2 }
411 new TypeError('Invalid worker node tasks concurrency: must be an integer')
417 './tests/worker-files/thread/testWorker.js',
419 enableTasksQueue
: true,
420 tasksQueueOptions
: { size
: 0 }
425 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
432 './tests/worker-files/thread/testWorker.js',
434 enableTasksQueue
: true,
435 tasksQueueOptions
: { size
: -1 }
440 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
447 './tests/worker-files/thread/testWorker.js',
449 enableTasksQueue
: true,
450 tasksQueueOptions
: { size
: 0.2 }
454 new TypeError('Invalid worker node tasks queue size: must be an integer')
458 it('Verify that pool worker choice strategy options can be set', async () => {
459 const pool
= new FixedThreadPool(
461 './tests/worker-files/thread/testWorker.js',
462 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
464 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
466 runTime
: { median
: false },
467 waitTime
: { median
: false },
468 elu
: { median
: false }
470 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
472 runTime
: { median
: false },
473 waitTime
: { median
: false },
474 elu
: { median
: false }
476 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
477 .workerChoiceStrategies
) {
478 expect(workerChoiceStrategy
.opts
).toStrictEqual({
480 runTime
: { median
: false },
481 waitTime
: { median
: false },
482 elu
: { median
: false }
486 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
504 pool
.setWorkerChoiceStrategyOptions({
505 runTime
: { median
: true },
506 elu
: { median
: true }
508 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
510 runTime
: { median
: true },
511 waitTime
: { median
: false },
512 elu
: { median
: true }
514 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
516 runTime
: { median
: true },
517 waitTime
: { median
: false },
518 elu
: { median
: true }
520 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
521 .workerChoiceStrategies
) {
522 expect(workerChoiceStrategy
.opts
).toStrictEqual({
524 runTime
: { median
: true },
525 waitTime
: { median
: false },
526 elu
: { median
: true }
530 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
548 pool
.setWorkerChoiceStrategyOptions({
549 runTime
: { median
: false },
550 elu
: { median
: false }
552 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
554 runTime
: { median
: false },
555 waitTime
: { median
: false },
556 elu
: { median
: false }
558 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
560 runTime
: { median
: false },
561 waitTime
: { median
: false },
562 elu
: { median
: false }
564 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
565 .workerChoiceStrategies
) {
566 expect(workerChoiceStrategy
.opts
).toStrictEqual({
568 runTime
: { median
: false },
569 waitTime
: { median
: false },
570 elu
: { median
: false }
574 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
593 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
596 'Invalid worker choice strategy options: must be a plain object'
600 pool
.setWorkerChoiceStrategyOptions({
601 retries
: 'invalidChoiceRetries'
605 'Invalid worker choice strategy options: retries must be an integer'
609 pool
.setWorkerChoiceStrategyOptions({ retries
: -1 })
612 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
616 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
619 'Invalid worker choice strategy options: must have a weight for each worker node'
623 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
626 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
632 it('Verify that pool tasks queue can be enabled/disabled', async () => {
633 const pool
= new FixedThreadPool(
635 './tests/worker-files/thread/testWorker.js'
637 expect(pool
.opts
.enableTasksQueue
).toBe(false)
638 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
639 for (const workerNode
of pool
.workerNodes
) {
640 expect(workerNode
.onEmptyQueue
).toBeUndefined()
641 expect(workerNode
.onBackPressure
).toBeUndefined()
643 pool
.enableTasksQueue(true)
644 expect(pool
.opts
.enableTasksQueue
).toBe(true)
645 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
647 size
: Math
.pow(numberOfWorkers
, 2),
649 tasksStealingOnBackPressure
: true
651 for (const workerNode
of pool
.workerNodes
) {
652 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
653 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
655 pool
.enableTasksQueue(true, { concurrency
: 2 })
656 expect(pool
.opts
.enableTasksQueue
).toBe(true)
657 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
659 size
: Math
.pow(numberOfWorkers
, 2),
661 tasksStealingOnBackPressure
: true
663 for (const workerNode
of pool
.workerNodes
) {
664 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
665 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
667 pool
.enableTasksQueue(false)
668 expect(pool
.opts
.enableTasksQueue
).toBe(false)
669 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
670 for (const workerNode
of pool
.workerNodes
) {
671 expect(workerNode
.onEmptyQueue
).toBeUndefined()
672 expect(workerNode
.onBackPressure
).toBeUndefined()
677 it('Verify that pool tasks queue options can be set', async () => {
678 const pool
= new FixedThreadPool(
680 './tests/worker-files/thread/testWorker.js',
681 { enableTasksQueue
: true }
683 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
685 size
: Math
.pow(numberOfWorkers
, 2),
687 tasksStealingOnBackPressure
: true
689 for (const workerNode
of pool
.workerNodes
) {
690 expect(workerNode
.tasksQueueBackPressureSize
).toBe(
691 pool
.opts
.tasksQueueOptions
.size
693 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
694 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
696 pool
.setTasksQueueOptions({
700 tasksStealingOnBackPressure
: false
702 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
706 tasksStealingOnBackPressure
: false
708 for (const workerNode
of pool
.workerNodes
) {
709 expect(workerNode
.tasksQueueBackPressureSize
).toBe(
710 pool
.opts
.tasksQueueOptions
.size
712 expect(workerNode
.onEmptyQueue
).toBeUndefined()
713 expect(workerNode
.onBackPressure
).toBeUndefined()
715 pool
.setTasksQueueOptions({
718 tasksStealingOnBackPressure
: true
720 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
722 size
: Math
.pow(numberOfWorkers
, 2),
724 tasksStealingOnBackPressure
: true
726 for (const workerNode
of pool
.workerNodes
) {
727 expect(workerNode
.tasksQueueBackPressureSize
).toBe(
728 pool
.opts
.tasksQueueOptions
.size
730 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
731 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
734 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
736 new TypeError('Invalid tasks queue options: must be a plain object')
738 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
740 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
743 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
745 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
748 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
749 new TypeError('Invalid worker node tasks concurrency: must be an integer')
751 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
753 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
756 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
758 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
761 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
762 new TypeError('Invalid worker node tasks queue size: must be an integer')
767 it('Verify that pool info is set', async () => {
768 let pool
= new FixedThreadPool(
770 './tests/worker-files/thread/testWorker.js'
772 expect(pool
.info
).toStrictEqual({
774 type
: PoolTypes
.fixed
,
775 worker
: WorkerTypes
.thread
,
778 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
779 minSize
: numberOfWorkers
,
780 maxSize
: numberOfWorkers
,
781 workerNodes
: numberOfWorkers
,
782 idleWorkerNodes
: numberOfWorkers
,
789 pool
= new DynamicClusterPool(
790 Math
.floor(numberOfWorkers
/ 2),
792 './tests/worker-files/cluster/testWorker.js'
794 expect(pool
.info
).toStrictEqual({
796 type
: PoolTypes
.dynamic
,
797 worker
: WorkerTypes
.cluster
,
800 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
801 minSize
: Math
.floor(numberOfWorkers
/ 2),
802 maxSize
: numberOfWorkers
,
803 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
804 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
813 it('Verify that pool worker tasks usage are initialized', async () => {
814 const pool
= new FixedClusterPool(
816 './tests/worker-files/cluster/testWorker.js'
818 for (const workerNode
of pool
.workerNodes
) {
819 expect(workerNode
).toBeInstanceOf(WorkerNode
)
820 expect(workerNode
.usage
).toStrictEqual({
830 history
: new CircularArray()
833 history
: new CircularArray()
837 history
: new CircularArray()
840 history
: new CircularArray()
848 it('Verify that pool worker tasks queue are initialized', async () => {
849 let pool
= new FixedClusterPool(
851 './tests/worker-files/cluster/testWorker.js'
853 for (const workerNode
of pool
.workerNodes
) {
854 expect(workerNode
).toBeInstanceOf(WorkerNode
)
855 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
856 expect(workerNode
.tasksQueue
.size
).toBe(0)
857 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
860 pool
= new DynamicThreadPool(
861 Math
.floor(numberOfWorkers
/ 2),
863 './tests/worker-files/thread/testWorker.js'
865 for (const workerNode
of pool
.workerNodes
) {
866 expect(workerNode
).toBeInstanceOf(WorkerNode
)
867 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
868 expect(workerNode
.tasksQueue
.size
).toBe(0)
869 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
874 it('Verify that pool worker info are initialized', async () => {
875 let pool
= new FixedClusterPool(
877 './tests/worker-files/cluster/testWorker.js'
879 for (const workerNode
of pool
.workerNodes
) {
880 expect(workerNode
).toBeInstanceOf(WorkerNode
)
881 expect(workerNode
.info
).toStrictEqual({
882 id
: expect
.any(Number
),
883 type
: WorkerTypes
.cluster
,
889 pool
= new DynamicThreadPool(
890 Math
.floor(numberOfWorkers
/ 2),
892 './tests/worker-files/thread/testWorker.js'
894 for (const workerNode
of pool
.workerNodes
) {
895 expect(workerNode
).toBeInstanceOf(WorkerNode
)
896 expect(workerNode
.info
).toStrictEqual({
897 id
: expect
.any(Number
),
898 type
: WorkerTypes
.thread
,
906 it('Verify that pool can be started after initialization', async () => {
907 const pool
= new FixedClusterPool(
909 './tests/worker-files/cluster/testWorker.js',
914 expect(pool
.info
.started
).toBe(false)
915 expect(pool
.info
.ready
).toBe(false)
916 expect(pool
.workerNodes
).toStrictEqual([])
917 await
expect(pool
.execute()).rejects
.toThrowError(
918 new Error('Cannot execute a task on not started pool')
921 expect(pool
.info
.started
).toBe(true)
922 expect(pool
.info
.ready
).toBe(true)
923 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
924 for (const workerNode
of pool
.workerNodes
) {
925 expect(workerNode
).toBeInstanceOf(WorkerNode
)
930 it('Verify that pool execute() arguments are checked', async () => {
931 const pool
= new FixedClusterPool(
933 './tests/worker-files/cluster/testWorker.js'
935 await
expect(pool
.execute(undefined, 0)).rejects
.toThrowError(
936 new TypeError('name argument must be a string')
938 await
expect(pool
.execute(undefined, '')).rejects
.toThrowError(
939 new TypeError('name argument must not be an empty string')
941 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
942 new TypeError('transferList argument must be an array')
944 await
expect(pool
.execute(undefined, 'unknown')).rejects
.toBe(
945 "Task function 'unknown' not found"
948 await
expect(pool
.execute()).rejects
.toThrowError(
949 new Error('Cannot execute a task on not started pool')
953 it('Verify that pool worker tasks usage are computed', async () => {
954 const pool
= new FixedClusterPool(
956 './tests/worker-files/cluster/testWorker.js'
958 const promises
= new Set()
959 const maxMultiplier
= 2
960 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
961 promises
.add(pool
.execute())
963 for (const workerNode
of pool
.workerNodes
) {
964 expect(workerNode
.usage
).toStrictEqual({
967 executing
: maxMultiplier
,
974 history
: expect
.any(CircularArray
)
977 history
: expect
.any(CircularArray
)
981 history
: expect
.any(CircularArray
)
984 history
: expect
.any(CircularArray
)
989 await Promise
.all(promises
)
990 for (const workerNode
of pool
.workerNodes
) {
991 expect(workerNode
.usage
).toStrictEqual({
993 executed
: maxMultiplier
,
1001 history
: expect
.any(CircularArray
)
1004 history
: expect
.any(CircularArray
)
1008 history
: expect
.any(CircularArray
)
1011 history
: expect
.any(CircularArray
)
1016 await pool
.destroy()
1019 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1020 const pool
= new DynamicThreadPool(
1021 Math
.floor(numberOfWorkers
/ 2),
1023 './tests/worker-files/thread/testWorker.js'
1025 const promises
= new Set()
1026 const maxMultiplier
= 2
1027 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
1028 promises
.add(pool
.execute())
1030 await Promise
.all(promises
)
1031 for (const workerNode
of pool
.workerNodes
) {
1032 expect(workerNode
.usage
).toStrictEqual({
1034 executed
: expect
.any(Number
),
1042 history
: expect
.any(CircularArray
)
1045 history
: expect
.any(CircularArray
)
1049 history
: expect
.any(CircularArray
)
1052 history
: expect
.any(CircularArray
)
1056 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
1057 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
1058 numberOfWorkers
* maxMultiplier
1060 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
1061 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
1062 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
1063 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1065 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
1066 for (const workerNode
of pool
.workerNodes
) {
1067 expect(workerNode
.usage
).toStrictEqual({
1077 history
: expect
.any(CircularArray
)
1080 history
: expect
.any(CircularArray
)
1084 history
: expect
.any(CircularArray
)
1087 history
: expect
.any(CircularArray
)
1091 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
1092 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
1093 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
1094 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1096 await pool
.destroy()
1099 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1100 const pool
= new DynamicClusterPool(
1101 Math
.floor(numberOfWorkers
/ 2),
1103 './tests/worker-files/cluster/testWorker.js'
1105 expect(pool
.emitter
.eventNames()).toStrictEqual([])
1108 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
1112 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
1113 expect(pool
.emitter
.eventNames()).toStrictEqual([PoolEvents
.ready
])
1114 expect(poolReady
).toBe(1)
1115 expect(poolInfo
).toStrictEqual({
1117 type
: PoolTypes
.dynamic
,
1118 worker
: WorkerTypes
.cluster
,
1121 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1122 minSize
: expect
.any(Number
),
1123 maxSize
: expect
.any(Number
),
1124 workerNodes
: expect
.any(Number
),
1125 idleWorkerNodes
: expect
.any(Number
),
1126 busyWorkerNodes
: expect
.any(Number
),
1127 executedTasks
: expect
.any(Number
),
1128 executingTasks
: expect
.any(Number
),
1129 failedTasks
: expect
.any(Number
)
1131 await pool
.destroy()
1134 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1135 const pool
= new FixedThreadPool(
1137 './tests/worker-files/thread/testWorker.js'
1139 expect(pool
.emitter
.eventNames()).toStrictEqual([])
1140 const promises
= new Set()
1143 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
1147 expect(pool
.emitter
.eventNames()).toStrictEqual([PoolEvents
.busy
])
1148 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1149 promises
.add(pool
.execute())
1151 await Promise
.all(promises
)
1152 // The `busy` event is triggered each time the number of tasks submitted at once reaches the number of fixed pool workers.
1153 // So in total it is triggered numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1154 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
1155 expect(poolInfo
).toStrictEqual({
1157 type
: PoolTypes
.fixed
,
1158 worker
: WorkerTypes
.thread
,
1161 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1162 minSize
: expect
.any(Number
),
1163 maxSize
: expect
.any(Number
),
1164 workerNodes
: expect
.any(Number
),
1165 idleWorkerNodes
: expect
.any(Number
),
1166 busyWorkerNodes
: expect
.any(Number
),
1167 executedTasks
: expect
.any(Number
),
1168 executingTasks
: expect
.any(Number
),
1169 failedTasks
: expect
.any(Number
)
1171 await pool
.destroy()
1174 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1175 const pool
= new DynamicThreadPool(
1176 Math
.floor(numberOfWorkers
/ 2),
1178 './tests/worker-files/thread/testWorker.js'
1180 expect(pool
.emitter
.eventNames()).toStrictEqual([])
1181 const promises
= new Set()
1184 pool
.emitter
.on(PoolEvents
.full
, info
=> {
1188 expect(pool
.emitter
.eventNames()).toStrictEqual([PoolEvents
.full
])
1189 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1190 promises
.add(pool
.execute())
1192 await Promise
.all(promises
)
1193 expect(poolFull
).toBe(1)
1194 expect(poolInfo
).toStrictEqual({
1196 type
: PoolTypes
.dynamic
,
1197 worker
: WorkerTypes
.thread
,
1200 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1201 minSize
: expect
.any(Number
),
1202 maxSize
: expect
.any(Number
),
1203 workerNodes
: expect
.any(Number
),
1204 idleWorkerNodes
: expect
.any(Number
),
1205 busyWorkerNodes
: expect
.any(Number
),
1206 executedTasks
: expect
.any(Number
),
1207 executingTasks
: expect
.any(Number
),
1208 failedTasks
: expect
.any(Number
)
1210 await pool
.destroy()
1213 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1214 const pool
= new FixedThreadPool(
1216 './tests/worker-files/thread/testWorker.js',
1218 enableTasksQueue
: true
1221 sinon
.stub(pool
, 'hasBackPressure').returns(true)
1222 expect(pool
.emitter
.eventNames()).toStrictEqual([])
1223 const promises
= new Set()
1224 let poolBackPressure
= 0
1226 pool
.emitter
.on(PoolEvents
.backPressure
, info
=> {
1230 expect(pool
.emitter
.eventNames()).toStrictEqual([PoolEvents
.backPressure
])
1231 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
1232 promises
.add(pool
.execute())
1234 await Promise
.all(promises
)
1235 expect(poolBackPressure
).toBe(1)
1236 expect(poolInfo
).toStrictEqual({
1238 type
: PoolTypes
.fixed
,
1239 worker
: WorkerTypes
.thread
,
1242 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1243 minSize
: expect
.any(Number
),
1244 maxSize
: expect
.any(Number
),
1245 workerNodes
: expect
.any(Number
),
1246 idleWorkerNodes
: expect
.any(Number
),
1247 busyWorkerNodes
: expect
.any(Number
),
1248 executedTasks
: expect
.any(Number
),
1249 executingTasks
: expect
.any(Number
),
1250 maxQueuedTasks
: expect
.any(Number
),
1251 queuedTasks
: expect
.any(Number
),
1253 stolenTasks
: expect
.any(Number
),
1254 failedTasks
: expect
.any(Number
)
1256 expect(pool
.hasBackPressure
.called
).toBe(true)
1257 await pool
.destroy()
1260 it('Verify that hasTaskFunction() is working', async () => {
1261 const dynamicThreadPool
= new DynamicThreadPool(
1262 Math
.floor(numberOfWorkers
/ 2),
1264 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1266 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1267 expect(dynamicThreadPool
.hasTaskFunction(DEFAULT_TASK_NAME
)).toBe(true)
1268 expect(dynamicThreadPool
.hasTaskFunction('jsonIntegerSerialization')).toBe(
1271 expect(dynamicThreadPool
.hasTaskFunction('factorial')).toBe(true)
1272 expect(dynamicThreadPool
.hasTaskFunction('fibonacci')).toBe(true)
1273 expect(dynamicThreadPool
.hasTaskFunction('unknown')).toBe(false)
1274 await dynamicThreadPool
.destroy()
1275 const fixedClusterPool
= new FixedClusterPool(
1277 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1279 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1280 expect(fixedClusterPool
.hasTaskFunction(DEFAULT_TASK_NAME
)).toBe(true)
1281 expect(fixedClusterPool
.hasTaskFunction('jsonIntegerSerialization')).toBe(
1284 expect(fixedClusterPool
.hasTaskFunction('factorial')).toBe(true)
1285 expect(fixedClusterPool
.hasTaskFunction('fibonacci')).toBe(true)
1286 expect(fixedClusterPool
.hasTaskFunction('unknown')).toBe(false)
1287 await fixedClusterPool
.destroy()
1290 it('Verify that addTaskFunction() is working', async () => {
1291 const dynamicThreadPool
= new DynamicThreadPool(
1292 Math
.floor(numberOfWorkers
/ 2),
1294 './tests/worker-files/thread/testWorker.js'
1296 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1298 dynamicThreadPool
.addTaskFunction(0, () => {})
1299 ).rejects
.toThrowError(new TypeError('name argument must be a string'))
1301 dynamicThreadPool
.addTaskFunction('', () => {})
1302 ).rejects
.toThrowError(
1303 new TypeError('name argument must not be an empty string')
1306 dynamicThreadPool
.addTaskFunction('test', 0)
1307 ).rejects
.toThrowError(new TypeError('fn argument must be a function'))
1309 dynamicThreadPool
.addTaskFunction('test', '')
1310 ).rejects
.toThrowError(new TypeError('fn argument must be a function'))
1311 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1315 const echoTaskFunction
= data
=> {
1319 dynamicThreadPool
.addTaskFunction('echo', echoTaskFunction
)
1320 ).resolves
.toBe(true)
1321 expect(dynamicThreadPool
.taskFunctions
.size
).toBe(1)
1322 expect(dynamicThreadPool
.taskFunctions
.get('echo')).toStrictEqual(
1325 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1330 const taskFunctionData
= { test
: 'test' }
1331 const echoResult
= await dynamicThreadPool
.execute(taskFunctionData
, 'echo')
1332 expect(echoResult
).toStrictEqual(taskFunctionData
)
1333 for (const workerNode
of dynamicThreadPool
.workerNodes
) {
1334 expect(workerNode
.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1336 executed
: expect
.any(Number
),
1343 history
: new CircularArray()
1346 history
: new CircularArray()
1350 history
: new CircularArray()
1353 history
: new CircularArray()
1358 await dynamicThreadPool
.destroy()
// Test: removeTaskFunction() on a DynamicThreadPool created from the thread testWorker.js file.
// Visible flow: wait on the pool 'ready' event, check that removing 'test' rejects,
// register an 'echo' task function, remove it again, then destroy the pool.
1361 it('Verify that removeTaskFunction() is working', async () => {
1362 const dynamicThreadPool
= new DynamicThreadPool(
1363 Math
.floor(numberOfWorkers
/ 2),
1365 './tests/worker-files/thread/testWorker.js'
// presumably resolves once the pool has emitted one `ready` event — confirm against test-utils
1367 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1368 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
// Removing 'test' must reject with the "not handled on the pool side" error asserted below.
1373 dynamicThreadPool
.removeTaskFunction('test')
1374 ).rejects
.toThrowError(
1375 new Error('Cannot remove a task function not handled on the pool side')
1377 const echoTaskFunction
= data
=> {
// After addTaskFunction('echo', ...) the pool-side taskFunctions map is expected to hold exactly one entry.
1380 await dynamicThreadPool
.addTaskFunction('echo', echoTaskFunction
)
1381 expect(dynamicThreadPool
.taskFunctions
.size
).toBe(1)
1382 expect(dynamicThreadPool
.taskFunctions
.get('echo')).toStrictEqual(
1385 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
// Removing the previously added 'echo' must resolve and leave the taskFunctions map empty.
1390 await
expect(dynamicThreadPool
.removeTaskFunction('echo')).resolves
.toBe(
1393 expect(dynamicThreadPool
.taskFunctions
.size
).toBe(0)
1394 expect(dynamicThreadPool
.taskFunctions
.get('echo')).toBeUndefined()
1395 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
// Tear the pool down at the end of the test.
1399 await dynamicThreadPool
.destroy()
// Test: listTaskFunctionNames() against two pool flavours, each started from its own
// testMultipleTaskFunctionsWorker.js file: first a DynamicThreadPool, then a FixedClusterPool.
// Each pool is destroyed after its assertion.
1402 it('Verify that listTaskFunctionNames() is working', async () => {
1403 const dynamicThreadPool
= new DynamicThreadPool(
1404 Math
.floor(numberOfWorkers
/ 2),
1406 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
// presumably resolves once the pool has emitted one `ready` event — confirm against test-utils
1408 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
// The expected name list includes 'jsonIntegerSerialization'.
1409 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1411 'jsonIntegerSerialization',
1415 await dynamicThreadPool
.destroy()
// Same check repeated with a cluster-based fixed pool.
1416 const fixedClusterPool
= new FixedClusterPool(
1418 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1420 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1421 expect(fixedClusterPool
.listTaskFunctionNames()).toStrictEqual([
1423 'jsonIntegerSerialization',
1427 await fixedClusterPool
.destroy()
// Test: setDefaultTaskFunction() on a DynamicThreadPool started from the thread
// testMultipleTaskFunctionsWorker.js file.
// Three invalid names (0, DEFAULT_TASK_NAME, 'unknown') must reject; 'factorial' and
// 'fibonacci' must resolve to true, with listTaskFunctionNames() re-checked after each change.
// NOTE(review): the three expected error messages hard-code "worker 31". That id looks
// dependent on how many workers earlier tests have spawned — confirm, and consider
// reading the id from the pool's worker node info instead of a literal.
// NOTE(review): no destroy() call is visible for this pool, unlike the sibling tests
// that end with `await <pool>.destroy()` — confirm whether the pool is leaked.
1430 it('Verify that setDefaultTaskFunction() is working', async () => {
1431 const dynamicThreadPool
= new DynamicThreadPool(
1432 Math
.floor(numberOfWorkers
/ 2),
1434 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
// presumably resolves once the pool has emitted one `ready` event — confirm against test-utils
1436 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
// Non-string name: expected message carries a TypeError reported by the worker.
1438 dynamicThreadPool
.setDefaultTaskFunction(0)
1439 ).rejects
.toThrowError(
1441 "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
// The reserved DEFAULT_TASK_NAME cannot itself be set as the default task function.
1445 dynamicThreadPool
.setDefaultTaskFunction(DEFAULT_TASK_NAME
)
1446 ).rejects
.toThrowError(
1448 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
// A name that is not a registered task function is rejected as well.
1452 dynamicThreadPool
.setDefaultTaskFunction('unknown')
1453 ).rejects
.toThrowError(
1455 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
// Name list as asserted before any successful default change.
1458 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1460 'jsonIntegerSerialization',
// Valid name: must resolve to true; the name list is asserted again afterwards.
1465 dynamicThreadPool
.setDefaultTaskFunction('factorial')
1466 ).resolves
.toBe(true)
1467 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1470 'jsonIntegerSerialization',
// Second valid name: same expectations as for 'factorial'.
1474 dynamicThreadPool
.setDefaultTaskFunction('fibonacci')
1475 ).resolves
.toBe(true)
1476 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1479 'jsonIntegerSerialization',
// Test: end-to-end execution against a DynamicClusterPool started from the cluster
// testMultipleTaskFunctionsWorker.js file.
// Runs the same input through the default task function and three named ones, then
// inspects pool-level counters and per-worker, per-task-function usage statistics.
1484 it('Verify that multiple task functions worker is working', async () => {
1485 const pool
= new DynamicClusterPool(
1486 Math
.floor(numberOfWorkers
/ 2),
1488 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1490 const data
= { n
: 10 }
// No task function name given: the result is expected to equal { ok: 1 }.
1491 const result0
= await pool
.execute(data
)
1492 expect(result0
).toStrictEqual({ ok
: 1 })
1493 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1494 expect(result1
).toStrictEqual({ ok
: 1 })
// 3628800 is 10!, matching n = 10.
1495 const result2
= await pool
.execute(data
, 'factorial')
1496 expect(result2
).toBe(3628800)
// 55 is the 10th Fibonacci number, matching n = 10.
1497 const result3
= await pool
.execute(data
, 'fibonacci')
1498 expect(result3
).toBe(55)
// All four execute() calls above were awaited: nothing in flight, four tasks executed.
1499 expect(pool
.info
.executingTasks
).toBe(0)
1500 expect(pool
.info
.executedTasks
).toBe(4)
// Per-worker checks: advertised task function names and usage bookkeeping.
1501 for (const workerNode
of pool
.workerNodes
) {
1502 expect(workerNode
.info
.taskFunctionNames
).toStrictEqual([
1504 'jsonIntegerSerialization',
1508 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
// Every name listed by the pool must have a usage record on this worker node.
1509 for (const name
of pool
.listTaskFunctionNames()) {
1510 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1512 executed
: expect
.any(Number
),
1519 history
: expect
.any(CircularArray
)
1522 history
: expect
.any(CircularArray
)
1526 history
: expect
.any(CircularArray
)
1529 history
: expect
.any(CircularArray
)
// Each task function must have been executed at least once on this worker node.
1534 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executed
1535 ).toBeGreaterThan(0)
// Usage looked up under DEFAULT_TASK_NAME is checked against the usage of the second
// entry (index 1) of the worker's taskFunctionNames.
1538 workerNode
.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME
)
1540 workerNode
.getTaskFunctionWorkerUsage(
1541 workerNode
.info
.taskFunctionNames
[1]
1545 await pool
.destroy()
// Test: sendKillMessageToWorker() on a DynamicClusterPool started from the cluster testWorker.js file.
// The promise returned for the worker node at key 0 must resolve to undefined.
1548 it('Verify sendKillMessageToWorker()', async () => {
1549 const pool
= new DynamicClusterPool(
1550 Math
.floor(numberOfWorkers
/ 2),
1552 './tests/worker-files/cluster/testWorker.js'
// Key of the first worker node in the pool.
1554 const workerNodeKey
= 0
1556 pool
.sendKillMessageToWorker(workerNodeKey
)
1557 ).resolves
.toBeUndefined()
1558 await pool
.destroy()
// Test: sendTaskFunctionOperationToWorker() on a DynamicClusterPool started from the cluster testWorker.js file.
// Sends an 'add' operation for a task function named 'empty' to the worker node at key 0,
// then checks that the worker node's advertised task function names include it.
1561 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1562 const pool
= new DynamicClusterPool(
1563 Math
.floor(numberOfWorkers
/ 2),
1565 './tests/worker-files/cluster/testWorker.js'
// Key of the first worker node in the pool.
1567 const workerNodeKey
= 0
1569 pool
.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1570 taskFunctionOperation
: 'add',
1571 taskFunctionName
: 'empty',
// The task function is sent as source text (a stringified no-op arrow function).
1572 taskFunction
: (() => {}).toString()
1574 ).resolves
.toBe(true)
// 'empty' is expected to be appended after the default name and 'test'.
1576 pool
.workerNodes
[workerNodeKey
].info
.taskFunctionNames
1577 ).toStrictEqual([DEFAULT_TASK_NAME
, 'test', 'empty'])
1578 await pool
.destroy()
// Test: sendTaskFunctionOperationToWorkers() on a DynamicClusterPool started from the cluster testWorker.js file.
// Sends an 'add' operation for a task function named 'empty' without a worker node key,
// then checks the advertised task function names of every worker node in the pool.
1581 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1582 const pool
= new DynamicClusterPool(
1583 Math
.floor(numberOfWorkers
/ 2),
1585 './tests/worker-files/cluster/testWorker.js'
1588 pool
.sendTaskFunctionOperationToWorkers({
1589 taskFunctionOperation
: 'add',
1590 taskFunctionName
: 'empty',
// The task function is sent as source text (a stringified no-op arrow function).
1591 taskFunction
: (() => {}).toString()
1593 ).resolves
.toBe(true)
// Every worker node is checked, not only the one at key 0.
1594 for (const workerNode
of pool
.workerNodes
) {
1595 expect(workerNode
.info
.taskFunctionNames
).toStrictEqual([
1601 await pool
.destroy()