1 const { EventEmitter
} = require('node:events')
2 const { expect
} = require('expect')
3 const sinon
= require('sinon')
11 WorkerChoiceStrategies
,
13 } = require('../../../lib')
14 const { CircularArray
} = require('../../../lib/circular-array')
15 const { Deque
} = require('../../../lib/deque')
16 const { DEFAULT_TASK_NAME
} = require('../../../lib/utils')
17 const { version
} = require('../../../package.json')
18 const { waitPoolEvents
} = require('../../test-utils')
19 const { WorkerNode
} = require('../../../lib/pools/worker-node')
21 describe('Abstract pool test suite', () => {
22 const numberOfWorkers
= 2
23 class StubPoolWithIsMain
extends FixedThreadPool
{
33 it('Simulate pool creation from a non main thread/process', () => {
36 new StubPoolWithIsMain(
38 './tests/worker-files/thread/testWorker.js',
40 errorHandler
: e
=> console
.error(e
)
45 'Cannot start a pool from a worker with the same type as the pool'
50 it('Verify that pool statuses properties are set', async () => {
51 const pool
= new FixedThreadPool(
53 './tests/worker-files/thread/testWorker.js'
55 expect(pool
.starting
).toBe(false)
56 expect(pool
.started
).toBe(true)
60 it('Verify that filePath is checked', () => {
61 const expectedError
= new Error(
62 'Please specify a file with a worker implementation'
64 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
67 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
70 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
73 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
77 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
78 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
81 it('Verify that numberOfWorkers is checked', () => {
82 expect(() => new FixedThreadPool()).toThrowError(
84 'Cannot instantiate a pool without specifying the number of workers'
89 it('Verify that a negative number of workers is checked', () => {
92 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
95 'Cannot instantiate a pool with a negative number of workers'
100 it('Verify that a non integer number of workers is checked', () => {
103 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
106 'Cannot instantiate a pool with a non safe integer number of workers'
111 it('Verify that dynamic pool sizing is checked', () => {
114 new DynamicClusterPool(
117 './tests/worker-files/cluster/testWorker.js'
121 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
126 new DynamicThreadPool(
129 './tests/worker-files/thread/testWorker.js'
133 'Cannot instantiate a pool with a non safe integer number of workers'
138 new DynamicClusterPool(
141 './tests/worker-files/cluster/testWorker.js'
145 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
150 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
153 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
158 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
161 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
166 new DynamicClusterPool(
169 './tests/worker-files/cluster/testWorker.js'
173 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
178 it('Verify that pool options are checked', async () => {
179 let pool
= new FixedThreadPool(
181 './tests/worker-files/thread/testWorker.js'
183 expect(pool
.emitter
).toBeInstanceOf(EventEmitter
)
184 expect(pool
.opts
).toStrictEqual({
187 restartWorkerOnError
: true,
188 enableTasksQueue
: false,
189 workerChoiceStrategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
190 workerChoiceStrategyOptions
: {
192 runTime
: { median
: false },
193 waitTime
: { median
: false },
194 elu
: { median
: false }
197 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
199 runTime
: { median
: false },
200 waitTime
: { median
: false },
201 elu
: { median
: false }
203 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
204 .workerChoiceStrategies
) {
205 expect(workerChoiceStrategy
.opts
).toStrictEqual({
207 runTime
: { median
: false },
208 waitTime
: { median
: false },
209 elu
: { median
: false }
213 const testHandler
= () => console
.info('test handler executed')
214 pool
= new FixedThreadPool(
216 './tests/worker-files/thread/testWorker.js',
218 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
219 workerChoiceStrategyOptions
: {
220 runTime
: { median
: true },
221 weights
: { 0: 300, 1: 200 }
224 restartWorkerOnError
: false,
225 enableTasksQueue
: true,
226 tasksQueueOptions
: { concurrency
: 2 },
227 messageHandler
: testHandler
,
228 errorHandler
: testHandler
,
229 onlineHandler
: testHandler
,
230 exitHandler
: testHandler
233 expect(pool
.emitter
).toBeUndefined()
234 expect(pool
.opts
).toStrictEqual({
237 restartWorkerOnError
: false,
238 enableTasksQueue
: true,
243 tasksStealingOnBackPressure
: true
245 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
246 workerChoiceStrategyOptions
: {
248 runTime
: { median
: true },
249 waitTime
: { median
: false },
250 elu
: { median
: false },
251 weights
: { 0: 300, 1: 200 }
253 onlineHandler
: testHandler
,
254 messageHandler
: testHandler
,
255 errorHandler
: testHandler
,
256 exitHandler
: testHandler
258 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
260 runTime
: { median
: true },
261 waitTime
: { median
: false },
262 elu
: { median
: false },
263 weights
: { 0: 300, 1: 200 }
265 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
266 .workerChoiceStrategies
) {
267 expect(workerChoiceStrategy
.opts
).toStrictEqual({
269 runTime
: { median
: true },
270 waitTime
: { median
: false },
271 elu
: { median
: false },
272 weights
: { 0: 300, 1: 200 }
278 it('Verify that pool options are validated', async () => {
283 './tests/worker-files/thread/testWorker.js',
285 workerChoiceStrategy
: 'invalidStrategy'
289 new Error("Invalid worker choice strategy 'invalidStrategy'")
295 './tests/worker-files/thread/testWorker.js',
297 workerChoiceStrategyOptions
: {
298 retries
: 'invalidChoiceRetries'
304 'Invalid worker choice strategy options: retries must be an integer'
311 './tests/worker-files/thread/testWorker.js',
313 workerChoiceStrategyOptions
: {
320 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
327 './tests/worker-files/thread/testWorker.js',
329 workerChoiceStrategyOptions
: { weights
: {} }
334 'Invalid worker choice strategy options: must have a weight for each worker node'
341 './tests/worker-files/thread/testWorker.js',
343 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
348 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
355 './tests/worker-files/thread/testWorker.js',
357 enableTasksQueue
: true,
358 tasksQueueOptions
: 'invalidTasksQueueOptions'
362 new TypeError('Invalid tasks queue options: must be a plain object')
368 './tests/worker-files/thread/testWorker.js',
370 enableTasksQueue
: true,
371 tasksQueueOptions
: { concurrency
: 0 }
376 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
383 './tests/worker-files/thread/testWorker.js',
385 enableTasksQueue
: true,
386 tasksQueueOptions
: { concurrency
: -1 }
391 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
398 './tests/worker-files/thread/testWorker.js',
400 enableTasksQueue
: true,
401 tasksQueueOptions
: { concurrency
: 0.2 }
405 new TypeError('Invalid worker node tasks concurrency: must be an integer')
411 './tests/worker-files/thread/testWorker.js',
413 enableTasksQueue
: true,
414 tasksQueueOptions
: { size
: 0 }
419 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
426 './tests/worker-files/thread/testWorker.js',
428 enableTasksQueue
: true,
429 tasksQueueOptions
: { size
: -1 }
434 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
441 './tests/worker-files/thread/testWorker.js',
443 enableTasksQueue
: true,
444 tasksQueueOptions
: { size
: 0.2 }
448 new TypeError('Invalid worker node tasks queue size: must be an integer')
452 it('Verify that pool worker choice strategy options can be set', async () => {
453 const pool
= new FixedThreadPool(
455 './tests/worker-files/thread/testWorker.js',
456 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
458 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
460 runTime
: { median
: false },
461 waitTime
: { median
: false },
462 elu
: { median
: false }
464 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
466 runTime
: { median
: false },
467 waitTime
: { median
: false },
468 elu
: { median
: false }
470 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
471 .workerChoiceStrategies
) {
472 expect(workerChoiceStrategy
.opts
).toStrictEqual({
474 runTime
: { median
: false },
475 waitTime
: { median
: false },
476 elu
: { median
: false }
480 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
498 pool
.setWorkerChoiceStrategyOptions({
499 runTime
: { median
: true },
500 elu
: { median
: true }
502 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
504 runTime
: { median
: true },
505 waitTime
: { median
: false },
506 elu
: { median
: true }
508 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
510 runTime
: { median
: true },
511 waitTime
: { median
: false },
512 elu
: { median
: true }
514 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
515 .workerChoiceStrategies
) {
516 expect(workerChoiceStrategy
.opts
).toStrictEqual({
518 runTime
: { median
: true },
519 waitTime
: { median
: false },
520 elu
: { median
: true }
524 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
542 pool
.setWorkerChoiceStrategyOptions({
543 runTime
: { median
: false },
544 elu
: { median
: false }
546 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
548 runTime
: { median
: false },
549 waitTime
: { median
: false },
550 elu
: { median
: false }
552 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
554 runTime
: { median
: false },
555 waitTime
: { median
: false },
556 elu
: { median
: false }
558 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
559 .workerChoiceStrategies
) {
560 expect(workerChoiceStrategy
.opts
).toStrictEqual({
562 runTime
: { median
: false },
563 waitTime
: { median
: false },
564 elu
: { median
: false }
568 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
587 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
590 'Invalid worker choice strategy options: must be a plain object'
594 pool
.setWorkerChoiceStrategyOptions({
595 retries
: 'invalidChoiceRetries'
599 'Invalid worker choice strategy options: retries must be an integer'
603 pool
.setWorkerChoiceStrategyOptions({ retries
: -1 })
606 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
610 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
613 'Invalid worker choice strategy options: must have a weight for each worker node'
617 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
620 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
626 it('Verify that pool tasks queue can be enabled/disabled', async () => {
627 const pool
= new FixedThreadPool(
629 './tests/worker-files/thread/testWorker.js'
631 expect(pool
.opts
.enableTasksQueue
).toBe(false)
632 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
633 pool
.enableTasksQueue(true)
634 expect(pool
.opts
.enableTasksQueue
).toBe(true)
635 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
639 tasksStealingOnBackPressure
: true
641 pool
.enableTasksQueue(true, { concurrency
: 2 })
642 expect(pool
.opts
.enableTasksQueue
).toBe(true)
643 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
647 tasksStealingOnBackPressure
: true
649 pool
.enableTasksQueue(false)
650 expect(pool
.opts
.enableTasksQueue
).toBe(false)
651 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
655 it('Verify that pool tasks queue options can be set', async () => {
656 const pool
= new FixedThreadPool(
658 './tests/worker-files/thread/testWorker.js',
659 { enableTasksQueue
: true }
661 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
665 tasksStealingOnBackPressure
: true
667 pool
.setTasksQueueOptions({ concurrency
: 2 })
668 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
672 tasksStealingOnBackPressure
: true
675 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
677 new TypeError('Invalid tasks queue options: must be a plain object')
679 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
681 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
684 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
686 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
689 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
690 new TypeError('Invalid worker node tasks concurrency: must be an integer')
692 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
694 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
697 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
699 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
702 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
703 new TypeError('Invalid worker node tasks queue size: must be an integer')
708 it('Verify that pool info is set', async () => {
709 let pool
= new FixedThreadPool(
711 './tests/worker-files/thread/testWorker.js'
713 expect(pool
.info
).toStrictEqual({
715 type
: PoolTypes
.fixed
,
716 worker
: WorkerTypes
.thread
,
719 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
720 minSize
: numberOfWorkers
,
721 maxSize
: numberOfWorkers
,
722 workerNodes
: numberOfWorkers
,
723 idleWorkerNodes
: numberOfWorkers
,
730 pool
= new DynamicClusterPool(
731 Math
.floor(numberOfWorkers
/ 2),
733 './tests/worker-files/cluster/testWorker.js'
735 expect(pool
.info
).toStrictEqual({
737 type
: PoolTypes
.dynamic
,
738 worker
: WorkerTypes
.cluster
,
741 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
742 minSize
: Math
.floor(numberOfWorkers
/ 2),
743 maxSize
: numberOfWorkers
,
744 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
745 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
754 it('Verify that pool worker tasks usage are initialized', async () => {
755 const pool
= new FixedClusterPool(
757 './tests/worker-files/cluster/testWorker.js'
759 for (const workerNode
of pool
.workerNodes
) {
760 expect(workerNode
).toBeInstanceOf(WorkerNode
)
761 expect(workerNode
.usage
).toStrictEqual({
771 history
: new CircularArray()
774 history
: new CircularArray()
778 history
: new CircularArray()
781 history
: new CircularArray()
789 it('Verify that pool worker tasks queue are initialized', async () => {
790 let pool
= new FixedClusterPool(
792 './tests/worker-files/cluster/testWorker.js'
794 for (const workerNode
of pool
.workerNodes
) {
795 expect(workerNode
).toBeInstanceOf(WorkerNode
)
796 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
797 expect(workerNode
.tasksQueue
.size
).toBe(0)
798 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
801 pool
= new DynamicThreadPool(
802 Math
.floor(numberOfWorkers
/ 2),
804 './tests/worker-files/thread/testWorker.js'
806 for (const workerNode
of pool
.workerNodes
) {
807 expect(workerNode
).toBeInstanceOf(WorkerNode
)
808 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
809 expect(workerNode
.tasksQueue
.size
).toBe(0)
810 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
815 it('Verify that pool worker info are initialized', async () => {
816 let pool
= new FixedClusterPool(
818 './tests/worker-files/cluster/testWorker.js'
820 for (const workerNode
of pool
.workerNodes
) {
821 expect(workerNode
).toBeInstanceOf(WorkerNode
)
822 expect(workerNode
.info
).toStrictEqual({
823 id
: expect
.any(Number
),
824 type
: WorkerTypes
.cluster
,
830 pool
= new DynamicThreadPool(
831 Math
.floor(numberOfWorkers
/ 2),
833 './tests/worker-files/thread/testWorker.js'
835 for (const workerNode
of pool
.workerNodes
) {
836 expect(workerNode
).toBeInstanceOf(WorkerNode
)
837 expect(workerNode
.info
).toStrictEqual({
838 id
: expect
.any(Number
),
839 type
: WorkerTypes
.thread
,
847 it('Verify that pool can be started after initialization', async () => {
848 const pool
= new FixedClusterPool(
850 './tests/worker-files/cluster/testWorker.js',
855 expect(pool
.info
.started
).toBe(false)
856 expect(pool
.info
.ready
).toBe(false)
857 expect(pool
.workerNodes
).toStrictEqual([])
858 await
expect(pool
.execute()).rejects
.toThrowError(
859 new Error('Cannot execute a task on not started pool')
862 expect(pool
.info
.started
).toBe(true)
863 expect(pool
.info
.ready
).toBe(true)
864 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
865 for (const workerNode
of pool
.workerNodes
) {
866 expect(workerNode
).toBeInstanceOf(WorkerNode
)
871 it('Verify that pool execute() arguments are checked', async () => {
872 const pool
= new FixedClusterPool(
874 './tests/worker-files/cluster/testWorker.js'
876 await
expect(pool
.execute(undefined, 0)).rejects
.toThrowError(
877 new TypeError('name argument must be a string')
879 await
expect(pool
.execute(undefined, '')).rejects
.toThrowError(
880 new TypeError('name argument must not be an empty string')
882 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
883 new TypeError('transferList argument must be an array')
885 await
expect(pool
.execute(undefined, 'unknown')).rejects
.toBe(
886 "Task function 'unknown' not found"
889 await
expect(pool
.execute()).rejects
.toThrowError(
890 new Error('Cannot execute a task on not started pool')
894 it('Verify that pool worker tasks usage are computed', async () => {
895 const pool
= new FixedClusterPool(
897 './tests/worker-files/cluster/testWorker.js'
899 const promises
= new Set()
900 const maxMultiplier
= 2
901 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
902 promises
.add(pool
.execute())
904 for (const workerNode
of pool
.workerNodes
) {
905 expect(workerNode
.usage
).toStrictEqual({
908 executing
: maxMultiplier
,
915 history
: expect
.any(CircularArray
)
918 history
: expect
.any(CircularArray
)
922 history
: expect
.any(CircularArray
)
925 history
: expect
.any(CircularArray
)
930 await Promise
.all(promises
)
931 for (const workerNode
of pool
.workerNodes
) {
932 expect(workerNode
.usage
).toStrictEqual({
934 executed
: maxMultiplier
,
942 history
: expect
.any(CircularArray
)
945 history
: expect
.any(CircularArray
)
949 history
: expect
.any(CircularArray
)
952 history
: expect
.any(CircularArray
)
960 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
961 const pool
= new DynamicThreadPool(
962 Math
.floor(numberOfWorkers
/ 2),
964 './tests/worker-files/thread/testWorker.js'
966 const promises
= new Set()
967 const maxMultiplier
= 2
968 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
969 promises
.add(pool
.execute())
971 await Promise
.all(promises
)
972 for (const workerNode
of pool
.workerNodes
) {
973 expect(workerNode
.usage
).toStrictEqual({
975 executed
: expect
.any(Number
),
983 history
: expect
.any(CircularArray
)
986 history
: expect
.any(CircularArray
)
990 history
: expect
.any(CircularArray
)
993 history
: expect
.any(CircularArray
)
997 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
998 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
999 numberOfWorkers
* maxMultiplier
1001 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
1002 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
1003 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
1004 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1006 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
1007 for (const workerNode
of pool
.workerNodes
) {
1008 expect(workerNode
.usage
).toStrictEqual({
1018 history
: expect
.any(CircularArray
)
1021 history
: expect
.any(CircularArray
)
1025 history
: expect
.any(CircularArray
)
1028 history
: expect
.any(CircularArray
)
1032 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
1033 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
1034 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
1035 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1037 await pool
.destroy()
1040 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1041 const pool
= new DynamicClusterPool(
1042 Math
.floor(numberOfWorkers
/ 2),
1044 './tests/worker-files/cluster/testWorker.js'
1048 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
1052 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
1053 expect(poolReady
).toBe(1)
1054 expect(poolInfo
).toStrictEqual({
1056 type
: PoolTypes
.dynamic
,
1057 worker
: WorkerTypes
.cluster
,
1060 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1061 minSize
: expect
.any(Number
),
1062 maxSize
: expect
.any(Number
),
1063 workerNodes
: expect
.any(Number
),
1064 idleWorkerNodes
: expect
.any(Number
),
1065 busyWorkerNodes
: expect
.any(Number
),
1066 executedTasks
: expect
.any(Number
),
1067 executingTasks
: expect
.any(Number
),
1068 failedTasks
: expect
.any(Number
)
1070 await pool
.destroy()
1073 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1074 const pool
= new FixedThreadPool(
1076 './tests/worker-files/thread/testWorker.js'
1078 const promises
= new Set()
1081 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
1085 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1086 promises
.add(pool
.execute())
1088 await Promise
.all(promises
)
1089 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1090 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1091 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
1092 expect(poolInfo
).toStrictEqual({
1094 type
: PoolTypes
.fixed
,
1095 worker
: WorkerTypes
.thread
,
1098 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1099 minSize
: expect
.any(Number
),
1100 maxSize
: expect
.any(Number
),
1101 workerNodes
: expect
.any(Number
),
1102 idleWorkerNodes
: expect
.any(Number
),
1103 busyWorkerNodes
: expect
.any(Number
),
1104 executedTasks
: expect
.any(Number
),
1105 executingTasks
: expect
.any(Number
),
1106 failedTasks
: expect
.any(Number
)
1108 await pool
.destroy()
1111 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1112 const pool
= new DynamicThreadPool(
1113 Math
.floor(numberOfWorkers
/ 2),
1115 './tests/worker-files/thread/testWorker.js'
1117 const promises
= new Set()
1120 pool
.emitter
.on(PoolEvents
.full
, info
=> {
1124 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1125 promises
.add(pool
.execute())
1127 await Promise
.all(promises
)
1128 expect(poolFull
).toBe(1)
1129 expect(poolInfo
).toStrictEqual({
1131 type
: PoolTypes
.dynamic
,
1132 worker
: WorkerTypes
.thread
,
1135 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1136 minSize
: expect
.any(Number
),
1137 maxSize
: expect
.any(Number
),
1138 workerNodes
: expect
.any(Number
),
1139 idleWorkerNodes
: expect
.any(Number
),
1140 busyWorkerNodes
: expect
.any(Number
),
1141 executedTasks
: expect
.any(Number
),
1142 executingTasks
: expect
.any(Number
),
1143 failedTasks
: expect
.any(Number
)
1145 await pool
.destroy()
1148 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1149 const pool
= new FixedThreadPool(
1151 './tests/worker-files/thread/testWorker.js',
1153 enableTasksQueue
: true
1156 sinon
.stub(pool
, 'hasBackPressure').returns(true)
1157 const promises
= new Set()
1158 let poolBackPressure
= 0
1160 pool
.emitter
.on(PoolEvents
.backPressure
, info
=> {
1164 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
1165 promises
.add(pool
.execute())
1167 await Promise
.all(promises
)
1168 expect(poolBackPressure
).toBe(1)
1169 expect(poolInfo
).toStrictEqual({
1171 type
: PoolTypes
.fixed
,
1172 worker
: WorkerTypes
.thread
,
1175 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1176 minSize
: expect
.any(Number
),
1177 maxSize
: expect
.any(Number
),
1178 workerNodes
: expect
.any(Number
),
1179 idleWorkerNodes
: expect
.any(Number
),
1180 busyWorkerNodes
: expect
.any(Number
),
1181 executedTasks
: expect
.any(Number
),
1182 executingTasks
: expect
.any(Number
),
1183 maxQueuedTasks
: expect
.any(Number
),
1184 queuedTasks
: expect
.any(Number
),
1186 stolenTasks
: expect
.any(Number
),
1187 failedTasks
: expect
.any(Number
)
1189 expect(pool
.hasBackPressure
.called
).toBe(true)
1190 await pool
.destroy()
1193 it('Verify that listTaskFunctions() is working', async () => {
1194 const dynamicThreadPool
= new DynamicThreadPool(
1195 Math
.floor(numberOfWorkers
/ 2),
1197 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1199 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1200 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1202 'jsonIntegerSerialization',
1206 const fixedClusterPool
= new FixedClusterPool(
1208 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1210 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1211 expect(fixedClusterPool
.listTaskFunctionNames()).toStrictEqual([
1213 'jsonIntegerSerialization',
1217 await dynamicThreadPool
.destroy()
1218 await fixedClusterPool
.destroy()
1221 it('Verify that multiple task functions worker is working', async () => {
1222 const pool
= new DynamicClusterPool(
1223 Math
.floor(numberOfWorkers
/ 2),
1225 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1227 const data
= { n
: 10 }
1228 const result0
= await pool
.execute(data
)
1229 expect(result0
).toStrictEqual({ ok
: 1 })
1230 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1231 expect(result1
).toStrictEqual({ ok
: 1 })
1232 const result2
= await pool
.execute(data
, 'factorial')
1233 expect(result2
).toBe(3628800)
1234 const result3
= await pool
.execute(data
, 'fibonacci')
1235 expect(result3
).toBe(55)
1236 expect(pool
.info
.executingTasks
).toBe(0)
1237 expect(pool
.info
.executedTasks
).toBe(4)
1238 for (const workerNode
of pool
.workerNodes
) {
1239 expect(workerNode
.info
.taskFunctionNames
).toStrictEqual([
1241 'jsonIntegerSerialization',
1245 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1246 for (const name
of pool
.listTaskFunctionNames()) {
1247 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1249 executed
: expect
.any(Number
),
1256 history
: expect
.any(CircularArray
)
1259 history
: expect
.any(CircularArray
)
1263 history
: expect
.any(CircularArray
)
1266 history
: expect
.any(CircularArray
)
1271 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executed
1272 ).toBeGreaterThan(0)
1275 workerNode
.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME
)
1277 workerNode
.getTaskFunctionWorkerUsage(
1278 workerNode
.info
.taskFunctionNames
[1]
1282 await pool
.destroy()
1285 it('Verify sendKillMessageToWorker()', async () => {
1286 const pool
= new DynamicClusterPool(
1287 Math
.floor(numberOfWorkers
/ 2),
1289 './tests/worker-files/cluster/testWorker.js'
1291 const workerNodeKey
= 0
1293 pool
.sendKillMessageToWorker(
1295 pool
.workerNodes
[workerNodeKey
].info
.id
1297 ).resolves
.toBeUndefined()
1298 await pool
.destroy()