1 const { expect
} = require('expect')
2 const sinon
= require('sinon')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Deque
} = require('../../../lib/deque')
15 const { version
} = require('../../../package.json')
16 const { waitPoolEvents
} = require('../../test-utils')
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers
= 2
20 class StubPoolWithIsMain
extends FixedThreadPool
{
30 it('Simulate pool creation from a non main thread/process', () => {
33 new StubPoolWithIsMain(
35 './tests/worker-files/thread/testWorker.js',
37 errorHandler
: (e
) => console
.error(e
)
42 'Cannot start a pool from a worker with the same type as the pool'
47 it('Verify that filePath is checked', () => {
48 const expectedError
= new Error(
49 'Please specify a file with a worker implementation'
51 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
54 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
57 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
60 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
64 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
65 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
68 it('Verify that numberOfWorkers is checked', () => {
69 expect(() => new FixedThreadPool()).toThrowError(
71 'Cannot instantiate a pool without specifying the number of workers'
76 it('Verify that a negative number of workers is checked', () => {
79 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
82 'Cannot instantiate a pool with a negative number of workers'
87 it('Verify that a non integer number of workers is checked', () => {
90 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
93 'Cannot instantiate a pool with a non safe integer number of workers'
98 it('Verify that dynamic pool sizing is checked', () => {
101 new DynamicClusterPool(
104 './tests/worker-files/cluster/testWorker.js'
108 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
113 new DynamicThreadPool(
116 './tests/worker-files/thread/testWorker.js'
120 'Cannot instantiate a pool with a non safe integer number of workers'
125 new DynamicClusterPool(
128 './tests/worker-files/cluster/testWorker.js'
132 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
137 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
140 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
145 new DynamicClusterPool(
148 './tests/worker-files/cluster/testWorker.js'
152 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
157 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
160 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
165 it('Verify that pool options are checked', async () => {
166 let pool
= new FixedThreadPool(
168 './tests/worker-files/thread/testWorker.js'
170 expect(pool
.emitter
).toBeDefined()
171 expect(pool
.opts
.enableEvents
).toBe(true)
172 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
173 expect(pool
.opts
.enableTasksQueue
).toBe(false)
174 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
175 expect(pool
.opts
.workerChoiceStrategy
).toBe(
176 WorkerChoiceStrategies
.ROUND_ROBIN
178 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
180 runTime
: { median
: false },
181 waitTime
: { median
: false },
182 elu
: { median
: false }
184 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
186 runTime
: { median
: false },
187 waitTime
: { median
: false },
188 elu
: { median
: false }
190 expect(pool
.opts
.messageHandler
).toBeUndefined()
191 expect(pool
.opts
.errorHandler
).toBeUndefined()
192 expect(pool
.opts
.onlineHandler
).toBeUndefined()
193 expect(pool
.opts
.exitHandler
).toBeUndefined()
195 const testHandler
= () => console
.info('test handler executed')
196 pool
= new FixedThreadPool(
198 './tests/worker-files/thread/testWorker.js',
200 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
201 workerChoiceStrategyOptions
: {
202 runTime
: { median
: true },
203 weights
: { 0: 300, 1: 200 }
206 restartWorkerOnError
: false,
207 enableTasksQueue
: true,
208 tasksQueueOptions
: { concurrency
: 2 },
209 messageHandler
: testHandler
,
210 errorHandler
: testHandler
,
211 onlineHandler
: testHandler
,
212 exitHandler
: testHandler
215 expect(pool
.emitter
).toBeUndefined()
216 expect(pool
.opts
.enableEvents
).toBe(false)
217 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
218 expect(pool
.opts
.enableTasksQueue
).toBe(true)
219 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
223 expect(pool
.opts
.workerChoiceStrategy
).toBe(
224 WorkerChoiceStrategies
.LEAST_USED
226 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
228 runTime
: { median
: true },
229 waitTime
: { median
: false },
230 elu
: { median
: false },
231 weights
: { 0: 300, 1: 200 }
233 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
235 runTime
: { median
: true },
236 waitTime
: { median
: false },
237 elu
: { median
: false },
238 weights
: { 0: 300, 1: 200 }
240 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
241 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
242 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
243 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
247 it('Verify that pool options are validated', async () => {
252 './tests/worker-files/thread/testWorker.js',
254 workerChoiceStrategy
: 'invalidStrategy'
258 new Error("Invalid worker choice strategy 'invalidStrategy'")
264 './tests/worker-files/thread/testWorker.js',
266 workerChoiceStrategyOptions
: { weights
: {} }
271 'Invalid worker choice strategy options: must have a weight for each worker node'
278 './tests/worker-files/thread/testWorker.js',
280 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
285 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
292 './tests/worker-files/thread/testWorker.js',
294 enableTasksQueue
: true,
295 tasksQueueOptions
: { concurrency
: 0 }
300 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
307 './tests/worker-files/thread/testWorker.js',
309 enableTasksQueue
: true,
310 tasksQueueOptions
: 'invalidTasksQueueOptions'
314 new TypeError('Invalid tasks queue options: must be a plain object')
320 './tests/worker-files/thread/testWorker.js',
322 enableTasksQueue
: true,
323 tasksQueueOptions
: { concurrency
: 0.2 }
327 new TypeError('Invalid worker node tasks concurrency: must be an integer')
331 it('Verify that pool worker choice strategy options can be set', async () => {
332 const pool
= new FixedThreadPool(
334 './tests/worker-files/thread/testWorker.js',
335 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
337 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
339 runTime
: { median
: false },
340 waitTime
: { median
: false },
341 elu
: { median
: false }
343 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
345 runTime
: { median
: false },
346 waitTime
: { median
: false },
347 elu
: { median
: false }
349 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
350 .workerChoiceStrategies
) {
351 expect(workerChoiceStrategy
.opts
).toStrictEqual({
353 runTime
: { median
: false },
354 waitTime
: { median
: false },
355 elu
: { median
: false }
359 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
377 pool
.setWorkerChoiceStrategyOptions({
378 runTime
: { median
: true },
379 elu
: { median
: true }
381 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
383 runTime
: { median
: true },
384 waitTime
: { median
: false },
385 elu
: { median
: true }
387 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
389 runTime
: { median
: true },
390 waitTime
: { median
: false },
391 elu
: { median
: true }
393 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
394 .workerChoiceStrategies
) {
395 expect(workerChoiceStrategy
.opts
).toStrictEqual({
397 runTime
: { median
: true },
398 waitTime
: { median
: false },
399 elu
: { median
: true }
403 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
421 pool
.setWorkerChoiceStrategyOptions({
422 runTime
: { median
: false },
423 elu
: { median
: false }
425 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
427 runTime
: { median
: false },
428 waitTime
: { median
: false },
429 elu
: { median
: false }
431 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
433 runTime
: { median
: false },
434 waitTime
: { median
: false },
435 elu
: { median
: false }
437 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
438 .workerChoiceStrategies
) {
439 expect(workerChoiceStrategy
.opts
).toStrictEqual({
441 runTime
: { median
: false },
442 waitTime
: { median
: false },
443 elu
: { median
: false }
447 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
466 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
469 'Invalid worker choice strategy options: must be a plain object'
473 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
476 'Invalid worker choice strategy options: must have a weight for each worker node'
480 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
483 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
489 it('Verify that pool tasks queue can be enabled/disabled', async () => {
490 const pool
= new FixedThreadPool(
492 './tests/worker-files/thread/testWorker.js'
494 expect(pool
.opts
.enableTasksQueue
).toBe(false)
495 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
496 pool
.enableTasksQueue(true)
497 expect(pool
.opts
.enableTasksQueue
).toBe(true)
498 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
502 pool
.enableTasksQueue(true, { concurrency
: 2 })
503 expect(pool
.opts
.enableTasksQueue
).toBe(true)
504 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
508 pool
.enableTasksQueue(false)
509 expect(pool
.opts
.enableTasksQueue
).toBe(false)
510 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
514 it('Verify that pool tasks queue options can be set', async () => {
515 const pool
= new FixedThreadPool(
517 './tests/worker-files/thread/testWorker.js',
518 { enableTasksQueue
: true }
520 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
524 pool
.setTasksQueueOptions({ concurrency
: 2 })
525 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
530 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
532 new TypeError('Invalid tasks queue options: must be a plain object')
534 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
536 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
539 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
541 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
544 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
545 new TypeError('Invalid worker node tasks concurrency: must be an integer')
547 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
549 'Invalid worker node tasks queue max size: 0 is a negative integer or zero'
552 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
554 'Invalid worker node tasks queue max size: -1 is a negative integer or zero'
557 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
559 'Invalid worker node tasks queue max size: must be an integer'
565 it('Verify that pool info is set', async () => {
566 let pool
= new FixedThreadPool(
568 './tests/worker-files/thread/testWorker.js'
570 expect(pool
.info
).toStrictEqual({
572 type
: PoolTypes
.fixed
,
573 worker
: WorkerTypes
.thread
,
575 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
576 minSize
: numberOfWorkers
,
577 maxSize
: numberOfWorkers
,
578 workerNodes
: numberOfWorkers
,
579 idleWorkerNodes
: numberOfWorkers
,
586 pool
= new DynamicClusterPool(
587 Math
.floor(numberOfWorkers
/ 2),
589 './tests/worker-files/cluster/testWorker.js'
591 expect(pool
.info
).toStrictEqual({
593 type
: PoolTypes
.dynamic
,
594 worker
: WorkerTypes
.cluster
,
596 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
597 minSize
: Math
.floor(numberOfWorkers
/ 2),
598 maxSize
: numberOfWorkers
,
599 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
600 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
609 it('Verify that pool worker tasks usage are initialized', async () => {
610 const pool
= new FixedClusterPool(
612 './tests/worker-files/cluster/testWorker.js'
614 for (const workerNode
of pool
.workerNodes
) {
615 expect(workerNode
.usage
).toStrictEqual({
624 history
: expect
.any(CircularArray
)
627 history
: expect
.any(CircularArray
)
631 history
: expect
.any(CircularArray
)
634 history
: expect
.any(CircularArray
)
642 it('Verify that pool worker tasks queue are initialized', async () => {
643 let pool
= new FixedClusterPool(
645 './tests/worker-files/cluster/testWorker.js'
647 for (const workerNode
of pool
.workerNodes
) {
648 expect(workerNode
.tasksQueue
).toBeDefined()
649 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
650 expect(workerNode
.tasksQueue
.size
).toBe(0)
651 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
654 pool
= new DynamicThreadPool(
655 Math
.floor(numberOfWorkers
/ 2),
657 './tests/worker-files/thread/testWorker.js'
659 for (const workerNode
of pool
.workerNodes
) {
660 expect(workerNode
.tasksQueue
).toBeDefined()
661 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
662 expect(workerNode
.tasksQueue
.size
).toBe(0)
663 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
667 it('Verify that pool worker info are initialized', async () => {
668 let pool
= new FixedClusterPool(
670 './tests/worker-files/cluster/testWorker.js'
672 for (const workerNode
of pool
.workerNodes
) {
673 expect(workerNode
.info
).toStrictEqual({
674 id
: expect
.any(Number
),
675 type
: WorkerTypes
.cluster
,
681 pool
= new DynamicThreadPool(
682 Math
.floor(numberOfWorkers
/ 2),
684 './tests/worker-files/thread/testWorker.js'
686 for (const workerNode
of pool
.workerNodes
) {
687 expect(workerNode
.info
).toStrictEqual({
688 id
: expect
.any(Number
),
689 type
: WorkerTypes
.thread
,
696 it('Verify that pool worker tasks usage are computed', async () => {
697 const pool
= new FixedClusterPool(
699 './tests/worker-files/cluster/testWorker.js'
701 const promises
= new Set()
702 const maxMultiplier
= 2
703 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
704 promises
.add(pool
.execute())
706 for (const workerNode
of pool
.workerNodes
) {
707 expect(workerNode
.usage
).toStrictEqual({
710 executing
: maxMultiplier
,
716 history
: expect
.any(CircularArray
)
719 history
: expect
.any(CircularArray
)
723 history
: expect
.any(CircularArray
)
726 history
: expect
.any(CircularArray
)
731 await Promise
.all(promises
)
732 for (const workerNode
of pool
.workerNodes
) {
733 expect(workerNode
.usage
).toStrictEqual({
735 executed
: maxMultiplier
,
742 history
: expect
.any(CircularArray
)
745 history
: expect
.any(CircularArray
)
749 history
: expect
.any(CircularArray
)
752 history
: expect
.any(CircularArray
)
760 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
761 const pool
= new DynamicThreadPool(
762 Math
.floor(numberOfWorkers
/ 2),
764 './tests/worker-files/thread/testWorker.js'
766 const promises
= new Set()
767 const maxMultiplier
= 2
768 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
769 promises
.add(pool
.execute())
771 await Promise
.all(promises
)
772 for (const workerNode
of pool
.workerNodes
) {
773 expect(workerNode
.usage
).toStrictEqual({
775 executed
: expect
.any(Number
),
782 history
: expect
.any(CircularArray
)
785 history
: expect
.any(CircularArray
)
789 history
: expect
.any(CircularArray
)
792 history
: expect
.any(CircularArray
)
796 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
797 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
798 numberOfWorkers
* maxMultiplier
800 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
801 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
802 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
803 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
805 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
806 for (const workerNode
of pool
.workerNodes
) {
807 expect(workerNode
.usage
).toStrictEqual({
816 history
: expect
.any(CircularArray
)
819 history
: expect
.any(CircularArray
)
823 history
: expect
.any(CircularArray
)
826 history
: expect
.any(CircularArray
)
830 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
831 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
832 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
833 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
838 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
839 const pool
= new DynamicClusterPool(
840 Math
.floor(numberOfWorkers
/ 2),
842 './tests/worker-files/cluster/testWorker.js'
846 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
850 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
851 expect(poolReady
).toBe(1)
852 expect(poolInfo
).toStrictEqual({
854 type
: PoolTypes
.dynamic
,
855 worker
: WorkerTypes
.cluster
,
857 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
858 minSize
: expect
.any(Number
),
859 maxSize
: expect
.any(Number
),
860 workerNodes
: expect
.any(Number
),
861 idleWorkerNodes
: expect
.any(Number
),
862 busyWorkerNodes
: expect
.any(Number
),
863 executedTasks
: expect
.any(Number
),
864 executingTasks
: expect
.any(Number
),
865 failedTasks
: expect
.any(Number
)
870 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
871 const pool
= new FixedThreadPool(
873 './tests/worker-files/thread/testWorker.js'
875 const promises
= new Set()
878 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
882 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
883 promises
.add(pool
.execute())
885 await Promise
.all(promises
)
886 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
887 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
888 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
889 expect(poolInfo
).toStrictEqual({
891 type
: PoolTypes
.fixed
,
892 worker
: WorkerTypes
.thread
,
893 ready
: expect
.any(Boolean
),
894 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
895 minSize
: expect
.any(Number
),
896 maxSize
: expect
.any(Number
),
897 workerNodes
: expect
.any(Number
),
898 idleWorkerNodes
: expect
.any(Number
),
899 busyWorkerNodes
: expect
.any(Number
),
900 executedTasks
: expect
.any(Number
),
901 executingTasks
: expect
.any(Number
),
902 failedTasks
: expect
.any(Number
)
907 it("Verify that pool event emitter 'full' event can register a callback", async () => {
908 const pool
= new DynamicThreadPool(
909 Math
.floor(numberOfWorkers
/ 2),
911 './tests/worker-files/thread/testWorker.js'
913 const promises
= new Set()
916 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
920 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
921 promises
.add(pool
.execute())
923 await Promise
.all(promises
)
924 expect(poolFull
).toBe(1)
925 expect(poolInfo
).toStrictEqual({
927 type
: PoolTypes
.dynamic
,
928 worker
: WorkerTypes
.thread
,
929 ready
: expect
.any(Boolean
),
930 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
931 minSize
: expect
.any(Number
),
932 maxSize
: expect
.any(Number
),
933 workerNodes
: expect
.any(Number
),
934 idleWorkerNodes
: expect
.any(Number
),
935 busyWorkerNodes
: expect
.any(Number
),
936 executedTasks
: expect
.any(Number
),
937 executingTasks
: expect
.any(Number
),
938 failedTasks
: expect
.any(Number
)
943 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
944 const pool
= new FixedThreadPool(
946 './tests/worker-files/thread/testWorker.js',
948 enableTasksQueue
: true
951 sinon
.stub(pool
, 'hasBackPressure').returns(true)
952 const promises
= new Set()
953 let poolBackPressure
= 0
955 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
959 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
960 promises
.add(pool
.execute())
962 await Promise
.all(promises
)
963 expect(poolBackPressure
).toBe(2)
964 expect(poolInfo
).toStrictEqual({
966 type
: PoolTypes
.fixed
,
967 worker
: WorkerTypes
.thread
,
968 ready
: expect
.any(Boolean
),
969 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
970 minSize
: expect
.any(Number
),
971 maxSize
: expect
.any(Number
),
972 workerNodes
: expect
.any(Number
),
973 idleWorkerNodes
: expect
.any(Number
),
974 busyWorkerNodes
: expect
.any(Number
),
975 executedTasks
: expect
.any(Number
),
976 executingTasks
: expect
.any(Number
),
977 maxQueuedTasks
: expect
.any(Number
),
978 queuedTasks
: expect
.any(Number
),
980 failedTasks
: expect
.any(Number
)
982 expect(pool
.hasBackPressure
.called
).toBe(true)
986 it('Verify that listTaskFunctions() is working', async () => {
987 const dynamicThreadPool
= new DynamicThreadPool(
988 Math
.floor(numberOfWorkers
/ 2),
990 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
992 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
993 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
995 'jsonIntegerSerialization',
999 const fixedClusterPool
= new FixedClusterPool(
1001 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1003 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1004 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
1006 'jsonIntegerSerialization',
1012 it('Verify that multiple task functions worker is working', async () => {
1013 const pool
= new DynamicClusterPool(
1014 Math
.floor(numberOfWorkers
/ 2),
1016 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1018 const data
= { n
: 10 }
1019 const result0
= await pool
.execute(data
)
1020 expect(result0
).toStrictEqual({ ok
: 1 })
1021 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1022 expect(result1
).toStrictEqual({ ok
: 1 })
1023 const result2
= await pool
.execute(data
, 'factorial')
1024 expect(result2
).toBe(3628800)
1025 const result3
= await pool
.execute(data
, 'fibonacci')
1026 expect(result3
).toBe(55)
1027 expect(pool
.info
.executingTasks
).toBe(0)
1028 expect(pool
.info
.executedTasks
).toBe(4)
1029 for (const workerNode
of pool
.workerNodes
) {
1030 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
1032 'jsonIntegerSerialization',
1036 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1037 for (const name
of pool
.listTaskFunctions()) {
1038 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1040 executed
: expect
.any(Number
),
1041 executing
: expect
.any(Number
),
1046 history
: expect
.any(CircularArray
)
1049 history
: expect
.any(CircularArray
)
1053 history
: expect
.any(CircularArray
)
1056 history
: expect
.any(CircularArray
)
1061 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executing
1062 ).toBeGreaterThanOrEqual(0)