1 const { expect
} = require('expect')
2 const sinon
= require('sinon')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Deque
} = require('../../../lib/deque')
15 const { version
} = require('../../../package.json')
16 const { waitPoolEvents
} = require('../../test-utils')
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers
= 2
20 class StubPoolWithIsMain
extends FixedThreadPool
{
30 it('Simulate pool creation from a non main thread/process', () => {
33 new StubPoolWithIsMain(
35 './tests/worker-files/thread/testWorker.js',
37 errorHandler
: (e
) => console
.error(e
)
42 'Cannot start a pool from a worker with the same type as the pool'
47 it('Verify that filePath is checked', () => {
48 const expectedError
= new Error(
49 'Please specify a file with a worker implementation'
51 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
54 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
57 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
60 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
64 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
65 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
68 it('Verify that numberOfWorkers is checked', () => {
69 expect(() => new FixedThreadPool()).toThrowError(
71 'Cannot instantiate a pool without specifying the number of workers'
76 it('Verify that a negative number of workers is checked', () => {
79 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
82 'Cannot instantiate a pool with a negative number of workers'
87 it('Verify that a non integer number of workers is checked', () => {
90 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
93 'Cannot instantiate a pool with a non safe integer number of workers'
98 it('Verify that dynamic pool sizing is checked', () => {
101 new DynamicClusterPool(
104 './tests/worker-files/cluster/testWorker.js'
108 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
113 new DynamicThreadPool(
116 './tests/worker-files/thread/testWorker.js'
120 'Cannot instantiate a pool with a non safe integer number of workers'
125 new DynamicClusterPool(
128 './tests/worker-files/cluster/testWorker.js'
132 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
137 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
140 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
145 new DynamicClusterPool(
148 './tests/worker-files/cluster/testWorker.js'
152 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
157 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
160 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
165 it('Verify that pool options are checked', async () => {
166 let pool
= new FixedThreadPool(
168 './tests/worker-files/thread/testWorker.js'
170 expect(pool
.emitter
).toBeDefined()
171 expect(pool
.opts
.enableEvents
).toBe(true)
172 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
173 expect(pool
.opts
.enableTasksQueue
).toBe(false)
174 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
175 expect(pool
.opts
.workerChoiceStrategy
).toBe(
176 WorkerChoiceStrategies
.ROUND_ROBIN
178 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
180 runTime
: { median
: false },
181 waitTime
: { median
: false },
182 elu
: { median
: false }
184 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
186 runTime
: { median
: false },
187 waitTime
: { median
: false },
188 elu
: { median
: false }
190 expect(pool
.opts
.messageHandler
).toBeUndefined()
191 expect(pool
.opts
.errorHandler
).toBeUndefined()
192 expect(pool
.opts
.onlineHandler
).toBeUndefined()
193 expect(pool
.opts
.exitHandler
).toBeUndefined()
195 const testHandler
= () => console
.info('test handler executed')
196 pool
= new FixedThreadPool(
198 './tests/worker-files/thread/testWorker.js',
200 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
201 workerChoiceStrategyOptions
: {
202 runTime
: { median
: true },
203 weights
: { 0: 300, 1: 200 }
206 restartWorkerOnError
: false,
207 enableTasksQueue
: true,
208 tasksQueueOptions
: { concurrency
: 2 },
209 messageHandler
: testHandler
,
210 errorHandler
: testHandler
,
211 onlineHandler
: testHandler
,
212 exitHandler
: testHandler
215 expect(pool
.emitter
).toBeUndefined()
216 expect(pool
.opts
.enableEvents
).toBe(false)
217 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
218 expect(pool
.opts
.enableTasksQueue
).toBe(true)
219 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
223 expect(pool
.opts
.workerChoiceStrategy
).toBe(
224 WorkerChoiceStrategies
.LEAST_USED
226 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
228 runTime
: { median
: true },
229 waitTime
: { median
: false },
230 elu
: { median
: false },
231 weights
: { 0: 300, 1: 200 }
233 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
235 runTime
: { median
: true },
236 waitTime
: { median
: false },
237 elu
: { median
: false },
238 weights
: { 0: 300, 1: 200 }
240 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
241 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
242 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
243 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
247 it('Verify that pool options are validated', async () => {
252 './tests/worker-files/thread/testWorker.js',
254 workerChoiceStrategy
: 'invalidStrategy'
258 new Error("Invalid worker choice strategy 'invalidStrategy'")
264 './tests/worker-files/thread/testWorker.js',
266 workerChoiceStrategyOptions
: {
267 choiceRetries
: 'invalidChoiceRetries'
273 'Invalid worker choice strategy options: choice retries must be an integer'
280 './tests/worker-files/thread/testWorker.js',
282 workerChoiceStrategyOptions
: {
289 "Invalid worker choice strategy options: choice retries '-1' must be greater or equal than zero"
296 './tests/worker-files/thread/testWorker.js',
298 workerChoiceStrategyOptions
: { weights
: {} }
303 'Invalid worker choice strategy options: must have a weight for each worker node'
310 './tests/worker-files/thread/testWorker.js',
312 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
317 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
324 './tests/worker-files/thread/testWorker.js',
326 enableTasksQueue
: true,
327 tasksQueueOptions
: { concurrency
: 0 }
332 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
339 './tests/worker-files/thread/testWorker.js',
341 enableTasksQueue
: true,
342 tasksQueueOptions
: 'invalidTasksQueueOptions'
346 new TypeError('Invalid tasks queue options: must be a plain object')
352 './tests/worker-files/thread/testWorker.js',
354 enableTasksQueue
: true,
355 tasksQueueOptions
: { concurrency
: 0.2 }
359 new TypeError('Invalid worker node tasks concurrency: must be an integer')
363 it('Verify that pool worker choice strategy options can be set', async () => {
364 const pool
= new FixedThreadPool(
366 './tests/worker-files/thread/testWorker.js',
367 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
369 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
371 runTime
: { median
: false },
372 waitTime
: { median
: false },
373 elu
: { median
: false }
375 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
377 runTime
: { median
: false },
378 waitTime
: { median
: false },
379 elu
: { median
: false }
381 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
382 .workerChoiceStrategies
) {
383 expect(workerChoiceStrategy
.opts
).toStrictEqual({
385 runTime
: { median
: false },
386 waitTime
: { median
: false },
387 elu
: { median
: false }
391 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
409 pool
.setWorkerChoiceStrategyOptions({
410 runTime
: { median
: true },
411 elu
: { median
: true }
413 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
415 runTime
: { median
: true },
416 waitTime
: { median
: false },
417 elu
: { median
: true }
419 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
421 runTime
: { median
: true },
422 waitTime
: { median
: false },
423 elu
: { median
: true }
425 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
426 .workerChoiceStrategies
) {
427 expect(workerChoiceStrategy
.opts
).toStrictEqual({
429 runTime
: { median
: true },
430 waitTime
: { median
: false },
431 elu
: { median
: true }
435 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
453 pool
.setWorkerChoiceStrategyOptions({
454 runTime
: { median
: false },
455 elu
: { median
: false }
457 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
459 runTime
: { median
: false },
460 waitTime
: { median
: false },
461 elu
: { median
: false }
463 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
465 runTime
: { median
: false },
466 waitTime
: { median
: false },
467 elu
: { median
: false }
469 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
470 .workerChoiceStrategies
) {
471 expect(workerChoiceStrategy
.opts
).toStrictEqual({
473 runTime
: { median
: false },
474 waitTime
: { median
: false },
475 elu
: { median
: false }
479 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
498 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
501 'Invalid worker choice strategy options: must be a plain object'
505 pool
.setWorkerChoiceStrategyOptions({
506 choiceRetries
: 'invalidChoiceRetries'
510 'Invalid worker choice strategy options: choice retries must be an integer'
514 pool
.setWorkerChoiceStrategyOptions({ choiceRetries
: -1 })
517 "Invalid worker choice strategy options: choice retries '-1' must be greater or equal than zero"
521 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
524 'Invalid worker choice strategy options: must have a weight for each worker node'
528 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
531 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
537 it('Verify that pool tasks queue can be enabled/disabled', async () => {
538 const pool
= new FixedThreadPool(
540 './tests/worker-files/thread/testWorker.js'
542 expect(pool
.opts
.enableTasksQueue
).toBe(false)
543 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
544 pool
.enableTasksQueue(true)
545 expect(pool
.opts
.enableTasksQueue
).toBe(true)
546 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
550 pool
.enableTasksQueue(true, { concurrency
: 2 })
551 expect(pool
.opts
.enableTasksQueue
).toBe(true)
552 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
556 pool
.enableTasksQueue(false)
557 expect(pool
.opts
.enableTasksQueue
).toBe(false)
558 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
562 it('Verify that pool tasks queue options can be set', async () => {
563 const pool
= new FixedThreadPool(
565 './tests/worker-files/thread/testWorker.js',
566 { enableTasksQueue
: true }
568 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
572 pool
.setTasksQueueOptions({ concurrency
: 2 })
573 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
578 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
580 new TypeError('Invalid tasks queue options: must be a plain object')
582 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
584 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
587 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
589 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
592 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
593 new TypeError('Invalid worker node tasks concurrency: must be an integer')
595 expect(() => pool
.setTasksQueueOptions({ queueMaxSize
: 2 })).toThrowError(
597 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
600 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
602 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
605 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
607 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
610 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
611 new TypeError('Invalid worker node tasks queue size: must be an integer')
616 it('Verify that pool info is set', async () => {
617 let pool
= new FixedThreadPool(
619 './tests/worker-files/thread/testWorker.js'
621 expect(pool
.info
).toStrictEqual({
623 type
: PoolTypes
.fixed
,
624 worker
: WorkerTypes
.thread
,
626 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
627 minSize
: numberOfWorkers
,
628 maxSize
: numberOfWorkers
,
629 workerNodes
: numberOfWorkers
,
630 idleWorkerNodes
: numberOfWorkers
,
637 pool
= new DynamicClusterPool(
638 Math
.floor(numberOfWorkers
/ 2),
640 './tests/worker-files/cluster/testWorker.js'
642 expect(pool
.info
).toStrictEqual({
644 type
: PoolTypes
.dynamic
,
645 worker
: WorkerTypes
.cluster
,
647 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
648 minSize
: Math
.floor(numberOfWorkers
/ 2),
649 maxSize
: numberOfWorkers
,
650 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
651 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
660 it('Verify that pool worker tasks usage are initialized', async () => {
661 const pool
= new FixedClusterPool(
663 './tests/worker-files/cluster/testWorker.js'
665 for (const workerNode
of pool
.workerNodes
) {
666 expect(workerNode
.usage
).toStrictEqual({
676 history
: expect
.any(CircularArray
)
679 history
: expect
.any(CircularArray
)
683 history
: expect
.any(CircularArray
)
686 history
: expect
.any(CircularArray
)
694 it('Verify that pool worker tasks queue are initialized', async () => {
695 let pool
= new FixedClusterPool(
697 './tests/worker-files/cluster/testWorker.js'
699 for (const workerNode
of pool
.workerNodes
) {
700 expect(workerNode
.tasksQueue
).toBeDefined()
701 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
702 expect(workerNode
.tasksQueue
.size
).toBe(0)
703 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
706 pool
= new DynamicThreadPool(
707 Math
.floor(numberOfWorkers
/ 2),
709 './tests/worker-files/thread/testWorker.js'
711 for (const workerNode
of pool
.workerNodes
) {
712 expect(workerNode
.tasksQueue
).toBeDefined()
713 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
714 expect(workerNode
.tasksQueue
.size
).toBe(0)
715 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
719 it('Verify that pool worker info are initialized', async () => {
720 let pool
= new FixedClusterPool(
722 './tests/worker-files/cluster/testWorker.js'
724 for (const workerNode
of pool
.workerNodes
) {
725 expect(workerNode
.info
).toStrictEqual({
726 id
: expect
.any(Number
),
727 type
: WorkerTypes
.cluster
,
733 pool
= new DynamicThreadPool(
734 Math
.floor(numberOfWorkers
/ 2),
736 './tests/worker-files/thread/testWorker.js'
738 for (const workerNode
of pool
.workerNodes
) {
739 expect(workerNode
.info
).toStrictEqual({
740 id
: expect
.any(Number
),
741 type
: WorkerTypes
.thread
,
748 it('Verify that pool execute() arguments are checked', async () => {
749 const pool
= new FixedClusterPool(
751 './tests/worker-files/cluster/testWorker.js'
753 await
expect(pool
.execute(undefined, 0)).rejects
.toThrowError(
754 new TypeError('name argument must be a string')
756 await
expect(pool
.execute(undefined, '')).rejects
.toThrowError(
757 new TypeError('name argument must not be an empty string')
759 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
760 new TypeError('transferList argument must be an array')
762 await
expect(pool
.execute(undefined, 'unknown')).rejects
.toBe(
763 "Task function 'unknown' not found"
766 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
767 new Error('Cannot execute a task on destroyed pool')
771 it('Verify that pool worker tasks usage are computed', async () => {
772 const pool
= new FixedClusterPool(
774 './tests/worker-files/cluster/testWorker.js'
776 const promises
= new Set()
777 const maxMultiplier
= 2
778 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
779 promises
.add(pool
.execute())
781 for (const workerNode
of pool
.workerNodes
) {
782 expect(workerNode
.usage
).toStrictEqual({
785 executing
: maxMultiplier
,
792 history
: expect
.any(CircularArray
)
795 history
: expect
.any(CircularArray
)
799 history
: expect
.any(CircularArray
)
802 history
: expect
.any(CircularArray
)
807 await Promise
.all(promises
)
808 for (const workerNode
of pool
.workerNodes
) {
809 expect(workerNode
.usage
).toStrictEqual({
811 executed
: maxMultiplier
,
819 history
: expect
.any(CircularArray
)
822 history
: expect
.any(CircularArray
)
826 history
: expect
.any(CircularArray
)
829 history
: expect
.any(CircularArray
)
837 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
838 const pool
= new DynamicThreadPool(
839 Math
.floor(numberOfWorkers
/ 2),
841 './tests/worker-files/thread/testWorker.js'
843 const promises
= new Set()
844 const maxMultiplier
= 2
845 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
846 promises
.add(pool
.execute())
848 await Promise
.all(promises
)
849 for (const workerNode
of pool
.workerNodes
) {
850 expect(workerNode
.usage
).toStrictEqual({
852 executed
: expect
.any(Number
),
860 history
: expect
.any(CircularArray
)
863 history
: expect
.any(CircularArray
)
867 history
: expect
.any(CircularArray
)
870 history
: expect
.any(CircularArray
)
874 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
875 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
876 numberOfWorkers
* maxMultiplier
878 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
879 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
880 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
881 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
883 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
884 for (const workerNode
of pool
.workerNodes
) {
885 expect(workerNode
.usage
).toStrictEqual({
895 history
: expect
.any(CircularArray
)
898 history
: expect
.any(CircularArray
)
902 history
: expect
.any(CircularArray
)
905 history
: expect
.any(CircularArray
)
909 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
910 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
911 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
912 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
917 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
918 const pool
= new DynamicClusterPool(
919 Math
.floor(numberOfWorkers
/ 2),
921 './tests/worker-files/cluster/testWorker.js'
925 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
929 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
930 expect(poolReady
).toBe(1)
931 expect(poolInfo
).toStrictEqual({
933 type
: PoolTypes
.dynamic
,
934 worker
: WorkerTypes
.cluster
,
936 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
937 minSize
: expect
.any(Number
),
938 maxSize
: expect
.any(Number
),
939 workerNodes
: expect
.any(Number
),
940 idleWorkerNodes
: expect
.any(Number
),
941 busyWorkerNodes
: expect
.any(Number
),
942 executedTasks
: expect
.any(Number
),
943 executingTasks
: expect
.any(Number
),
944 failedTasks
: expect
.any(Number
)
949 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
950 const pool
= new FixedThreadPool(
952 './tests/worker-files/thread/testWorker.js'
954 const promises
= new Set()
957 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
961 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
962 promises
.add(pool
.execute())
964 await Promise
.all(promises
)
965 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
966 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
967 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
968 expect(poolInfo
).toStrictEqual({
970 type
: PoolTypes
.fixed
,
971 worker
: WorkerTypes
.thread
,
972 ready
: expect
.any(Boolean
),
973 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
974 minSize
: expect
.any(Number
),
975 maxSize
: expect
.any(Number
),
976 workerNodes
: expect
.any(Number
),
977 idleWorkerNodes
: expect
.any(Number
),
978 busyWorkerNodes
: expect
.any(Number
),
979 executedTasks
: expect
.any(Number
),
980 executingTasks
: expect
.any(Number
),
981 failedTasks
: expect
.any(Number
)
986 it("Verify that pool event emitter 'full' event can register a callback", async () => {
987 const pool
= new DynamicThreadPool(
988 Math
.floor(numberOfWorkers
/ 2),
990 './tests/worker-files/thread/testWorker.js'
992 const promises
= new Set()
995 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
999 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1000 promises
.add(pool
.execute())
1002 await Promise
.all(promises
)
1003 expect(poolFull
).toBe(1)
1004 expect(poolInfo
).toStrictEqual({
1006 type
: PoolTypes
.dynamic
,
1007 worker
: WorkerTypes
.thread
,
1008 ready
: expect
.any(Boolean
),
1009 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1010 minSize
: expect
.any(Number
),
1011 maxSize
: expect
.any(Number
),
1012 workerNodes
: expect
.any(Number
),
1013 idleWorkerNodes
: expect
.any(Number
),
1014 busyWorkerNodes
: expect
.any(Number
),
1015 executedTasks
: expect
.any(Number
),
1016 executingTasks
: expect
.any(Number
),
1017 failedTasks
: expect
.any(Number
)
1019 await pool
.destroy()
1022 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1023 const pool
= new FixedThreadPool(
1025 './tests/worker-files/thread/testWorker.js',
1027 enableTasksQueue
: true
1030 sinon
.stub(pool
, 'hasBackPressure').returns(true)
1031 const promises
= new Set()
1032 let poolBackPressure
= 0
1034 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
1038 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
1039 promises
.add(pool
.execute())
1041 await Promise
.all(promises
)
1042 expect(poolBackPressure
).toBe(1)
1043 expect(poolInfo
).toStrictEqual({
1045 type
: PoolTypes
.fixed
,
1046 worker
: WorkerTypes
.thread
,
1047 ready
: expect
.any(Boolean
),
1048 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1049 minSize
: expect
.any(Number
),
1050 maxSize
: expect
.any(Number
),
1051 workerNodes
: expect
.any(Number
),
1052 idleWorkerNodes
: expect
.any(Number
),
1053 busyWorkerNodes
: expect
.any(Number
),
1054 executedTasks
: expect
.any(Number
),
1055 executingTasks
: expect
.any(Number
),
1056 maxQueuedTasks
: expect
.any(Number
),
1057 queuedTasks
: expect
.any(Number
),
1059 stolenTasks
: expect
.any(Number
),
1060 failedTasks
: expect
.any(Number
)
1062 expect(pool
.hasBackPressure
.called
).toBe(true)
1063 await pool
.destroy()
1066 it('Verify that listTaskFunctions() is working', async () => {
1067 const dynamicThreadPool
= new DynamicThreadPool(
1068 Math
.floor(numberOfWorkers
/ 2),
1070 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1072 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1073 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
1075 'jsonIntegerSerialization',
1079 const fixedClusterPool
= new FixedClusterPool(
1081 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1083 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1084 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
1086 'jsonIntegerSerialization',
1092 it('Verify that multiple task functions worker is working', async () => {
1093 const pool
= new DynamicClusterPool(
1094 Math
.floor(numberOfWorkers
/ 2),
1096 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1098 const data
= { n
: 10 }
1099 const result0
= await pool
.execute(data
)
1100 expect(result0
).toStrictEqual({ ok
: 1 })
1101 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1102 expect(result1
).toStrictEqual({ ok
: 1 })
1103 const result2
= await pool
.execute(data
, 'factorial')
1104 expect(result2
).toBe(3628800)
1105 const result3
= await pool
.execute(data
, 'fibonacci')
1106 expect(result3
).toBe(55)
1107 expect(pool
.info
.executingTasks
).toBe(0)
1108 expect(pool
.info
.executedTasks
).toBe(4)
1109 for (const workerNode
of pool
.workerNodes
) {
1110 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
1112 'jsonIntegerSerialization',
1116 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1117 for (const name
of pool
.listTaskFunctions()) {
1118 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1120 executed
: expect
.any(Number
),
1121 executing
: expect
.any(Number
),
1127 history
: expect
.any(CircularArray
)
1130 history
: expect
.any(CircularArray
)
1134 history
: expect
.any(CircularArray
)
1137 history
: expect
.any(CircularArray
)
1142 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executing
1143 ).toBeGreaterThanOrEqual(0)