1 const { expect
} = require('expect')
2 const sinon
= require('sinon')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Deque
} = require('../../../lib/deque')
15 const { version
} = require('../../../package.json')
16 const { waitPoolEvents
} = require('../../test-utils')
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers
= 2
20 class StubPoolWithIsMain
extends FixedThreadPool
{
26 it('Simulate pool creation from a non main thread/process', () => {
29 new StubPoolWithIsMain(
31 './tests/worker-files/thread/testWorker.js',
33 errorHandler
: (e
) => console
.error(e
)
38 'Cannot start a pool from a worker with the same type as the pool'
43 it('Verify that filePath is checked', () => {
44 const expectedError
= new Error(
45 'Please specify a file with a worker implementation'
47 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
50 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
53 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
56 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
60 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
61 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
64 it('Verify that numberOfWorkers is checked', () => {
65 expect(() => new FixedThreadPool()).toThrowError(
67 'Cannot instantiate a pool without specifying the number of workers'
72 it('Verify that a negative number of workers is checked', () => {
75 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
78 'Cannot instantiate a pool with a negative number of workers'
83 it('Verify that a non integer number of workers is checked', () => {
86 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
89 'Cannot instantiate a pool with a non safe integer number of workers'
94 it('Verify that dynamic pool sizing is checked', () => {
97 new DynamicClusterPool(
100 './tests/worker-files/cluster/testWorker.js'
104 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
109 new DynamicThreadPool(
112 './tests/worker-files/thread/testWorker.js'
116 'Cannot instantiate a pool with a non safe integer number of workers'
121 new DynamicClusterPool(
124 './tests/worker-files/cluster/testWorker.js'
128 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
133 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
136 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
141 new DynamicClusterPool(
144 './tests/worker-files/cluster/testWorker.js'
148 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
153 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
156 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
161 it('Verify that pool options are checked', async () => {
162 let pool
= new FixedThreadPool(
164 './tests/worker-files/thread/testWorker.js'
166 expect(pool
.emitter
).toBeDefined()
167 expect(pool
.opts
.enableEvents
).toBe(true)
168 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
169 expect(pool
.opts
.enableTasksQueue
).toBe(false)
170 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
171 expect(pool
.opts
.workerChoiceStrategy
).toBe(
172 WorkerChoiceStrategies
.ROUND_ROBIN
174 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
176 runTime
: { median
: false },
177 waitTime
: { median
: false },
178 elu
: { median
: false }
180 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
182 runTime
: { median
: false },
183 waitTime
: { median
: false },
184 elu
: { median
: false }
186 expect(pool
.opts
.messageHandler
).toBeUndefined()
187 expect(pool
.opts
.errorHandler
).toBeUndefined()
188 expect(pool
.opts
.onlineHandler
).toBeUndefined()
189 expect(pool
.opts
.exitHandler
).toBeUndefined()
191 const testHandler
= () => console
.info('test handler executed')
192 pool
= new FixedThreadPool(
194 './tests/worker-files/thread/testWorker.js',
196 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
197 workerChoiceStrategyOptions
: {
198 runTime
: { median
: true },
199 weights
: { 0: 300, 1: 200 }
202 restartWorkerOnError
: false,
203 enableTasksQueue
: true,
204 tasksQueueOptions
: { concurrency
: 2 },
205 messageHandler
: testHandler
,
206 errorHandler
: testHandler
,
207 onlineHandler
: testHandler
,
208 exitHandler
: testHandler
211 expect(pool
.emitter
).toBeUndefined()
212 expect(pool
.opts
.enableEvents
).toBe(false)
213 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
214 expect(pool
.opts
.enableTasksQueue
).toBe(true)
215 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
219 expect(pool
.opts
.workerChoiceStrategy
).toBe(
220 WorkerChoiceStrategies
.LEAST_USED
222 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
224 runTime
: { median
: true },
225 waitTime
: { median
: false },
226 elu
: { median
: false },
227 weights
: { 0: 300, 1: 200 }
229 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
231 runTime
: { median
: true },
232 waitTime
: { median
: false },
233 elu
: { median
: false },
234 weights
: { 0: 300, 1: 200 }
236 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
237 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
238 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
239 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
243 it('Verify that pool options are validated', async () => {
248 './tests/worker-files/thread/testWorker.js',
250 workerChoiceStrategy
: 'invalidStrategy'
254 new Error("Invalid worker choice strategy 'invalidStrategy'")
260 './tests/worker-files/thread/testWorker.js',
262 workerChoiceStrategyOptions
: { weights
: {} }
267 'Invalid worker choice strategy options: must have a weight for each worker node'
274 './tests/worker-files/thread/testWorker.js',
276 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
281 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
288 './tests/worker-files/thread/testWorker.js',
290 enableTasksQueue
: true,
291 tasksQueueOptions
: { concurrency
: 0 }
296 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
303 './tests/worker-files/thread/testWorker.js',
305 enableTasksQueue
: true,
306 tasksQueueOptions
: 'invalidTasksQueueOptions'
310 new TypeError('Invalid tasks queue options: must be a plain object')
316 './tests/worker-files/thread/testWorker.js',
318 enableTasksQueue
: true,
319 tasksQueueOptions
: { concurrency
: 0.2 }
323 new TypeError('Invalid worker node tasks concurrency: must be an integer')
327 it('Verify that pool worker choice strategy options can be set', async () => {
328 const pool
= new FixedThreadPool(
330 './tests/worker-files/thread/testWorker.js',
331 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
333 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
335 runTime
: { median
: false },
336 waitTime
: { median
: false },
337 elu
: { median
: false }
339 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
341 runTime
: { median
: false },
342 waitTime
: { median
: false },
343 elu
: { median
: false }
345 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
346 .workerChoiceStrategies
) {
347 expect(workerChoiceStrategy
.opts
).toStrictEqual({
349 runTime
: { median
: false },
350 waitTime
: { median
: false },
351 elu
: { median
: false }
355 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
373 pool
.setWorkerChoiceStrategyOptions({
374 runTime
: { median
: true },
375 elu
: { median
: true }
377 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
379 runTime
: { median
: true },
380 waitTime
: { median
: false },
381 elu
: { median
: true }
383 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
385 runTime
: { median
: true },
386 waitTime
: { median
: false },
387 elu
: { median
: true }
389 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
390 .workerChoiceStrategies
) {
391 expect(workerChoiceStrategy
.opts
).toStrictEqual({
393 runTime
: { median
: true },
394 waitTime
: { median
: false },
395 elu
: { median
: true }
399 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
417 pool
.setWorkerChoiceStrategyOptions({
418 runTime
: { median
: false },
419 elu
: { median
: false }
421 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
423 runTime
: { median
: false },
424 waitTime
: { median
: false },
425 elu
: { median
: false }
427 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
429 runTime
: { median
: false },
430 waitTime
: { median
: false },
431 elu
: { median
: false }
433 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
434 .workerChoiceStrategies
) {
435 expect(workerChoiceStrategy
.opts
).toStrictEqual({
437 runTime
: { median
: false },
438 waitTime
: { median
: false },
439 elu
: { median
: false }
443 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
462 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
465 'Invalid worker choice strategy options: must be a plain object'
469 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
472 'Invalid worker choice strategy options: must have a weight for each worker node'
476 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
479 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
485 it('Verify that pool tasks queue can be enabled/disabled', async () => {
486 const pool
= new FixedThreadPool(
488 './tests/worker-files/thread/testWorker.js'
490 expect(pool
.opts
.enableTasksQueue
).toBe(false)
491 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
492 pool
.enableTasksQueue(true)
493 expect(pool
.opts
.enableTasksQueue
).toBe(true)
494 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
498 pool
.enableTasksQueue(true, { concurrency
: 2 })
499 expect(pool
.opts
.enableTasksQueue
).toBe(true)
500 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
504 pool
.enableTasksQueue(false)
505 expect(pool
.opts
.enableTasksQueue
).toBe(false)
506 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
510 it('Verify that pool tasks queue options can be set', async () => {
511 const pool
= new FixedThreadPool(
513 './tests/worker-files/thread/testWorker.js',
514 { enableTasksQueue
: true }
516 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
520 pool
.setTasksQueueOptions({ concurrency
: 2 })
521 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
526 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
528 new TypeError('Invalid tasks queue options: must be a plain object')
530 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
532 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
535 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
537 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
540 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
541 new TypeError('Invalid worker node tasks concurrency: must be an integer')
543 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
545 'Invalid worker node tasks queue max size: 0 is a negative integer or zero'
548 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
550 'Invalid worker node tasks queue max size: -1 is a negative integer or zero'
553 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
555 'Invalid worker node tasks queue max size: must be an integer'
561 it('Verify that pool info is set', async () => {
562 let pool
= new FixedThreadPool(
564 './tests/worker-files/thread/testWorker.js'
566 expect(pool
.info
).toStrictEqual({
568 type
: PoolTypes
.fixed
,
569 worker
: WorkerTypes
.thread
,
571 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
572 minSize
: numberOfWorkers
,
573 maxSize
: numberOfWorkers
,
574 workerNodes
: numberOfWorkers
,
575 idleWorkerNodes
: numberOfWorkers
,
582 pool
= new DynamicClusterPool(
583 Math
.floor(numberOfWorkers
/ 2),
585 './tests/worker-files/cluster/testWorker.js'
587 expect(pool
.info
).toStrictEqual({
589 type
: PoolTypes
.dynamic
,
590 worker
: WorkerTypes
.cluster
,
592 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
593 minSize
: Math
.floor(numberOfWorkers
/ 2),
594 maxSize
: numberOfWorkers
,
595 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
596 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
605 it('Verify that pool worker tasks usage are initialized', async () => {
606 const pool
= new FixedClusterPool(
608 './tests/worker-files/cluster/testWorker.js'
610 for (const workerNode
of pool
.workerNodes
) {
611 expect(workerNode
.usage
).toStrictEqual({
620 history
: expect
.any(CircularArray
)
623 history
: expect
.any(CircularArray
)
627 history
: expect
.any(CircularArray
)
630 history
: expect
.any(CircularArray
)
638 it('Verify that pool worker tasks queue are initialized', async () => {
639 let pool
= new FixedClusterPool(
641 './tests/worker-files/cluster/testWorker.js'
643 for (const workerNode
of pool
.workerNodes
) {
644 expect(workerNode
.tasksQueue
).toBeDefined()
645 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
646 expect(workerNode
.tasksQueue
.size
).toBe(0)
647 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
650 pool
= new DynamicThreadPool(
651 Math
.floor(numberOfWorkers
/ 2),
653 './tests/worker-files/thread/testWorker.js'
655 for (const workerNode
of pool
.workerNodes
) {
656 expect(workerNode
.tasksQueue
).toBeDefined()
657 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
658 expect(workerNode
.tasksQueue
.size
).toBe(0)
659 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
663 it('Verify that pool worker info are initialized', async () => {
664 let pool
= new FixedClusterPool(
666 './tests/worker-files/cluster/testWorker.js'
668 for (const workerNode
of pool
.workerNodes
) {
669 expect(workerNode
.info
).toStrictEqual({
670 id
: expect
.any(Number
),
671 type
: WorkerTypes
.cluster
,
677 pool
= new DynamicThreadPool(
678 Math
.floor(numberOfWorkers
/ 2),
680 './tests/worker-files/thread/testWorker.js'
682 for (const workerNode
of pool
.workerNodes
) {
683 expect(workerNode
.info
).toStrictEqual({
684 id
: expect
.any(Number
),
685 type
: WorkerTypes
.thread
,
692 it('Verify that pool worker tasks usage are computed', async () => {
693 const pool
= new FixedClusterPool(
695 './tests/worker-files/cluster/testWorker.js'
697 const promises
= new Set()
698 const maxMultiplier
= 2
699 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
700 promises
.add(pool
.execute())
702 for (const workerNode
of pool
.workerNodes
) {
703 expect(workerNode
.usage
).toStrictEqual({
706 executing
: maxMultiplier
,
712 history
: expect
.any(CircularArray
)
715 history
: expect
.any(CircularArray
)
719 history
: expect
.any(CircularArray
)
722 history
: expect
.any(CircularArray
)
727 await Promise
.all(promises
)
728 for (const workerNode
of pool
.workerNodes
) {
729 expect(workerNode
.usage
).toStrictEqual({
731 executed
: maxMultiplier
,
738 history
: expect
.any(CircularArray
)
741 history
: expect
.any(CircularArray
)
745 history
: expect
.any(CircularArray
)
748 history
: expect
.any(CircularArray
)
756 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
757 const pool
= new DynamicThreadPool(
758 Math
.floor(numberOfWorkers
/ 2),
760 './tests/worker-files/thread/testWorker.js'
762 const promises
= new Set()
763 const maxMultiplier
= 2
764 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
765 promises
.add(pool
.execute())
767 await Promise
.all(promises
)
768 for (const workerNode
of pool
.workerNodes
) {
769 expect(workerNode
.usage
).toStrictEqual({
771 executed
: expect
.any(Number
),
778 history
: expect
.any(CircularArray
)
781 history
: expect
.any(CircularArray
)
785 history
: expect
.any(CircularArray
)
788 history
: expect
.any(CircularArray
)
792 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
793 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
794 numberOfWorkers
* maxMultiplier
796 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
797 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
798 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
799 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
801 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
802 for (const workerNode
of pool
.workerNodes
) {
803 expect(workerNode
.usage
).toStrictEqual({
812 history
: expect
.any(CircularArray
)
815 history
: expect
.any(CircularArray
)
819 history
: expect
.any(CircularArray
)
822 history
: expect
.any(CircularArray
)
826 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
827 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
828 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
829 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
834 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
835 const pool
= new DynamicClusterPool(
836 Math
.floor(numberOfWorkers
/ 2),
838 './tests/worker-files/cluster/testWorker.js'
842 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
846 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
847 expect(poolReady
).toBe(1)
848 expect(poolInfo
).toStrictEqual({
850 type
: PoolTypes
.dynamic
,
851 worker
: WorkerTypes
.cluster
,
853 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
854 minSize
: expect
.any(Number
),
855 maxSize
: expect
.any(Number
),
856 workerNodes
: expect
.any(Number
),
857 idleWorkerNodes
: expect
.any(Number
),
858 busyWorkerNodes
: expect
.any(Number
),
859 executedTasks
: expect
.any(Number
),
860 executingTasks
: expect
.any(Number
),
861 failedTasks
: expect
.any(Number
)
866 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
867 const pool
= new FixedThreadPool(
869 './tests/worker-files/thread/testWorker.js'
871 const promises
= new Set()
874 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
878 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
879 promises
.add(pool
.execute())
881 await Promise
.all(promises
)
882 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
883 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
884 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
885 expect(poolInfo
).toStrictEqual({
887 type
: PoolTypes
.fixed
,
888 worker
: WorkerTypes
.thread
,
889 ready
: expect
.any(Boolean
),
890 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
891 minSize
: expect
.any(Number
),
892 maxSize
: expect
.any(Number
),
893 workerNodes
: expect
.any(Number
),
894 idleWorkerNodes
: expect
.any(Number
),
895 busyWorkerNodes
: expect
.any(Number
),
896 executedTasks
: expect
.any(Number
),
897 executingTasks
: expect
.any(Number
),
898 failedTasks
: expect
.any(Number
)
903 it("Verify that pool event emitter 'full' event can register a callback", async () => {
904 const pool
= new DynamicThreadPool(
905 Math
.floor(numberOfWorkers
/ 2),
907 './tests/worker-files/thread/testWorker.js'
909 const promises
= new Set()
912 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
916 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
917 promises
.add(pool
.execute())
919 await Promise
.all(promises
)
920 expect(poolFull
).toBe(1)
921 expect(poolInfo
).toStrictEqual({
923 type
: PoolTypes
.dynamic
,
924 worker
: WorkerTypes
.thread
,
925 ready
: expect
.any(Boolean
),
926 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
927 minSize
: expect
.any(Number
),
928 maxSize
: expect
.any(Number
),
929 workerNodes
: expect
.any(Number
),
930 idleWorkerNodes
: expect
.any(Number
),
931 busyWorkerNodes
: expect
.any(Number
),
932 executedTasks
: expect
.any(Number
),
933 executingTasks
: expect
.any(Number
),
934 failedTasks
: expect
.any(Number
)
939 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
940 const pool
= new FixedThreadPool(
942 './tests/worker-files/thread/testWorker.js',
944 enableTasksQueue
: true
947 sinon
.stub(pool
, 'hasBackPressure').returns(true)
948 const promises
= new Set()
949 let poolBackPressure
= 0
951 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
955 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
956 promises
.add(pool
.execute())
958 await Promise
.all(promises
)
959 expect(poolBackPressure
).toBe(2)
960 expect(poolInfo
).toStrictEqual({
962 type
: PoolTypes
.fixed
,
963 worker
: WorkerTypes
.thread
,
964 ready
: expect
.any(Boolean
),
965 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
966 minSize
: expect
.any(Number
),
967 maxSize
: expect
.any(Number
),
968 workerNodes
: expect
.any(Number
),
969 idleWorkerNodes
: expect
.any(Number
),
970 busyWorkerNodes
: expect
.any(Number
),
971 executedTasks
: expect
.any(Number
),
972 executingTasks
: expect
.any(Number
),
973 maxQueuedTasks
: expect
.any(Number
),
974 queuedTasks
: expect
.any(Number
),
976 failedTasks
: expect
.any(Number
)
978 expect(pool
.hasBackPressure
.called
).toBe(true)
982 it('Verify that listTaskFunctions() is working', async () => {
983 const dynamicThreadPool
= new DynamicThreadPool(
984 Math
.floor(numberOfWorkers
/ 2),
986 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
988 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
989 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
991 'jsonIntegerSerialization',
995 const fixedClusterPool
= new FixedClusterPool(
997 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
999 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1000 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
1002 'jsonIntegerSerialization',
1008 it('Verify that multiple task functions worker is working', async () => {
1009 const pool
= new DynamicClusterPool(
1010 Math
.floor(numberOfWorkers
/ 2),
1012 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1014 const data
= { n
: 10 }
1015 const result0
= await pool
.execute(data
)
1016 expect(result0
).toStrictEqual({ ok
: 1 })
1017 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1018 expect(result1
).toStrictEqual({ ok
: 1 })
1019 const result2
= await pool
.execute(data
, 'factorial')
1020 expect(result2
).toBe(3628800)
1021 const result3
= await pool
.execute(data
, 'fibonacci')
1022 expect(result3
).toBe(55)
1023 expect(pool
.info
.executingTasks
).toBe(0)
1024 expect(pool
.info
.executedTasks
).toBe(4)
1025 for (const workerNode
of pool
.workerNodes
) {
1026 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
1028 'jsonIntegerSerialization',
1032 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1033 for (const name
of pool
.listTaskFunctions()) {
1034 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1036 executed
: expect
.any(Number
),
1037 executing
: expect
.any(Number
),
1042 history
: expect
.any(CircularArray
)
1045 history
: expect
.any(CircularArray
)
1049 history
: expect
.any(CircularArray
)
1052 history
: expect
.any(CircularArray
)
1057 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executing
1058 ).toBeGreaterThanOrEqual(0)