1 const { expect
} = require('expect')
2 const sinon
= require('sinon')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Deque
} = require('../../../lib/deque')
15 const { version
} = require('../../../package.json')
16 const { waitPoolEvents
} = require('../../test-utils')
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers
= 2
20 class StubPoolWithIsMain
extends FixedThreadPool
{
30 it('Simulate pool creation from a non main thread/process', () => {
33 new StubPoolWithIsMain(
35 './tests/worker-files/thread/testWorker.js',
37 errorHandler
: (e
) => console
.error(e
)
42 'Cannot start a pool from a worker with the same type as the pool'
47 it('Verify that filePath is checked', () => {
48 const expectedError
= new Error(
49 'Please specify a file with a worker implementation'
51 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
54 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
57 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
60 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
64 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
65 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
68 it('Verify that numberOfWorkers is checked', () => {
69 expect(() => new FixedThreadPool()).toThrowError(
71 'Cannot instantiate a pool without specifying the number of workers'
76 it('Verify that a negative number of workers is checked', () => {
79 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
82 'Cannot instantiate a pool with a negative number of workers'
87 it('Verify that a non integer number of workers is checked', () => {
90 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
93 'Cannot instantiate a pool with a non safe integer number of workers'
98 it('Verify that dynamic pool sizing is checked', () => {
101 new DynamicClusterPool(
104 './tests/worker-files/cluster/testWorker.js'
108 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
113 new DynamicThreadPool(
116 './tests/worker-files/thread/testWorker.js'
120 'Cannot instantiate a pool with a non safe integer number of workers'
125 new DynamicClusterPool(
128 './tests/worker-files/cluster/testWorker.js'
132 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
137 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
140 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
145 new DynamicClusterPool(
148 './tests/worker-files/cluster/testWorker.js'
152 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
157 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
160 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
165 it('Verify that pool options are checked', async () => {
166 let pool
= new FixedThreadPool(
168 './tests/worker-files/thread/testWorker.js'
170 expect(pool
.emitter
).toBeDefined()
171 expect(pool
.opts
.enableEvents
).toBe(true)
172 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
173 expect(pool
.opts
.enableTasksQueue
).toBe(false)
174 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
175 expect(pool
.opts
.workerChoiceStrategy
).toBe(
176 WorkerChoiceStrategies
.ROUND_ROBIN
178 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
180 runTime
: { median
: false },
181 waitTime
: { median
: false },
182 elu
: { median
: false }
184 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
186 runTime
: { median
: false },
187 waitTime
: { median
: false },
188 elu
: { median
: false }
190 expect(pool
.opts
.messageHandler
).toBeUndefined()
191 expect(pool
.opts
.errorHandler
).toBeUndefined()
192 expect(pool
.opts
.onlineHandler
).toBeUndefined()
193 expect(pool
.opts
.exitHandler
).toBeUndefined()
195 const testHandler
= () => console
.info('test handler executed')
196 pool
= new FixedThreadPool(
198 './tests/worker-files/thread/testWorker.js',
200 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
201 workerChoiceStrategyOptions
: {
202 runTime
: { median
: true },
203 weights
: { 0: 300, 1: 200 }
206 restartWorkerOnError
: false,
207 enableTasksQueue
: true,
208 tasksQueueOptions
: { concurrency
: 2 },
209 messageHandler
: testHandler
,
210 errorHandler
: testHandler
,
211 onlineHandler
: testHandler
,
212 exitHandler
: testHandler
215 expect(pool
.emitter
).toBeUndefined()
216 expect(pool
.opts
.enableEvents
).toBe(false)
217 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
218 expect(pool
.opts
.enableTasksQueue
).toBe(true)
219 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
223 expect(pool
.opts
.workerChoiceStrategy
).toBe(
224 WorkerChoiceStrategies
.LEAST_USED
226 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
228 runTime
: { median
: true },
229 waitTime
: { median
: false },
230 elu
: { median
: false },
231 weights
: { 0: 300, 1: 200 }
233 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
235 runTime
: { median
: true },
236 waitTime
: { median
: false },
237 elu
: { median
: false },
238 weights
: { 0: 300, 1: 200 }
240 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
241 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
242 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
243 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
247 it('Verify that pool options are validated', async () => {
252 './tests/worker-files/thread/testWorker.js',
254 workerChoiceStrategy
: 'invalidStrategy'
258 new Error("Invalid worker choice strategy 'invalidStrategy'")
264 './tests/worker-files/thread/testWorker.js',
266 workerChoiceStrategyOptions
: {
267 choiceRetries
: 'invalidChoiceRetries'
273 'Invalid worker choice strategy options: choice retries must be an integer'
280 './tests/worker-files/thread/testWorker.js',
282 workerChoiceStrategyOptions
: {
289 "Invalid worker choice strategy options: choice retries '-1' must be greater or equal than zero"
296 './tests/worker-files/thread/testWorker.js',
298 workerChoiceStrategyOptions
: { weights
: {} }
303 'Invalid worker choice strategy options: must have a weight for each worker node'
310 './tests/worker-files/thread/testWorker.js',
312 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
317 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
324 './tests/worker-files/thread/testWorker.js',
326 enableTasksQueue
: true,
327 tasksQueueOptions
: 'invalidTasksQueueOptions'
331 new TypeError('Invalid tasks queue options: must be a plain object')
337 './tests/worker-files/thread/testWorker.js',
339 enableTasksQueue
: true,
340 tasksQueueOptions
: { concurrency
: 0 }
345 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
352 './tests/worker-files/thread/testWorker.js',
354 enableTasksQueue
: true,
355 tasksQueueOptions
: { concurrency
: -1 }
360 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
367 './tests/worker-files/thread/testWorker.js',
369 enableTasksQueue
: true,
370 tasksQueueOptions
: { concurrency
: 0.2 }
374 new TypeError('Invalid worker node tasks concurrency: must be an integer')
380 './tests/worker-files/thread/testWorker.js',
382 enableTasksQueue
: true,
383 tasksQueueOptions
: { queueMaxSize
: 2 }
388 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
395 './tests/worker-files/thread/testWorker.js',
397 enableTasksQueue
: true,
398 tasksQueueOptions
: { size
: 0 }
403 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
410 './tests/worker-files/thread/testWorker.js',
412 enableTasksQueue
: true,
413 tasksQueueOptions
: { size
: -1 }
418 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
425 './tests/worker-files/thread/testWorker.js',
427 enableTasksQueue
: true,
428 tasksQueueOptions
: { size
: 0.2 }
432 new TypeError('Invalid worker node tasks queue size: must be an integer')
436 it('Verify that pool worker choice strategy options can be set', async () => {
437 const pool
= new FixedThreadPool(
439 './tests/worker-files/thread/testWorker.js',
440 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
442 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
444 runTime
: { median
: false },
445 waitTime
: { median
: false },
446 elu
: { median
: false }
448 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
450 runTime
: { median
: false },
451 waitTime
: { median
: false },
452 elu
: { median
: false }
454 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
455 .workerChoiceStrategies
) {
456 expect(workerChoiceStrategy
.opts
).toStrictEqual({
458 runTime
: { median
: false },
459 waitTime
: { median
: false },
460 elu
: { median
: false }
464 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
482 pool
.setWorkerChoiceStrategyOptions({
483 runTime
: { median
: true },
484 elu
: { median
: true }
486 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
488 runTime
: { median
: true },
489 waitTime
: { median
: false },
490 elu
: { median
: true }
492 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
494 runTime
: { median
: true },
495 waitTime
: { median
: false },
496 elu
: { median
: true }
498 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
499 .workerChoiceStrategies
) {
500 expect(workerChoiceStrategy
.opts
).toStrictEqual({
502 runTime
: { median
: true },
503 waitTime
: { median
: false },
504 elu
: { median
: true }
508 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
526 pool
.setWorkerChoiceStrategyOptions({
527 runTime
: { median
: false },
528 elu
: { median
: false }
530 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
532 runTime
: { median
: false },
533 waitTime
: { median
: false },
534 elu
: { median
: false }
536 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
538 runTime
: { median
: false },
539 waitTime
: { median
: false },
540 elu
: { median
: false }
542 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
543 .workerChoiceStrategies
) {
544 expect(workerChoiceStrategy
.opts
).toStrictEqual({
546 runTime
: { median
: false },
547 waitTime
: { median
: false },
548 elu
: { median
: false }
552 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
571 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
574 'Invalid worker choice strategy options: must be a plain object'
578 pool
.setWorkerChoiceStrategyOptions({
579 choiceRetries
: 'invalidChoiceRetries'
583 'Invalid worker choice strategy options: choice retries must be an integer'
587 pool
.setWorkerChoiceStrategyOptions({ choiceRetries
: -1 })
590 "Invalid worker choice strategy options: choice retries '-1' must be greater or equal than zero"
594 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
597 'Invalid worker choice strategy options: must have a weight for each worker node'
601 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
604 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
610 it('Verify that pool tasks queue can be enabled/disabled', async () => {
611 const pool
= new FixedThreadPool(
613 './tests/worker-files/thread/testWorker.js'
615 expect(pool
.opts
.enableTasksQueue
).toBe(false)
616 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
617 pool
.enableTasksQueue(true)
618 expect(pool
.opts
.enableTasksQueue
).toBe(true)
619 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
623 pool
.enableTasksQueue(true, { concurrency
: 2 })
624 expect(pool
.opts
.enableTasksQueue
).toBe(true)
625 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
629 pool
.enableTasksQueue(false)
630 expect(pool
.opts
.enableTasksQueue
).toBe(false)
631 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
635 it('Verify that pool tasks queue options can be set', async () => {
636 const pool
= new FixedThreadPool(
638 './tests/worker-files/thread/testWorker.js',
639 { enableTasksQueue
: true }
641 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
645 pool
.setTasksQueueOptions({ concurrency
: 2 })
646 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
651 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
653 new TypeError('Invalid tasks queue options: must be a plain object')
655 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
657 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
660 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
662 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
665 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
666 new TypeError('Invalid worker node tasks concurrency: must be an integer')
668 expect(() => pool
.setTasksQueueOptions({ queueMaxSize
: 2 })).toThrowError(
670 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
673 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
675 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
678 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
680 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
683 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
684 new TypeError('Invalid worker node tasks queue size: must be an integer')
689 it('Verify that pool info is set', async () => {
690 let pool
= new FixedThreadPool(
692 './tests/worker-files/thread/testWorker.js'
694 expect(pool
.info
).toStrictEqual({
696 type
: PoolTypes
.fixed
,
697 worker
: WorkerTypes
.thread
,
699 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
700 minSize
: numberOfWorkers
,
701 maxSize
: numberOfWorkers
,
702 workerNodes
: numberOfWorkers
,
703 idleWorkerNodes
: numberOfWorkers
,
710 pool
= new DynamicClusterPool(
711 Math
.floor(numberOfWorkers
/ 2),
713 './tests/worker-files/cluster/testWorker.js'
715 expect(pool
.info
).toStrictEqual({
717 type
: PoolTypes
.dynamic
,
718 worker
: WorkerTypes
.cluster
,
720 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
721 minSize
: Math
.floor(numberOfWorkers
/ 2),
722 maxSize
: numberOfWorkers
,
723 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
724 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
733 it('Verify that pool worker tasks usage are initialized', async () => {
734 const pool
= new FixedClusterPool(
736 './tests/worker-files/cluster/testWorker.js'
738 for (const workerNode
of pool
.workerNodes
) {
739 expect(workerNode
.usage
).toStrictEqual({
749 history
: expect
.any(CircularArray
)
752 history
: expect
.any(CircularArray
)
756 history
: expect
.any(CircularArray
)
759 history
: expect
.any(CircularArray
)
767 it('Verify that pool worker tasks queue are initialized', async () => {
768 let pool
= new FixedClusterPool(
770 './tests/worker-files/cluster/testWorker.js'
772 for (const workerNode
of pool
.workerNodes
) {
773 expect(workerNode
.tasksQueue
).toBeDefined()
774 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
775 expect(workerNode
.tasksQueue
.size
).toBe(0)
776 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
779 pool
= new DynamicThreadPool(
780 Math
.floor(numberOfWorkers
/ 2),
782 './tests/worker-files/thread/testWorker.js'
784 for (const workerNode
of pool
.workerNodes
) {
785 expect(workerNode
.tasksQueue
).toBeDefined()
786 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
787 expect(workerNode
.tasksQueue
.size
).toBe(0)
788 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
792 it('Verify that pool worker info are initialized', async () => {
793 let pool
= new FixedClusterPool(
795 './tests/worker-files/cluster/testWorker.js'
797 for (const workerNode
of pool
.workerNodes
) {
798 expect(workerNode
.info
).toStrictEqual({
799 id
: expect
.any(Number
),
800 type
: WorkerTypes
.cluster
,
806 pool
= new DynamicThreadPool(
807 Math
.floor(numberOfWorkers
/ 2),
809 './tests/worker-files/thread/testWorker.js'
811 for (const workerNode
of pool
.workerNodes
) {
812 expect(workerNode
.info
).toStrictEqual({
813 id
: expect
.any(Number
),
814 type
: WorkerTypes
.thread
,
821 it('Verify that pool execute() arguments are checked', async () => {
822 const pool
= new FixedClusterPool(
824 './tests/worker-files/cluster/testWorker.js'
826 await
expect(pool
.execute(undefined, 0)).rejects
.toThrowError(
827 new TypeError('name argument must be a string')
829 await
expect(pool
.execute(undefined, '')).rejects
.toThrowError(
830 new TypeError('name argument must not be an empty string')
832 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
833 new TypeError('transferList argument must be an array')
835 await
expect(pool
.execute(undefined, 'unknown')).rejects
.toBe(
836 "Task function 'unknown' not found"
839 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
840 new Error('Cannot execute a task on destroyed pool')
844 it('Verify that pool worker tasks usage are computed', async () => {
845 const pool
= new FixedClusterPool(
847 './tests/worker-files/cluster/testWorker.js'
849 const promises
= new Set()
850 const maxMultiplier
= 2
851 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
852 promises
.add(pool
.execute())
854 for (const workerNode
of pool
.workerNodes
) {
855 expect(workerNode
.usage
).toStrictEqual({
858 executing
: maxMultiplier
,
865 history
: expect
.any(CircularArray
)
868 history
: expect
.any(CircularArray
)
872 history
: expect
.any(CircularArray
)
875 history
: expect
.any(CircularArray
)
880 await Promise
.all(promises
)
881 for (const workerNode
of pool
.workerNodes
) {
882 expect(workerNode
.usage
).toStrictEqual({
884 executed
: maxMultiplier
,
892 history
: expect
.any(CircularArray
)
895 history
: expect
.any(CircularArray
)
899 history
: expect
.any(CircularArray
)
902 history
: expect
.any(CircularArray
)
910 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
911 const pool
= new DynamicThreadPool(
912 Math
.floor(numberOfWorkers
/ 2),
914 './tests/worker-files/thread/testWorker.js'
916 const promises
= new Set()
917 const maxMultiplier
= 2
918 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
919 promises
.add(pool
.execute())
921 await Promise
.all(promises
)
922 for (const workerNode
of pool
.workerNodes
) {
923 expect(workerNode
.usage
).toStrictEqual({
925 executed
: expect
.any(Number
),
933 history
: expect
.any(CircularArray
)
936 history
: expect
.any(CircularArray
)
940 history
: expect
.any(CircularArray
)
943 history
: expect
.any(CircularArray
)
947 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
948 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
949 numberOfWorkers
* maxMultiplier
951 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
952 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
953 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
954 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
956 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
957 for (const workerNode
of pool
.workerNodes
) {
958 expect(workerNode
.usage
).toStrictEqual({
968 history
: expect
.any(CircularArray
)
971 history
: expect
.any(CircularArray
)
975 history
: expect
.any(CircularArray
)
978 history
: expect
.any(CircularArray
)
982 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
983 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
984 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
985 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
990 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
991 const pool
= new DynamicClusterPool(
992 Math
.floor(numberOfWorkers
/ 2),
994 './tests/worker-files/cluster/testWorker.js'
998 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
1002 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
1003 expect(poolReady
).toBe(1)
1004 expect(poolInfo
).toStrictEqual({
1006 type
: PoolTypes
.dynamic
,
1007 worker
: WorkerTypes
.cluster
,
1009 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1010 minSize
: expect
.any(Number
),
1011 maxSize
: expect
.any(Number
),
1012 workerNodes
: expect
.any(Number
),
1013 idleWorkerNodes
: expect
.any(Number
),
1014 busyWorkerNodes
: expect
.any(Number
),
1015 executedTasks
: expect
.any(Number
),
1016 executingTasks
: expect
.any(Number
),
1017 failedTasks
: expect
.any(Number
)
1019 await pool
.destroy()
1022 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1023 const pool
= new FixedThreadPool(
1025 './tests/worker-files/thread/testWorker.js'
1027 const promises
= new Set()
1030 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
1034 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1035 promises
.add(pool
.execute())
1037 await Promise
.all(promises
)
1038 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1039 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1040 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
1041 expect(poolInfo
).toStrictEqual({
1043 type
: PoolTypes
.fixed
,
1044 worker
: WorkerTypes
.thread
,
1045 ready
: expect
.any(Boolean
),
1046 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1047 minSize
: expect
.any(Number
),
1048 maxSize
: expect
.any(Number
),
1049 workerNodes
: expect
.any(Number
),
1050 idleWorkerNodes
: expect
.any(Number
),
1051 busyWorkerNodes
: expect
.any(Number
),
1052 executedTasks
: expect
.any(Number
),
1053 executingTasks
: expect
.any(Number
),
1054 failedTasks
: expect
.any(Number
)
1056 await pool
.destroy()
1059 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1060 const pool
= new DynamicThreadPool(
1061 Math
.floor(numberOfWorkers
/ 2),
1063 './tests/worker-files/thread/testWorker.js'
1065 const promises
= new Set()
1068 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
1072 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1073 promises
.add(pool
.execute())
1075 await Promise
.all(promises
)
1076 expect(poolFull
).toBe(1)
1077 expect(poolInfo
).toStrictEqual({
1079 type
: PoolTypes
.dynamic
,
1080 worker
: WorkerTypes
.thread
,
1081 ready
: expect
.any(Boolean
),
1082 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1083 minSize
: expect
.any(Number
),
1084 maxSize
: expect
.any(Number
),
1085 workerNodes
: expect
.any(Number
),
1086 idleWorkerNodes
: expect
.any(Number
),
1087 busyWorkerNodes
: expect
.any(Number
),
1088 executedTasks
: expect
.any(Number
),
1089 executingTasks
: expect
.any(Number
),
1090 failedTasks
: expect
.any(Number
)
1092 await pool
.destroy()
1095 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1096 const pool
= new FixedThreadPool(
1098 './tests/worker-files/thread/testWorker.js',
1100 enableTasksQueue
: true
1103 sinon
.stub(pool
, 'hasBackPressure').returns(true)
1104 const promises
= new Set()
1105 let poolBackPressure
= 0
1107 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
1111 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
1112 promises
.add(pool
.execute())
1114 await Promise
.all(promises
)
1115 expect(poolBackPressure
).toBe(1)
1116 expect(poolInfo
).toStrictEqual({
1118 type
: PoolTypes
.fixed
,
1119 worker
: WorkerTypes
.thread
,
1120 ready
: expect
.any(Boolean
),
1121 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1122 minSize
: expect
.any(Number
),
1123 maxSize
: expect
.any(Number
),
1124 workerNodes
: expect
.any(Number
),
1125 idleWorkerNodes
: expect
.any(Number
),
1126 busyWorkerNodes
: expect
.any(Number
),
1127 executedTasks
: expect
.any(Number
),
1128 executingTasks
: expect
.any(Number
),
1129 maxQueuedTasks
: expect
.any(Number
),
1130 queuedTasks
: expect
.any(Number
),
1132 stolenTasks
: expect
.any(Number
),
1133 failedTasks
: expect
.any(Number
)
1135 expect(pool
.hasBackPressure
.called
).toBe(true)
1136 await pool
.destroy()
1139 it('Verify that listTaskFunctions() is working', async () => {
1140 const dynamicThreadPool
= new DynamicThreadPool(
1141 Math
.floor(numberOfWorkers
/ 2),
1143 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1145 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1146 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
1148 'jsonIntegerSerialization',
1152 const fixedClusterPool
= new FixedClusterPool(
1154 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1156 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1157 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
1159 'jsonIntegerSerialization',
1165 it('Verify that multiple task functions worker is working', async () => {
1166 const pool
= new DynamicClusterPool(
1167 Math
.floor(numberOfWorkers
/ 2),
1169 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1171 const data
= { n
: 10 }
1172 const result0
= await pool
.execute(data
)
1173 expect(result0
).toStrictEqual({ ok
: 1 })
1174 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1175 expect(result1
).toStrictEqual({ ok
: 1 })
1176 const result2
= await pool
.execute(data
, 'factorial')
1177 expect(result2
).toBe(3628800)
1178 const result3
= await pool
.execute(data
, 'fibonacci')
1179 expect(result3
).toBe(55)
1180 expect(pool
.info
.executingTasks
).toBe(0)
1181 expect(pool
.info
.executedTasks
).toBe(4)
1182 for (const workerNode
of pool
.workerNodes
) {
1183 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
1185 'jsonIntegerSerialization',
1189 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1190 for (const name
of pool
.listTaskFunctions()) {
1191 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1193 executed
: expect
.any(Number
),
1194 executing
: expect
.any(Number
),
1200 history
: expect
.any(CircularArray
)
1203 history
: expect
.any(CircularArray
)
1207 history
: expect
.any(CircularArray
)
1210 history
: expect
.any(CircularArray
)
1215 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executing
1216 ).toBeGreaterThanOrEqual(0)