1 const { expect
} = require('expect')
2 const sinon
= require('sinon')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Deque
} = require('../../../lib/deque')
15 const { version
} = require('../../../package.json')
16 const { waitPoolEvents
} = require('../../test-utils')
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers
= 2
20 class StubPoolWithIsMain
extends FixedThreadPool
{
30 it('Simulate pool creation from a non main thread/process', () => {
33 new StubPoolWithIsMain(
35 './tests/worker-files/thread/testWorker.js',
37 errorHandler
: (e
) => console
.error(e
)
42 'Cannot start a pool from a worker with the same type as the pool'
47 it('Verify that pool statuses properties are set', async () => {
48 const pool
= new FixedThreadPool(
50 './tests/worker-files/thread/testWorker.js'
52 expect(pool
.starting
).toBe(false)
53 expect(pool
.started
).toBe(true)
55 expect(pool
.started
).toBe(false)
58 it('Verify that filePath is checked', () => {
59 const expectedError
= new Error(
60 'Please specify a file with a worker implementation'
62 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
65 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
68 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
71 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
75 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
76 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
79 it('Verify that numberOfWorkers is checked', () => {
80 expect(() => new FixedThreadPool()).toThrowError(
82 'Cannot instantiate a pool without specifying the number of workers'
87 it('Verify that a negative number of workers is checked', () => {
90 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
93 'Cannot instantiate a pool with a negative number of workers'
98 it('Verify that a non integer number of workers is checked', () => {
101 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
104 'Cannot instantiate a pool with a non safe integer number of workers'
109 it('Verify that dynamic pool sizing is checked', () => {
112 new DynamicClusterPool(
115 './tests/worker-files/cluster/testWorker.js'
119 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
124 new DynamicThreadPool(
127 './tests/worker-files/thread/testWorker.js'
131 'Cannot instantiate a pool with a non safe integer number of workers'
136 new DynamicClusterPool(
139 './tests/worker-files/cluster/testWorker.js'
143 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
148 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
151 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
156 new DynamicClusterPool(
159 './tests/worker-files/cluster/testWorker.js'
163 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
168 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
171 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
176 it('Verify that pool options are checked', async () => {
177 let pool
= new FixedThreadPool(
179 './tests/worker-files/thread/testWorker.js'
181 expect(pool
.emitter
).toBeDefined()
182 expect(pool
.opts
.enableEvents
).toBe(true)
183 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
184 expect(pool
.opts
.enableTasksQueue
).toBe(false)
185 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
186 expect(pool
.opts
.workerChoiceStrategy
).toBe(
187 WorkerChoiceStrategies
.ROUND_ROBIN
189 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
191 runTime
: { median
: false },
192 waitTime
: { median
: false },
193 elu
: { median
: false }
195 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
197 runTime
: { median
: false },
198 waitTime
: { median
: false },
199 elu
: { median
: false }
201 expect(pool
.opts
.messageHandler
).toBeUndefined()
202 expect(pool
.opts
.errorHandler
).toBeUndefined()
203 expect(pool
.opts
.onlineHandler
).toBeUndefined()
204 expect(pool
.opts
.exitHandler
).toBeUndefined()
206 const testHandler
= () => console
.info('test handler executed')
207 pool
= new FixedThreadPool(
209 './tests/worker-files/thread/testWorker.js',
211 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
212 workerChoiceStrategyOptions
: {
213 runTime
: { median
: true },
214 weights
: { 0: 300, 1: 200 }
217 restartWorkerOnError
: false,
218 enableTasksQueue
: true,
219 tasksQueueOptions
: { concurrency
: 2 },
220 messageHandler
: testHandler
,
221 errorHandler
: testHandler
,
222 onlineHandler
: testHandler
,
223 exitHandler
: testHandler
226 expect(pool
.emitter
).toBeUndefined()
227 expect(pool
.opts
.enableEvents
).toBe(false)
228 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
229 expect(pool
.opts
.enableTasksQueue
).toBe(true)
230 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
234 expect(pool
.opts
.workerChoiceStrategy
).toBe(
235 WorkerChoiceStrategies
.LEAST_USED
237 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
239 runTime
: { median
: true },
240 waitTime
: { median
: false },
241 elu
: { median
: false },
242 weights
: { 0: 300, 1: 200 }
244 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
246 runTime
: { median
: true },
247 waitTime
: { median
: false },
248 elu
: { median
: false },
249 weights
: { 0: 300, 1: 200 }
251 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
252 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
253 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
254 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
258 it('Verify that pool options are validated', async () => {
263 './tests/worker-files/thread/testWorker.js',
265 workerChoiceStrategy
: 'invalidStrategy'
269 new Error("Invalid worker choice strategy 'invalidStrategy'")
275 './tests/worker-files/thread/testWorker.js',
277 workerChoiceStrategyOptions
: {
278 choiceRetries
: 'invalidChoiceRetries'
284 'Invalid worker choice strategy options: choice retries must be an integer'
291 './tests/worker-files/thread/testWorker.js',
293 workerChoiceStrategyOptions
: {
300 "Invalid worker choice strategy options: choice retries '-1' must be greater or equal than zero"
307 './tests/worker-files/thread/testWorker.js',
309 workerChoiceStrategyOptions
: { weights
: {} }
314 'Invalid worker choice strategy options: must have a weight for each worker node'
321 './tests/worker-files/thread/testWorker.js',
323 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
328 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
335 './tests/worker-files/thread/testWorker.js',
337 enableTasksQueue
: true,
338 tasksQueueOptions
: 'invalidTasksQueueOptions'
342 new TypeError('Invalid tasks queue options: must be a plain object')
348 './tests/worker-files/thread/testWorker.js',
350 enableTasksQueue
: true,
351 tasksQueueOptions
: { concurrency
: 0 }
356 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
363 './tests/worker-files/thread/testWorker.js',
365 enableTasksQueue
: true,
366 tasksQueueOptions
: { concurrency
: -1 }
371 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
378 './tests/worker-files/thread/testWorker.js',
380 enableTasksQueue
: true,
381 tasksQueueOptions
: { concurrency
: 0.2 }
385 new TypeError('Invalid worker node tasks concurrency: must be an integer')
391 './tests/worker-files/thread/testWorker.js',
393 enableTasksQueue
: true,
394 tasksQueueOptions
: { queueMaxSize
: 2 }
399 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
406 './tests/worker-files/thread/testWorker.js',
408 enableTasksQueue
: true,
409 tasksQueueOptions
: { size
: 0 }
414 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
421 './tests/worker-files/thread/testWorker.js',
423 enableTasksQueue
: true,
424 tasksQueueOptions
: { size
: -1 }
429 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
436 './tests/worker-files/thread/testWorker.js',
438 enableTasksQueue
: true,
439 tasksQueueOptions
: { size
: 0.2 }
443 new TypeError('Invalid worker node tasks queue size: must be an integer')
447 it('Verify that pool worker choice strategy options can be set', async () => {
448 const pool
= new FixedThreadPool(
450 './tests/worker-files/thread/testWorker.js',
451 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
453 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
455 runTime
: { median
: false },
456 waitTime
: { median
: false },
457 elu
: { median
: false }
459 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
461 runTime
: { median
: false },
462 waitTime
: { median
: false },
463 elu
: { median
: false }
465 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
466 .workerChoiceStrategies
) {
467 expect(workerChoiceStrategy
.opts
).toStrictEqual({
469 runTime
: { median
: false },
470 waitTime
: { median
: false },
471 elu
: { median
: false }
475 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
493 pool
.setWorkerChoiceStrategyOptions({
494 runTime
: { median
: true },
495 elu
: { median
: true }
497 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
499 runTime
: { median
: true },
500 waitTime
: { median
: false },
501 elu
: { median
: true }
503 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
505 runTime
: { median
: true },
506 waitTime
: { median
: false },
507 elu
: { median
: true }
509 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
510 .workerChoiceStrategies
) {
511 expect(workerChoiceStrategy
.opts
).toStrictEqual({
513 runTime
: { median
: true },
514 waitTime
: { median
: false },
515 elu
: { median
: true }
519 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
537 pool
.setWorkerChoiceStrategyOptions({
538 runTime
: { median
: false },
539 elu
: { median
: false }
541 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
543 runTime
: { median
: false },
544 waitTime
: { median
: false },
545 elu
: { median
: false }
547 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
549 runTime
: { median
: false },
550 waitTime
: { median
: false },
551 elu
: { median
: false }
553 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
554 .workerChoiceStrategies
) {
555 expect(workerChoiceStrategy
.opts
).toStrictEqual({
557 runTime
: { median
: false },
558 waitTime
: { median
: false },
559 elu
: { median
: false }
563 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
582 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
585 'Invalid worker choice strategy options: must be a plain object'
589 pool
.setWorkerChoiceStrategyOptions({
590 choiceRetries
: 'invalidChoiceRetries'
594 'Invalid worker choice strategy options: choice retries must be an integer'
598 pool
.setWorkerChoiceStrategyOptions({ choiceRetries
: -1 })
601 "Invalid worker choice strategy options: choice retries '-1' must be greater or equal than zero"
605 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
608 'Invalid worker choice strategy options: must have a weight for each worker node'
612 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
615 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
621 it('Verify that pool tasks queue can be enabled/disabled', async () => {
622 const pool
= new FixedThreadPool(
624 './tests/worker-files/thread/testWorker.js'
626 expect(pool
.opts
.enableTasksQueue
).toBe(false)
627 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
628 pool
.enableTasksQueue(true)
629 expect(pool
.opts
.enableTasksQueue
).toBe(true)
630 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
634 pool
.enableTasksQueue(true, { concurrency
: 2 })
635 expect(pool
.opts
.enableTasksQueue
).toBe(true)
636 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
640 pool
.enableTasksQueue(false)
641 expect(pool
.opts
.enableTasksQueue
).toBe(false)
642 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
646 it('Verify that pool tasks queue options can be set', async () => {
647 const pool
= new FixedThreadPool(
649 './tests/worker-files/thread/testWorker.js',
650 { enableTasksQueue
: true }
652 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
656 pool
.setTasksQueueOptions({ concurrency
: 2 })
657 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
662 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
664 new TypeError('Invalid tasks queue options: must be a plain object')
666 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
668 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
671 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
673 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
676 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
677 new TypeError('Invalid worker node tasks concurrency: must be an integer')
679 expect(() => pool
.setTasksQueueOptions({ queueMaxSize
: 2 })).toThrowError(
681 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
684 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
686 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
689 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
691 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
694 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
695 new TypeError('Invalid worker node tasks queue size: must be an integer')
700 it('Verify that pool info is set', async () => {
701 let pool
= new FixedThreadPool(
703 './tests/worker-files/thread/testWorker.js'
705 expect(pool
.info
).toStrictEqual({
707 type
: PoolTypes
.fixed
,
708 worker
: WorkerTypes
.thread
,
710 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
711 minSize
: numberOfWorkers
,
712 maxSize
: numberOfWorkers
,
713 workerNodes
: numberOfWorkers
,
714 idleWorkerNodes
: numberOfWorkers
,
721 pool
= new DynamicClusterPool(
722 Math
.floor(numberOfWorkers
/ 2),
724 './tests/worker-files/cluster/testWorker.js'
726 expect(pool
.info
).toStrictEqual({
728 type
: PoolTypes
.dynamic
,
729 worker
: WorkerTypes
.cluster
,
731 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
732 minSize
: Math
.floor(numberOfWorkers
/ 2),
733 maxSize
: numberOfWorkers
,
734 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
735 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
744 it('Verify that pool worker tasks usage are initialized', async () => {
745 const pool
= new FixedClusterPool(
747 './tests/worker-files/cluster/testWorker.js'
749 for (const workerNode
of pool
.workerNodes
) {
750 expect(workerNode
.usage
).toStrictEqual({
760 history
: expect
.any(CircularArray
)
763 history
: expect
.any(CircularArray
)
767 history
: expect
.any(CircularArray
)
770 history
: expect
.any(CircularArray
)
778 it('Verify that pool worker tasks queue are initialized', async () => {
779 let pool
= new FixedClusterPool(
781 './tests/worker-files/cluster/testWorker.js'
783 for (const workerNode
of pool
.workerNodes
) {
784 expect(workerNode
.tasksQueue
).toBeDefined()
785 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
786 expect(workerNode
.tasksQueue
.size
).toBe(0)
787 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
790 pool
= new DynamicThreadPool(
791 Math
.floor(numberOfWorkers
/ 2),
793 './tests/worker-files/thread/testWorker.js'
795 for (const workerNode
of pool
.workerNodes
) {
796 expect(workerNode
.tasksQueue
).toBeDefined()
797 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
798 expect(workerNode
.tasksQueue
.size
).toBe(0)
799 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
803 it('Verify that pool worker info are initialized', async () => {
804 let pool
= new FixedClusterPool(
806 './tests/worker-files/cluster/testWorker.js'
808 for (const workerNode
of pool
.workerNodes
) {
809 expect(workerNode
.info
).toStrictEqual({
810 id
: expect
.any(Number
),
811 type
: WorkerTypes
.cluster
,
817 pool
= new DynamicThreadPool(
818 Math
.floor(numberOfWorkers
/ 2),
820 './tests/worker-files/thread/testWorker.js'
822 for (const workerNode
of pool
.workerNodes
) {
823 expect(workerNode
.info
).toStrictEqual({
824 id
: expect
.any(Number
),
825 type
: WorkerTypes
.thread
,
832 it('Verify that pool execute() arguments are checked', async () => {
833 const pool
= new FixedClusterPool(
835 './tests/worker-files/cluster/testWorker.js'
837 await
expect(pool
.execute(undefined, 0)).rejects
.toThrowError(
838 new TypeError('name argument must be a string')
840 await
expect(pool
.execute(undefined, '')).rejects
.toThrowError(
841 new TypeError('name argument must not be an empty string')
843 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
844 new TypeError('transferList argument must be an array')
846 await
expect(pool
.execute(undefined, 'unknown')).rejects
.toBe(
847 "Task function 'unknown' not found"
850 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
851 new Error('Cannot execute a task on destroyed pool')
855 it('Verify that pool worker tasks usage are computed', async () => {
856 const pool
= new FixedClusterPool(
858 './tests/worker-files/cluster/testWorker.js'
860 const promises
= new Set()
861 const maxMultiplier
= 2
862 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
863 promises
.add(pool
.execute())
865 for (const workerNode
of pool
.workerNodes
) {
866 expect(workerNode
.usage
).toStrictEqual({
869 executing
: maxMultiplier
,
876 history
: expect
.any(CircularArray
)
879 history
: expect
.any(CircularArray
)
883 history
: expect
.any(CircularArray
)
886 history
: expect
.any(CircularArray
)
891 await Promise
.all(promises
)
892 for (const workerNode
of pool
.workerNodes
) {
893 expect(workerNode
.usage
).toStrictEqual({
895 executed
: maxMultiplier
,
903 history
: expect
.any(CircularArray
)
906 history
: expect
.any(CircularArray
)
910 history
: expect
.any(CircularArray
)
913 history
: expect
.any(CircularArray
)
921 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
922 const pool
= new DynamicThreadPool(
923 Math
.floor(numberOfWorkers
/ 2),
925 './tests/worker-files/thread/testWorker.js'
927 const promises
= new Set()
928 const maxMultiplier
= 2
929 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
930 promises
.add(pool
.execute())
932 await Promise
.all(promises
)
933 for (const workerNode
of pool
.workerNodes
) {
934 expect(workerNode
.usage
).toStrictEqual({
936 executed
: expect
.any(Number
),
944 history
: expect
.any(CircularArray
)
947 history
: expect
.any(CircularArray
)
951 history
: expect
.any(CircularArray
)
954 history
: expect
.any(CircularArray
)
958 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
959 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
960 numberOfWorkers
* maxMultiplier
962 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
963 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
964 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
965 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
967 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
968 for (const workerNode
of pool
.workerNodes
) {
969 expect(workerNode
.usage
).toStrictEqual({
979 history
: expect
.any(CircularArray
)
982 history
: expect
.any(CircularArray
)
986 history
: expect
.any(CircularArray
)
989 history
: expect
.any(CircularArray
)
993 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
994 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
995 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
996 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1001 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1002 const pool
= new DynamicClusterPool(
1003 Math
.floor(numberOfWorkers
/ 2),
1005 './tests/worker-files/cluster/testWorker.js'
1009 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
1013 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
1014 expect(poolReady
).toBe(1)
1015 expect(poolInfo
).toStrictEqual({
1017 type
: PoolTypes
.dynamic
,
1018 worker
: WorkerTypes
.cluster
,
1020 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1021 minSize
: expect
.any(Number
),
1022 maxSize
: expect
.any(Number
),
1023 workerNodes
: expect
.any(Number
),
1024 idleWorkerNodes
: expect
.any(Number
),
1025 busyWorkerNodes
: expect
.any(Number
),
1026 executedTasks
: expect
.any(Number
),
1027 executingTasks
: expect
.any(Number
),
1028 failedTasks
: expect
.any(Number
)
1030 await pool
.destroy()
1033 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1034 const pool
= new FixedThreadPool(
1036 './tests/worker-files/thread/testWorker.js'
1038 const promises
= new Set()
1041 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
1045 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1046 promises
.add(pool
.execute())
1048 await Promise
.all(promises
)
1049 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1050 // So, in total, it is emitted numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1051 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
1052 expect(poolInfo
).toStrictEqual({
1054 type
: PoolTypes
.fixed
,
1055 worker
: WorkerTypes
.thread
,
1056 ready
: expect
.any(Boolean
),
1057 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1058 minSize
: expect
.any(Number
),
1059 maxSize
: expect
.any(Number
),
1060 workerNodes
: expect
.any(Number
),
1061 idleWorkerNodes
: expect
.any(Number
),
1062 busyWorkerNodes
: expect
.any(Number
),
1063 executedTasks
: expect
.any(Number
),
1064 executingTasks
: expect
.any(Number
),
1065 failedTasks
: expect
.any(Number
)
1067 await pool
.destroy()
1070 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1071 const pool
= new DynamicThreadPool(
1072 Math
.floor(numberOfWorkers
/ 2),
1074 './tests/worker-files/thread/testWorker.js'
1076 const promises
= new Set()
1079 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
1083 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1084 promises
.add(pool
.execute())
1086 await Promise
.all(promises
)
1087 expect(poolFull
).toBe(1)
1088 expect(poolInfo
).toStrictEqual({
1090 type
: PoolTypes
.dynamic
,
1091 worker
: WorkerTypes
.thread
,
1092 ready
: expect
.any(Boolean
),
1093 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1094 minSize
: expect
.any(Number
),
1095 maxSize
: expect
.any(Number
),
1096 workerNodes
: expect
.any(Number
),
1097 idleWorkerNodes
: expect
.any(Number
),
1098 busyWorkerNodes
: expect
.any(Number
),
1099 executedTasks
: expect
.any(Number
),
1100 executingTasks
: expect
.any(Number
),
1101 failedTasks
: expect
.any(Number
)
1103 await pool
.destroy()
1106 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1107 const pool
= new FixedThreadPool(
1109 './tests/worker-files/thread/testWorker.js',
1111 enableTasksQueue
: true
1114 sinon
.stub(pool
, 'hasBackPressure').returns(true)
1115 const promises
= new Set()
1116 let poolBackPressure
= 0
1118 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
1122 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
1123 promises
.add(pool
.execute())
1125 await Promise
.all(promises
)
1126 expect(poolBackPressure
).toBe(1)
1127 expect(poolInfo
).toStrictEqual({
1129 type
: PoolTypes
.fixed
,
1130 worker
: WorkerTypes
.thread
,
1131 ready
: expect
.any(Boolean
),
1132 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1133 minSize
: expect
.any(Number
),
1134 maxSize
: expect
.any(Number
),
1135 workerNodes
: expect
.any(Number
),
1136 idleWorkerNodes
: expect
.any(Number
),
1137 busyWorkerNodes
: expect
.any(Number
),
1138 executedTasks
: expect
.any(Number
),
1139 executingTasks
: expect
.any(Number
),
1140 maxQueuedTasks
: expect
.any(Number
),
1141 queuedTasks
: expect
.any(Number
),
1143 stolenTasks
: expect
.any(Number
),
1144 failedTasks
: expect
.any(Number
)
1146 expect(pool
.hasBackPressure
.called
).toBe(true)
1147 await pool
.destroy()
1150 it('Verify that listTaskFunctions() is working', async () => {
1151 const dynamicThreadPool
= new DynamicThreadPool(
1152 Math
.floor(numberOfWorkers
/ 2),
1154 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1156 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1157 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
1159 'jsonIntegerSerialization',
1163 const fixedClusterPool
= new FixedClusterPool(
1165 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1167 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1168 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
1170 'jsonIntegerSerialization',
1176 it('Verify that multiple task functions worker is working', async () => {
1177 const pool
= new DynamicClusterPool(
1178 Math
.floor(numberOfWorkers
/ 2),
1180 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1182 const data
= { n
: 10 }
1183 const result0
= await pool
.execute(data
)
1184 expect(result0
).toStrictEqual({ ok
: 1 })
1185 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1186 expect(result1
).toStrictEqual({ ok
: 1 })
1187 const result2
= await pool
.execute(data
, 'factorial')
1188 expect(result2
).toBe(3628800)
1189 const result3
= await pool
.execute(data
, 'fibonacci')
1190 expect(result3
).toBe(55)
1191 expect(pool
.info
.executingTasks
).toBe(0)
1192 expect(pool
.info
.executedTasks
).toBe(4)
1193 for (const workerNode
of pool
.workerNodes
) {
1194 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
1196 'jsonIntegerSerialization',
1200 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1201 for (const name
of pool
.listTaskFunctions()) {
1202 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1204 executed
: expect
.any(Number
),
1205 executing
: expect
.any(Number
),
1211 history
: expect
.any(CircularArray
)
1214 history
: expect
.any(CircularArray
)
1218 history
: expect
.any(CircularArray
)
1221 history
: expect
.any(CircularArray
)
1226 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executing
1227 ).toBeGreaterThanOrEqual(0)