1 const { expect
} = require('expect')
2 const sinon
= require('sinon')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Deque
} = require('../../../lib/deque')
15 const { version
} = require('../../../package.json')
16 const { waitPoolEvents
} = require('../../test-utils')
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers
= 2
20 class StubPoolWithIsMain
extends FixedThreadPool
{
30 it('Simulate pool creation from a non main thread/process', () => {
33 new StubPoolWithIsMain(
35 './tests/worker-files/thread/testWorker.js',
37 errorHandler
: (e
) => console
.error(e
)
42 'Cannot start a pool from a worker with the same type as the pool'
47 it('Verify that filePath is checked', () => {
48 const expectedError
= new Error(
49 'Please specify a file with a worker implementation'
51 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
54 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
57 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
60 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
64 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
65 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
68 it('Verify that numberOfWorkers is checked', () => {
69 expect(() => new FixedThreadPool()).toThrowError(
71 'Cannot instantiate a pool without specifying the number of workers'
76 it('Verify that a negative number of workers is checked', () => {
79 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
82 'Cannot instantiate a pool with a negative number of workers'
87 it('Verify that a non integer number of workers is checked', () => {
90 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
93 'Cannot instantiate a pool with a non safe integer number of workers'
98 it('Verify that dynamic pool sizing is checked', () => {
101 new DynamicClusterPool(
104 './tests/worker-files/cluster/testWorker.js'
108 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
113 new DynamicThreadPool(
116 './tests/worker-files/thread/testWorker.js'
120 'Cannot instantiate a pool with a non safe integer number of workers'
125 new DynamicClusterPool(
128 './tests/worker-files/cluster/testWorker.js'
132 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
137 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
140 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
145 new DynamicClusterPool(
148 './tests/worker-files/cluster/testWorker.js'
152 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
157 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
160 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
165 it('Verify that pool options are checked', async () => {
166 let pool
= new FixedThreadPool(
168 './tests/worker-files/thread/testWorker.js'
170 expect(pool
.emitter
).toBeDefined()
171 expect(pool
.opts
.enableEvents
).toBe(true)
172 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
173 expect(pool
.opts
.enableTasksQueue
).toBe(false)
174 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
175 expect(pool
.opts
.workerChoiceStrategy
).toBe(
176 WorkerChoiceStrategies
.ROUND_ROBIN
178 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
180 runTime
: { median
: false },
181 waitTime
: { median
: false },
182 elu
: { median
: false }
184 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
186 runTime
: { median
: false },
187 waitTime
: { median
: false },
188 elu
: { median
: false }
190 expect(pool
.opts
.messageHandler
).toBeUndefined()
191 expect(pool
.opts
.errorHandler
).toBeUndefined()
192 expect(pool
.opts
.onlineHandler
).toBeUndefined()
193 expect(pool
.opts
.exitHandler
).toBeUndefined()
195 const testHandler
= () => console
.info('test handler executed')
196 pool
= new FixedThreadPool(
198 './tests/worker-files/thread/testWorker.js',
200 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
201 workerChoiceStrategyOptions
: {
202 runTime
: { median
: true },
203 weights
: { 0: 300, 1: 200 }
206 restartWorkerOnError
: false,
207 enableTasksQueue
: true,
208 tasksQueueOptions
: { concurrency
: 2 },
209 messageHandler
: testHandler
,
210 errorHandler
: testHandler
,
211 onlineHandler
: testHandler
,
212 exitHandler
: testHandler
215 expect(pool
.emitter
).toBeUndefined()
216 expect(pool
.opts
.enableEvents
).toBe(false)
217 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
218 expect(pool
.opts
.enableTasksQueue
).toBe(true)
219 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
223 expect(pool
.opts
.workerChoiceStrategy
).toBe(
224 WorkerChoiceStrategies
.LEAST_USED
226 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
228 runTime
: { median
: true },
229 waitTime
: { median
: false },
230 elu
: { median
: false },
231 weights
: { 0: 300, 1: 200 }
233 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
235 runTime
: { median
: true },
236 waitTime
: { median
: false },
237 elu
: { median
: false },
238 weights
: { 0: 300, 1: 200 }
240 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
241 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
242 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
243 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
247 it('Verify that pool options are validated', async () => {
252 './tests/worker-files/thread/testWorker.js',
254 workerChoiceStrategy
: 'invalidStrategy'
258 new Error("Invalid worker choice strategy 'invalidStrategy'")
264 './tests/worker-files/thread/testWorker.js',
266 workerChoiceStrategyOptions
: { weights
: {} }
271 'Invalid worker choice strategy options: must have a weight for each worker node'
278 './tests/worker-files/thread/testWorker.js',
280 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
285 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
292 './tests/worker-files/thread/testWorker.js',
294 enableTasksQueue
: true,
295 tasksQueueOptions
: { concurrency
: 0 }
300 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
307 './tests/worker-files/thread/testWorker.js',
309 enableTasksQueue
: true,
310 tasksQueueOptions
: 'invalidTasksQueueOptions'
314 new TypeError('Invalid tasks queue options: must be a plain object')
320 './tests/worker-files/thread/testWorker.js',
322 enableTasksQueue
: true,
323 tasksQueueOptions
: { concurrency
: 0.2 }
327 new TypeError('Invalid worker node tasks concurrency: must be an integer')
331 it('Verify that pool worker choice strategy options can be set', async () => {
332 const pool
= new FixedThreadPool(
334 './tests/worker-files/thread/testWorker.js',
335 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
337 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
339 runTime
: { median
: false },
340 waitTime
: { median
: false },
341 elu
: { median
: false }
343 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
345 runTime
: { median
: false },
346 waitTime
: { median
: false },
347 elu
: { median
: false }
349 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
350 .workerChoiceStrategies
) {
351 expect(workerChoiceStrategy
.opts
).toStrictEqual({
353 runTime
: { median
: false },
354 waitTime
: { median
: false },
355 elu
: { median
: false }
359 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
377 pool
.setWorkerChoiceStrategyOptions({
378 runTime
: { median
: true },
379 elu
: { median
: true }
381 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
383 runTime
: { median
: true },
384 waitTime
: { median
: false },
385 elu
: { median
: true }
387 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
389 runTime
: { median
: true },
390 waitTime
: { median
: false },
391 elu
: { median
: true }
393 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
394 .workerChoiceStrategies
) {
395 expect(workerChoiceStrategy
.opts
).toStrictEqual({
397 runTime
: { median
: true },
398 waitTime
: { median
: false },
399 elu
: { median
: true }
403 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
421 pool
.setWorkerChoiceStrategyOptions({
422 runTime
: { median
: false },
423 elu
: { median
: false }
425 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
427 runTime
: { median
: false },
428 waitTime
: { median
: false },
429 elu
: { median
: false }
431 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
433 runTime
: { median
: false },
434 waitTime
: { median
: false },
435 elu
: { median
: false }
437 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
438 .workerChoiceStrategies
) {
439 expect(workerChoiceStrategy
.opts
).toStrictEqual({
441 runTime
: { median
: false },
442 waitTime
: { median
: false },
443 elu
: { median
: false }
447 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
466 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
469 'Invalid worker choice strategy options: must be a plain object'
473 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
476 'Invalid worker choice strategy options: must have a weight for each worker node'
480 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
483 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
489 it('Verify that pool tasks queue can be enabled/disabled', async () => {
490 const pool
= new FixedThreadPool(
492 './tests/worker-files/thread/testWorker.js'
494 expect(pool
.opts
.enableTasksQueue
).toBe(false)
495 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
496 pool
.enableTasksQueue(true)
497 expect(pool
.opts
.enableTasksQueue
).toBe(true)
498 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
502 pool
.enableTasksQueue(true, { concurrency
: 2 })
503 expect(pool
.opts
.enableTasksQueue
).toBe(true)
504 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
508 pool
.enableTasksQueue(false)
509 expect(pool
.opts
.enableTasksQueue
).toBe(false)
510 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
514 it('Verify that pool tasks queue options can be set', async () => {
515 const pool
= new FixedThreadPool(
517 './tests/worker-files/thread/testWorker.js',
518 { enableTasksQueue
: true }
520 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
524 pool
.setTasksQueueOptions({ concurrency
: 2 })
525 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
530 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
532 new TypeError('Invalid tasks queue options: must be a plain object')
534 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
536 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
539 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
541 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
544 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
545 new TypeError('Invalid worker node tasks concurrency: must be an integer')
547 expect(() => pool
.setTasksQueueOptions({ queueMaxSize
: 2 })).toThrowError(
549 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
552 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
554 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
557 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
559 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
562 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
563 new TypeError('Invalid worker node tasks queue size: must be an integer')
568 it('Verify that pool info is set', async () => {
569 let pool
= new FixedThreadPool(
571 './tests/worker-files/thread/testWorker.js'
573 expect(pool
.info
).toStrictEqual({
575 type
: PoolTypes
.fixed
,
576 worker
: WorkerTypes
.thread
,
578 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
579 minSize
: numberOfWorkers
,
580 maxSize
: numberOfWorkers
,
581 workerNodes
: numberOfWorkers
,
582 idleWorkerNodes
: numberOfWorkers
,
589 pool
= new DynamicClusterPool(
590 Math
.floor(numberOfWorkers
/ 2),
592 './tests/worker-files/cluster/testWorker.js'
594 expect(pool
.info
).toStrictEqual({
596 type
: PoolTypes
.dynamic
,
597 worker
: WorkerTypes
.cluster
,
599 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
600 minSize
: Math
.floor(numberOfWorkers
/ 2),
601 maxSize
: numberOfWorkers
,
602 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
603 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
612 it('Verify that pool worker tasks usage are initialized', async () => {
613 const pool
= new FixedClusterPool(
615 './tests/worker-files/cluster/testWorker.js'
617 for (const workerNode
of pool
.workerNodes
) {
618 expect(workerNode
.usage
).toStrictEqual({
628 history
: expect
.any(CircularArray
)
631 history
: expect
.any(CircularArray
)
635 history
: expect
.any(CircularArray
)
638 history
: expect
.any(CircularArray
)
646 it('Verify that pool worker tasks queue are initialized', async () => {
647 let pool
= new FixedClusterPool(
649 './tests/worker-files/cluster/testWorker.js'
651 for (const workerNode
of pool
.workerNodes
) {
652 expect(workerNode
.tasksQueue
).toBeDefined()
653 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
654 expect(workerNode
.tasksQueue
.size
).toBe(0)
655 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
658 pool
= new DynamicThreadPool(
659 Math
.floor(numberOfWorkers
/ 2),
661 './tests/worker-files/thread/testWorker.js'
663 for (const workerNode
of pool
.workerNodes
) {
664 expect(workerNode
.tasksQueue
).toBeDefined()
665 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
666 expect(workerNode
.tasksQueue
.size
).toBe(0)
667 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
671 it('Verify that pool worker info are initialized', async () => {
672 let pool
= new FixedClusterPool(
674 './tests/worker-files/cluster/testWorker.js'
676 for (const workerNode
of pool
.workerNodes
) {
677 expect(workerNode
.info
).toStrictEqual({
678 id
: expect
.any(Number
),
679 type
: WorkerTypes
.cluster
,
685 pool
= new DynamicThreadPool(
686 Math
.floor(numberOfWorkers
/ 2),
688 './tests/worker-files/thread/testWorker.js'
690 for (const workerNode
of pool
.workerNodes
) {
691 expect(workerNode
.info
).toStrictEqual({
692 id
: expect
.any(Number
),
693 type
: WorkerTypes
.thread
,
700 it('Verify that pool worker tasks usage are computed', async () => {
701 const pool
= new FixedClusterPool(
703 './tests/worker-files/cluster/testWorker.js'
705 const promises
= new Set()
706 const maxMultiplier
= 2
707 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
708 promises
.add(pool
.execute())
710 for (const workerNode
of pool
.workerNodes
) {
711 expect(workerNode
.usage
).toStrictEqual({
714 executing
: maxMultiplier
,
721 history
: expect
.any(CircularArray
)
724 history
: expect
.any(CircularArray
)
728 history
: expect
.any(CircularArray
)
731 history
: expect
.any(CircularArray
)
736 await Promise
.all(promises
)
737 for (const workerNode
of pool
.workerNodes
) {
738 expect(workerNode
.usage
).toStrictEqual({
740 executed
: maxMultiplier
,
748 history
: expect
.any(CircularArray
)
751 history
: expect
.any(CircularArray
)
755 history
: expect
.any(CircularArray
)
758 history
: expect
.any(CircularArray
)
766 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
767 const pool
= new DynamicThreadPool(
768 Math
.floor(numberOfWorkers
/ 2),
770 './tests/worker-files/thread/testWorker.js'
772 const promises
= new Set()
773 const maxMultiplier
= 2
774 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
775 promises
.add(pool
.execute())
777 await Promise
.all(promises
)
778 for (const workerNode
of pool
.workerNodes
) {
779 expect(workerNode
.usage
).toStrictEqual({
781 executed
: expect
.any(Number
),
789 history
: expect
.any(CircularArray
)
792 history
: expect
.any(CircularArray
)
796 history
: expect
.any(CircularArray
)
799 history
: expect
.any(CircularArray
)
803 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
804 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
805 numberOfWorkers
* maxMultiplier
807 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
808 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
809 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
810 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
812 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
813 for (const workerNode
of pool
.workerNodes
) {
814 expect(workerNode
.usage
).toStrictEqual({
824 history
: expect
.any(CircularArray
)
827 history
: expect
.any(CircularArray
)
831 history
: expect
.any(CircularArray
)
834 history
: expect
.any(CircularArray
)
838 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
839 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
840 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
841 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
846 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
847 const pool
= new DynamicClusterPool(
848 Math
.floor(numberOfWorkers
/ 2),
850 './tests/worker-files/cluster/testWorker.js'
854 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
858 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
859 expect(poolReady
).toBe(1)
860 expect(poolInfo
).toStrictEqual({
862 type
: PoolTypes
.dynamic
,
863 worker
: WorkerTypes
.cluster
,
865 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
866 minSize
: expect
.any(Number
),
867 maxSize
: expect
.any(Number
),
868 workerNodes
: expect
.any(Number
),
869 idleWorkerNodes
: expect
.any(Number
),
870 busyWorkerNodes
: expect
.any(Number
),
871 executedTasks
: expect
.any(Number
),
872 executingTasks
: expect
.any(Number
),
873 failedTasks
: expect
.any(Number
)
878 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
879 const pool
= new FixedThreadPool(
881 './tests/worker-files/thread/testWorker.js'
883 const promises
= new Set()
886 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
890 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
891 promises
.add(pool
.execute())
893 await Promise
.all(promises
)
894 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
895 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
896 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
897 expect(poolInfo
).toStrictEqual({
899 type
: PoolTypes
.fixed
,
900 worker
: WorkerTypes
.thread
,
901 ready
: expect
.any(Boolean
),
902 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
903 minSize
: expect
.any(Number
),
904 maxSize
: expect
.any(Number
),
905 workerNodes
: expect
.any(Number
),
906 idleWorkerNodes
: expect
.any(Number
),
907 busyWorkerNodes
: expect
.any(Number
),
908 executedTasks
: expect
.any(Number
),
909 executingTasks
: expect
.any(Number
),
910 failedTasks
: expect
.any(Number
)
915 it("Verify that pool event emitter 'full' event can register a callback", async () => {
916 const pool
= new DynamicThreadPool(
917 Math
.floor(numberOfWorkers
/ 2),
919 './tests/worker-files/thread/testWorker.js'
921 const promises
= new Set()
924 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
928 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
929 promises
.add(pool
.execute())
931 await Promise
.all(promises
)
932 expect(poolFull
).toBe(1)
933 expect(poolInfo
).toStrictEqual({
935 type
: PoolTypes
.dynamic
,
936 worker
: WorkerTypes
.thread
,
937 ready
: expect
.any(Boolean
),
938 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
939 minSize
: expect
.any(Number
),
940 maxSize
: expect
.any(Number
),
941 workerNodes
: expect
.any(Number
),
942 idleWorkerNodes
: expect
.any(Number
),
943 busyWorkerNodes
: expect
.any(Number
),
944 executedTasks
: expect
.any(Number
),
945 executingTasks
: expect
.any(Number
),
946 failedTasks
: expect
.any(Number
)
951 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
952 const pool
= new FixedThreadPool(
954 './tests/worker-files/thread/testWorker.js',
956 enableTasksQueue
: true
959 sinon
.stub(pool
, 'hasBackPressure').returns(true)
960 const promises
= new Set()
961 let poolBackPressure
= 0
963 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
967 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
968 promises
.add(pool
.execute())
970 await Promise
.all(promises
)
971 expect(poolBackPressure
).toBe(1)
972 expect(poolInfo
).toStrictEqual({
974 type
: PoolTypes
.fixed
,
975 worker
: WorkerTypes
.thread
,
976 ready
: expect
.any(Boolean
),
977 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
978 minSize
: expect
.any(Number
),
979 maxSize
: expect
.any(Number
),
980 workerNodes
: expect
.any(Number
),
981 idleWorkerNodes
: expect
.any(Number
),
982 busyWorkerNodes
: expect
.any(Number
),
983 executedTasks
: expect
.any(Number
),
984 executingTasks
: expect
.any(Number
),
985 maxQueuedTasks
: expect
.any(Number
),
986 queuedTasks
: expect
.any(Number
),
988 stolenTasks
: expect
.any(Number
),
989 failedTasks
: expect
.any(Number
)
991 expect(pool
.hasBackPressure
.called
).toBe(true)
995 it('Verify that listTaskFunctions() is working', async () => {
996 const dynamicThreadPool
= new DynamicThreadPool(
997 Math
.floor(numberOfWorkers
/ 2),
999 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1001 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1002 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
1004 'jsonIntegerSerialization',
1008 const fixedClusterPool
= new FixedClusterPool(
1010 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1012 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1013 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
1015 'jsonIntegerSerialization',
1021 it('Verify that multiple task functions worker is working', async () => {
1022 const pool
= new DynamicClusterPool(
1023 Math
.floor(numberOfWorkers
/ 2),
1025 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1027 const data
= { n
: 10 }
1028 const result0
= await pool
.execute(data
)
1029 expect(result0
).toStrictEqual({ ok
: 1 })
1030 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1031 expect(result1
).toStrictEqual({ ok
: 1 })
1032 const result2
= await pool
.execute(data
, 'factorial')
1033 expect(result2
).toBe(3628800)
1034 const result3
= await pool
.execute(data
, 'fibonacci')
1035 expect(result3
).toBe(55)
1036 expect(pool
.info
.executingTasks
).toBe(0)
1037 expect(pool
.info
.executedTasks
).toBe(4)
1038 for (const workerNode
of pool
.workerNodes
) {
1039 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
1041 'jsonIntegerSerialization',
1045 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1046 for (const name
of pool
.listTaskFunctions()) {
1047 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1049 executed
: expect
.any(Number
),
1050 executing
: expect
.any(Number
),
1056 history
: expect
.any(CircularArray
)
1059 history
: expect
.any(CircularArray
)
1063 history
: expect
.any(CircularArray
)
1066 history
: expect
.any(CircularArray
)
1071 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executing
1072 ).toBeGreaterThanOrEqual(0)