1 const { expect
} = require('expect')
2 const sinon
= require('sinon')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Deque
} = require('../../../lib/deque')
15 const { version
} = require('../../../package.json')
16 const { waitPoolEvents
} = require('../../test-utils')
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers
= 2
20 class StubPoolWithIsMain
extends FixedThreadPool
{
30 it('Simulate pool creation from a non main thread/process', () => {
33 new StubPoolWithIsMain(
35 './tests/worker-files/thread/testWorker.js',
37 errorHandler
: (e
) => console
.error(e
)
42 'Cannot start a pool from a worker with the same type as the pool'
47 it('Verify that filePath is checked', () => {
48 const expectedError
= new Error(
49 'Please specify a file with a worker implementation'
51 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
54 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
57 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
60 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
64 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
65 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
68 it('Verify that numberOfWorkers is checked', () => {
69 expect(() => new FixedThreadPool()).toThrowError(
71 'Cannot instantiate a pool without specifying the number of workers'
76 it('Verify that a negative number of workers is checked', () => {
79 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
82 'Cannot instantiate a pool with a negative number of workers'
87 it('Verify that a non integer number of workers is checked', () => {
90 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
93 'Cannot instantiate a pool with a non safe integer number of workers'
98 it('Verify that dynamic pool sizing is checked', () => {
101 new DynamicClusterPool(
104 './tests/worker-files/cluster/testWorker.js'
108 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
113 new DynamicThreadPool(
116 './tests/worker-files/thread/testWorker.js'
120 'Cannot instantiate a pool with a non safe integer number of workers'
125 new DynamicClusterPool(
128 './tests/worker-files/cluster/testWorker.js'
132 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
137 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
140 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
145 new DynamicClusterPool(
148 './tests/worker-files/cluster/testWorker.js'
152 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
157 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
160 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
165 it('Verify that pool options are checked', async () => {
166 let pool
= new FixedThreadPool(
168 './tests/worker-files/thread/testWorker.js'
170 expect(pool
.emitter
).toBeDefined()
171 expect(pool
.opts
.enableEvents
).toBe(true)
172 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
173 expect(pool
.opts
.enableTasksQueue
).toBe(false)
174 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
175 expect(pool
.opts
.workerChoiceStrategy
).toBe(
176 WorkerChoiceStrategies
.ROUND_ROBIN
178 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
180 runTime
: { median
: false },
181 waitTime
: { median
: false },
182 elu
: { median
: false }
184 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
186 runTime
: { median
: false },
187 waitTime
: { median
: false },
188 elu
: { median
: false }
190 expect(pool
.opts
.messageHandler
).toBeUndefined()
191 expect(pool
.opts
.errorHandler
).toBeUndefined()
192 expect(pool
.opts
.onlineHandler
).toBeUndefined()
193 expect(pool
.opts
.exitHandler
).toBeUndefined()
195 const testHandler
= () => console
.info('test handler executed')
196 pool
= new FixedThreadPool(
198 './tests/worker-files/thread/testWorker.js',
200 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
201 workerChoiceStrategyOptions
: {
202 runTime
: { median
: true },
203 weights
: { 0: 300, 1: 200 }
206 restartWorkerOnError
: false,
207 enableTasksQueue
: true,
208 tasksQueueOptions
: { concurrency
: 2 },
209 messageHandler
: testHandler
,
210 errorHandler
: testHandler
,
211 onlineHandler
: testHandler
,
212 exitHandler
: testHandler
215 expect(pool
.emitter
).toBeUndefined()
216 expect(pool
.opts
.enableEvents
).toBe(false)
217 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
218 expect(pool
.opts
.enableTasksQueue
).toBe(true)
219 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
223 expect(pool
.opts
.workerChoiceStrategy
).toBe(
224 WorkerChoiceStrategies
.LEAST_USED
226 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
228 runTime
: { median
: true },
229 waitTime
: { median
: false },
230 elu
: { median
: false },
231 weights
: { 0: 300, 1: 200 }
233 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
235 runTime
: { median
: true },
236 waitTime
: { median
: false },
237 elu
: { median
: false },
238 weights
: { 0: 300, 1: 200 }
240 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
241 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
242 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
243 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
247 it('Verify that pool options are validated', async () => {
252 './tests/worker-files/thread/testWorker.js',
254 workerChoiceStrategy
: 'invalidStrategy'
258 new Error("Invalid worker choice strategy 'invalidStrategy'")
264 './tests/worker-files/thread/testWorker.js',
266 workerChoiceStrategyOptions
: { weights
: {} }
271 'Invalid worker choice strategy options: must have a weight for each worker node'
278 './tests/worker-files/thread/testWorker.js',
280 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
285 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
292 './tests/worker-files/thread/testWorker.js',
294 enableTasksQueue
: true,
295 tasksQueueOptions
: { concurrency
: 0 }
300 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
307 './tests/worker-files/thread/testWorker.js',
309 enableTasksQueue
: true,
310 tasksQueueOptions
: 'invalidTasksQueueOptions'
314 new TypeError('Invalid tasks queue options: must be a plain object')
320 './tests/worker-files/thread/testWorker.js',
322 enableTasksQueue
: true,
323 tasksQueueOptions
: { concurrency
: 0.2 }
327 new TypeError('Invalid worker node tasks concurrency: must be an integer')
331 it('Verify that pool worker choice strategy options can be set', async () => {
332 const pool
= new FixedThreadPool(
334 './tests/worker-files/thread/testWorker.js',
335 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
337 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
339 runTime
: { median
: false },
340 waitTime
: { median
: false },
341 elu
: { median
: false }
343 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
345 runTime
: { median
: false },
346 waitTime
: { median
: false },
347 elu
: { median
: false }
349 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
350 .workerChoiceStrategies
) {
351 expect(workerChoiceStrategy
.opts
).toStrictEqual({
353 runTime
: { median
: false },
354 waitTime
: { median
: false },
355 elu
: { median
: false }
359 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
377 pool
.setWorkerChoiceStrategyOptions({
378 runTime
: { median
: true },
379 elu
: { median
: true }
381 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
383 runTime
: { median
: true },
384 waitTime
: { median
: false },
385 elu
: { median
: true }
387 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
389 runTime
: { median
: true },
390 waitTime
: { median
: false },
391 elu
: { median
: true }
393 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
394 .workerChoiceStrategies
) {
395 expect(workerChoiceStrategy
.opts
).toStrictEqual({
397 runTime
: { median
: true },
398 waitTime
: { median
: false },
399 elu
: { median
: true }
403 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
421 pool
.setWorkerChoiceStrategyOptions({
422 runTime
: { median
: false },
423 elu
: { median
: false }
425 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
427 runTime
: { median
: false },
428 waitTime
: { median
: false },
429 elu
: { median
: false }
431 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
433 runTime
: { median
: false },
434 waitTime
: { median
: false },
435 elu
: { median
: false }
437 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
438 .workerChoiceStrategies
) {
439 expect(workerChoiceStrategy
.opts
).toStrictEqual({
441 runTime
: { median
: false },
442 waitTime
: { median
: false },
443 elu
: { median
: false }
447 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
466 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
469 'Invalid worker choice strategy options: must be a plain object'
473 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
476 'Invalid worker choice strategy options: must have a weight for each worker node'
480 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
483 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
489 it('Verify that pool tasks queue can be enabled/disabled', async () => {
490 const pool
= new FixedThreadPool(
492 './tests/worker-files/thread/testWorker.js'
494 expect(pool
.opts
.enableTasksQueue
).toBe(false)
495 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
496 pool
.enableTasksQueue(true)
497 expect(pool
.opts
.enableTasksQueue
).toBe(true)
498 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
502 pool
.enableTasksQueue(true, { concurrency
: 2 })
503 expect(pool
.opts
.enableTasksQueue
).toBe(true)
504 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
508 pool
.enableTasksQueue(false)
509 expect(pool
.opts
.enableTasksQueue
).toBe(false)
510 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
514 it('Verify that pool tasks queue options can be set', async () => {
515 const pool
= new FixedThreadPool(
517 './tests/worker-files/thread/testWorker.js',
518 { enableTasksQueue
: true }
520 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
524 pool
.setTasksQueueOptions({ concurrency
: 2 })
525 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
530 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
532 new TypeError('Invalid tasks queue options: must be a plain object')
534 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
536 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
539 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
541 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
544 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
545 new TypeError('Invalid worker node tasks concurrency: must be an integer')
547 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
549 'Invalid worker node tasks queue max size: 0 is a negative integer or zero'
552 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
554 'Invalid worker node tasks queue max size: -1 is a negative integer or zero'
557 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
559 'Invalid worker node tasks queue max size: must be an integer'
565 it('Verify that pool info is set', async () => {
566 let pool
= new FixedThreadPool(
568 './tests/worker-files/thread/testWorker.js'
570 expect(pool
.info
).toStrictEqual({
572 type
: PoolTypes
.fixed
,
573 worker
: WorkerTypes
.thread
,
575 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
576 minSize
: numberOfWorkers
,
577 maxSize
: numberOfWorkers
,
578 workerNodes
: numberOfWorkers
,
579 idleWorkerNodes
: numberOfWorkers
,
586 pool
= new DynamicClusterPool(
587 Math
.floor(numberOfWorkers
/ 2),
589 './tests/worker-files/cluster/testWorker.js'
591 expect(pool
.info
).toStrictEqual({
593 type
: PoolTypes
.dynamic
,
594 worker
: WorkerTypes
.cluster
,
596 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
597 minSize
: Math
.floor(numberOfWorkers
/ 2),
598 maxSize
: numberOfWorkers
,
599 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
600 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
609 it('Verify that pool worker tasks usage are initialized', async () => {
610 const pool
= new FixedClusterPool(
612 './tests/worker-files/cluster/testWorker.js'
614 for (const workerNode
of pool
.workerNodes
) {
615 expect(workerNode
.usage
).toStrictEqual({
625 history
: expect
.any(CircularArray
)
628 history
: expect
.any(CircularArray
)
632 history
: expect
.any(CircularArray
)
635 history
: expect
.any(CircularArray
)
643 it('Verify that pool worker tasks queue are initialized', async () => {
644 let pool
= new FixedClusterPool(
646 './tests/worker-files/cluster/testWorker.js'
648 for (const workerNode
of pool
.workerNodes
) {
649 expect(workerNode
.tasksQueue
).toBeDefined()
650 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
651 expect(workerNode
.tasksQueue
.size
).toBe(0)
652 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
655 pool
= new DynamicThreadPool(
656 Math
.floor(numberOfWorkers
/ 2),
658 './tests/worker-files/thread/testWorker.js'
660 for (const workerNode
of pool
.workerNodes
) {
661 expect(workerNode
.tasksQueue
).toBeDefined()
662 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
663 expect(workerNode
.tasksQueue
.size
).toBe(0)
664 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
668 it('Verify that pool worker info are initialized', async () => {
669 let pool
= new FixedClusterPool(
671 './tests/worker-files/cluster/testWorker.js'
673 for (const workerNode
of pool
.workerNodes
) {
674 expect(workerNode
.info
).toStrictEqual({
675 id
: expect
.any(Number
),
676 type
: WorkerTypes
.cluster
,
682 pool
= new DynamicThreadPool(
683 Math
.floor(numberOfWorkers
/ 2),
685 './tests/worker-files/thread/testWorker.js'
687 for (const workerNode
of pool
.workerNodes
) {
688 expect(workerNode
.info
).toStrictEqual({
689 id
: expect
.any(Number
),
690 type
: WorkerTypes
.thread
,
697 it('Verify that pool worker tasks usage are computed', async () => {
698 const pool
= new FixedClusterPool(
700 './tests/worker-files/cluster/testWorker.js'
702 const promises
= new Set()
703 const maxMultiplier
= 2
704 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
705 promises
.add(pool
.execute())
707 for (const workerNode
of pool
.workerNodes
) {
708 expect(workerNode
.usage
).toStrictEqual({
711 executing
: maxMultiplier
,
718 history
: expect
.any(CircularArray
)
721 history
: expect
.any(CircularArray
)
725 history
: expect
.any(CircularArray
)
728 history
: expect
.any(CircularArray
)
733 await Promise
.all(promises
)
734 for (const workerNode
of pool
.workerNodes
) {
735 expect(workerNode
.usage
).toStrictEqual({
737 executed
: maxMultiplier
,
745 history
: expect
.any(CircularArray
)
748 history
: expect
.any(CircularArray
)
752 history
: expect
.any(CircularArray
)
755 history
: expect
.any(CircularArray
)
763 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
764 const pool
= new DynamicThreadPool(
765 Math
.floor(numberOfWorkers
/ 2),
767 './tests/worker-files/thread/testWorker.js'
769 const promises
= new Set()
770 const maxMultiplier
= 2
771 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
772 promises
.add(pool
.execute())
774 await Promise
.all(promises
)
775 for (const workerNode
of pool
.workerNodes
) {
776 expect(workerNode
.usage
).toStrictEqual({
778 executed
: expect
.any(Number
),
786 history
: expect
.any(CircularArray
)
789 history
: expect
.any(CircularArray
)
793 history
: expect
.any(CircularArray
)
796 history
: expect
.any(CircularArray
)
800 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
801 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
802 numberOfWorkers
* maxMultiplier
804 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
805 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
806 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
807 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
809 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
810 for (const workerNode
of pool
.workerNodes
) {
811 expect(workerNode
.usage
).toStrictEqual({
821 history
: expect
.any(CircularArray
)
824 history
: expect
.any(CircularArray
)
828 history
: expect
.any(CircularArray
)
831 history
: expect
.any(CircularArray
)
835 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
836 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
837 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
838 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
843 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
844 const pool
= new DynamicClusterPool(
845 Math
.floor(numberOfWorkers
/ 2),
847 './tests/worker-files/cluster/testWorker.js'
851 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
855 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
856 expect(poolReady
).toBe(1)
857 expect(poolInfo
).toStrictEqual({
859 type
: PoolTypes
.dynamic
,
860 worker
: WorkerTypes
.cluster
,
862 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
863 minSize
: expect
.any(Number
),
864 maxSize
: expect
.any(Number
),
865 workerNodes
: expect
.any(Number
),
866 idleWorkerNodes
: expect
.any(Number
),
867 busyWorkerNodes
: expect
.any(Number
),
868 executedTasks
: expect
.any(Number
),
869 executingTasks
: expect
.any(Number
),
870 failedTasks
: expect
.any(Number
)
875 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
876 const pool
= new FixedThreadPool(
878 './tests/worker-files/thread/testWorker.js'
880 const promises
= new Set()
883 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
887 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
888 promises
.add(pool
.execute())
890 await Promise
.all(promises
)
891 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
892 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
893 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
894 expect(poolInfo
).toStrictEqual({
896 type
: PoolTypes
.fixed
,
897 worker
: WorkerTypes
.thread
,
898 ready
: expect
.any(Boolean
),
899 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
900 minSize
: expect
.any(Number
),
901 maxSize
: expect
.any(Number
),
902 workerNodes
: expect
.any(Number
),
903 idleWorkerNodes
: expect
.any(Number
),
904 busyWorkerNodes
: expect
.any(Number
),
905 executedTasks
: expect
.any(Number
),
906 executingTasks
: expect
.any(Number
),
907 failedTasks
: expect
.any(Number
)
912 it("Verify that pool event emitter 'full' event can register a callback", async () => {
913 const pool
= new DynamicThreadPool(
914 Math
.floor(numberOfWorkers
/ 2),
916 './tests/worker-files/thread/testWorker.js'
918 const promises
= new Set()
921 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
925 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
926 promises
.add(pool
.execute())
928 await Promise
.all(promises
)
929 expect(poolFull
).toBe(1)
930 expect(poolInfo
).toStrictEqual({
932 type
: PoolTypes
.dynamic
,
933 worker
: WorkerTypes
.thread
,
934 ready
: expect
.any(Boolean
),
935 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
936 minSize
: expect
.any(Number
),
937 maxSize
: expect
.any(Number
),
938 workerNodes
: expect
.any(Number
),
939 idleWorkerNodes
: expect
.any(Number
),
940 busyWorkerNodes
: expect
.any(Number
),
941 executedTasks
: expect
.any(Number
),
942 executingTasks
: expect
.any(Number
),
943 failedTasks
: expect
.any(Number
)
948 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
949 const pool
= new FixedThreadPool(
951 './tests/worker-files/thread/testWorker.js',
953 enableTasksQueue
: true
956 sinon
.stub(pool
, 'hasBackPressure').returns(true)
957 const promises
= new Set()
958 let poolBackPressure
= 0
960 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
964 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
965 promises
.add(pool
.execute())
967 await Promise
.all(promises
)
968 expect(poolBackPressure
).toBe(1)
969 expect(poolInfo
).toStrictEqual({
971 type
: PoolTypes
.fixed
,
972 worker
: WorkerTypes
.thread
,
973 ready
: expect
.any(Boolean
),
974 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
975 minSize
: expect
.any(Number
),
976 maxSize
: expect
.any(Number
),
977 workerNodes
: expect
.any(Number
),
978 idleWorkerNodes
: expect
.any(Number
),
979 busyWorkerNodes
: expect
.any(Number
),
980 executedTasks
: expect
.any(Number
),
981 executingTasks
: expect
.any(Number
),
982 maxQueuedTasks
: expect
.any(Number
),
983 queuedTasks
: expect
.any(Number
),
985 stolenTasks
: expect
.any(Number
),
986 failedTasks
: expect
.any(Number
)
988 expect(pool
.hasBackPressure
.called
).toBe(true)
992 it('Verify that listTaskFunctions() is working', async () => {
993 const dynamicThreadPool
= new DynamicThreadPool(
994 Math
.floor(numberOfWorkers
/ 2),
996 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
998 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
999 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
1001 'jsonIntegerSerialization',
1005 const fixedClusterPool
= new FixedClusterPool(
1007 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1009 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1010 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
1012 'jsonIntegerSerialization',
1018 it('Verify that multiple task functions worker is working', async () => {
1019 const pool
= new DynamicClusterPool(
1020 Math
.floor(numberOfWorkers
/ 2),
1022 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1024 const data
= { n
: 10 }
1025 const result0
= await pool
.execute(data
)
1026 expect(result0
).toStrictEqual({ ok
: 1 })
1027 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1028 expect(result1
).toStrictEqual({ ok
: 1 })
1029 const result2
= await pool
.execute(data
, 'factorial')
1030 expect(result2
).toBe(3628800)
1031 const result3
= await pool
.execute(data
, 'fibonacci')
1032 expect(result3
).toBe(55)
1033 expect(pool
.info
.executingTasks
).toBe(0)
1034 expect(pool
.info
.executedTasks
).toBe(4)
1035 for (const workerNode
of pool
.workerNodes
) {
1036 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
1038 'jsonIntegerSerialization',
1042 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1043 for (const name
of pool
.listTaskFunctions()) {
1044 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1046 executed
: expect
.any(Number
),
1047 executing
: expect
.any(Number
),
1053 history
: expect
.any(CircularArray
)
1056 history
: expect
.any(CircularArray
)
1060 history
: expect
.any(CircularArray
)
1063 history
: expect
.any(CircularArray
)
1068 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executing
1069 ).toBeGreaterThanOrEqual(0)