1 const { expect
} = require('expect')
2 const sinon
= require('sinon')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Deque
} = require('../../../lib/deque')
15 const { version
} = require('../../../package.json')
16 const { waitPoolEvents
} = require('../../test-utils')
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers
= 2
20 class StubPoolWithIsMain
extends FixedThreadPool
{
30 it('Simulate pool creation from a non main thread/process', () => {
33 new StubPoolWithIsMain(
35 './tests/worker-files/thread/testWorker.js',
37 errorHandler
: (e
) => console
.error(e
)
42 'Cannot start a pool from a worker with the same type as the pool'
47 it('Verify that filePath is checked', () => {
48 const expectedError
= new Error(
49 'Please specify a file with a worker implementation'
51 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
54 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
57 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
60 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
64 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
65 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
68 it('Verify that numberOfWorkers is checked', () => {
69 expect(() => new FixedThreadPool()).toThrowError(
71 'Cannot instantiate a pool without specifying the number of workers'
76 it('Verify that a negative number of workers is checked', () => {
79 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
82 'Cannot instantiate a pool with a negative number of workers'
87 it('Verify that a non integer number of workers is checked', () => {
90 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
93 'Cannot instantiate a pool with a non safe integer number of workers'
98 it('Verify that dynamic pool sizing is checked', () => {
101 new DynamicClusterPool(
104 './tests/worker-files/cluster/testWorker.js'
108 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
113 new DynamicThreadPool(
116 './tests/worker-files/thread/testWorker.js'
120 'Cannot instantiate a pool with a non safe integer number of workers'
125 new DynamicClusterPool(
128 './tests/worker-files/cluster/testWorker.js'
132 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
137 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
140 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
145 new DynamicClusterPool(
148 './tests/worker-files/cluster/testWorker.js'
152 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
157 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
160 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
165 it('Verify that pool options are checked', async () => {
166 let pool
= new FixedThreadPool(
168 './tests/worker-files/thread/testWorker.js'
170 expect(pool
.emitter
).toBeDefined()
171 expect(pool
.opts
.enableEvents
).toBe(true)
172 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
173 expect(pool
.opts
.enableTasksQueue
).toBe(false)
174 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
175 expect(pool
.opts
.workerChoiceStrategy
).toBe(
176 WorkerChoiceStrategies
.ROUND_ROBIN
178 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
180 runTime
: { median
: false },
181 waitTime
: { median
: false },
182 elu
: { median
: false }
184 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
186 runTime
: { median
: false },
187 waitTime
: { median
: false },
188 elu
: { median
: false }
190 expect(pool
.opts
.messageHandler
).toBeUndefined()
191 expect(pool
.opts
.errorHandler
).toBeUndefined()
192 expect(pool
.opts
.onlineHandler
).toBeUndefined()
193 expect(pool
.opts
.exitHandler
).toBeUndefined()
195 const testHandler
= () => console
.info('test handler executed')
196 pool
= new FixedThreadPool(
198 './tests/worker-files/thread/testWorker.js',
200 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
201 workerChoiceStrategyOptions
: {
202 runTime
: { median
: true },
203 weights
: { 0: 300, 1: 200 }
206 restartWorkerOnError
: false,
207 enableTasksQueue
: true,
208 tasksQueueOptions
: { concurrency
: 2 },
209 messageHandler
: testHandler
,
210 errorHandler
: testHandler
,
211 onlineHandler
: testHandler
,
212 exitHandler
: testHandler
215 expect(pool
.emitter
).toBeUndefined()
216 expect(pool
.opts
.enableEvents
).toBe(false)
217 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
218 expect(pool
.opts
.enableTasksQueue
).toBe(true)
219 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
223 expect(pool
.opts
.workerChoiceStrategy
).toBe(
224 WorkerChoiceStrategies
.LEAST_USED
226 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
228 runTime
: { median
: true },
229 waitTime
: { median
: false },
230 elu
: { median
: false },
231 weights
: { 0: 300, 1: 200 }
233 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
235 runTime
: { median
: true },
236 waitTime
: { median
: false },
237 elu
: { median
: false },
238 weights
: { 0: 300, 1: 200 }
240 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
241 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
242 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
243 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
247 it('Verify that pool options are validated', async () => {
252 './tests/worker-files/thread/testWorker.js',
254 workerChoiceStrategy
: 'invalidStrategy'
258 new Error("Invalid worker choice strategy 'invalidStrategy'")
264 './tests/worker-files/thread/testWorker.js',
266 workerChoiceStrategyOptions
: { weights
: {} }
271 'Invalid worker choice strategy options: must have a weight for each worker node'
278 './tests/worker-files/thread/testWorker.js',
280 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
285 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
292 './tests/worker-files/thread/testWorker.js',
294 enableTasksQueue
: true,
295 tasksQueueOptions
: { concurrency
: 0 }
300 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
307 './tests/worker-files/thread/testWorker.js',
309 enableTasksQueue
: true,
310 tasksQueueOptions
: 'invalidTasksQueueOptions'
314 new TypeError('Invalid tasks queue options: must be a plain object')
320 './tests/worker-files/thread/testWorker.js',
322 enableTasksQueue
: true,
323 tasksQueueOptions
: { concurrency
: 0.2 }
327 new TypeError('Invalid worker node tasks concurrency: must be an integer')
331 it('Verify that pool worker choice strategy options can be set', async () => {
332 const pool
= new FixedThreadPool(
334 './tests/worker-files/thread/testWorker.js',
335 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
337 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
339 runTime
: { median
: false },
340 waitTime
: { median
: false },
341 elu
: { median
: false }
343 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
345 runTime
: { median
: false },
346 waitTime
: { median
: false },
347 elu
: { median
: false }
349 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
350 .workerChoiceStrategies
) {
351 expect(workerChoiceStrategy
.opts
).toStrictEqual({
353 runTime
: { median
: false },
354 waitTime
: { median
: false },
355 elu
: { median
: false }
359 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
377 pool
.setWorkerChoiceStrategyOptions({
378 runTime
: { median
: true },
379 elu
: { median
: true }
381 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
383 runTime
: { median
: true },
384 waitTime
: { median
: false },
385 elu
: { median
: true }
387 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
389 runTime
: { median
: true },
390 waitTime
: { median
: false },
391 elu
: { median
: true }
393 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
394 .workerChoiceStrategies
) {
395 expect(workerChoiceStrategy
.opts
).toStrictEqual({
397 runTime
: { median
: true },
398 waitTime
: { median
: false },
399 elu
: { median
: true }
403 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
421 pool
.setWorkerChoiceStrategyOptions({
422 runTime
: { median
: false },
423 elu
: { median
: false }
425 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
427 runTime
: { median
: false },
428 waitTime
: { median
: false },
429 elu
: { median
: false }
431 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
433 runTime
: { median
: false },
434 waitTime
: { median
: false },
435 elu
: { median
: false }
437 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
438 .workerChoiceStrategies
) {
439 expect(workerChoiceStrategy
.opts
).toStrictEqual({
441 runTime
: { median
: false },
442 waitTime
: { median
: false },
443 elu
: { median
: false }
447 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
466 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
469 'Invalid worker choice strategy options: must be a plain object'
473 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
476 'Invalid worker choice strategy options: must have a weight for each worker node'
480 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
483 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
489 it('Verify that pool tasks queue can be enabled/disabled', async () => {
490 const pool
= new FixedThreadPool(
492 './tests/worker-files/thread/testWorker.js'
494 expect(pool
.opts
.enableTasksQueue
).toBe(false)
495 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
496 pool
.enableTasksQueue(true)
497 expect(pool
.opts
.enableTasksQueue
).toBe(true)
498 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
502 pool
.enableTasksQueue(true, { concurrency
: 2 })
503 expect(pool
.opts
.enableTasksQueue
).toBe(true)
504 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
508 pool
.enableTasksQueue(false)
509 expect(pool
.opts
.enableTasksQueue
).toBe(false)
510 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
514 it('Verify that pool tasks queue options can be set', async () => {
515 const pool
= new FixedThreadPool(
517 './tests/worker-files/thread/testWorker.js',
518 { enableTasksQueue
: true }
520 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
524 pool
.setTasksQueueOptions({ concurrency
: 2 })
525 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
530 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
532 new TypeError('Invalid tasks queue options: must be a plain object')
534 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
536 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
539 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
541 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
544 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
545 new TypeError('Invalid worker node tasks concurrency: must be an integer')
547 expect(() => pool
.setTasksQueueOptions({ queueMaxSize
: 2 })).toThrowError(
549 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
552 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
554 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
557 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
559 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
562 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
563 new TypeError('Invalid worker node tasks queue size: must be an integer')
568 it('Verify that pool info is set', async () => {
569 let pool
= new FixedThreadPool(
571 './tests/worker-files/thread/testWorker.js'
573 expect(pool
.info
).toStrictEqual({
575 type
: PoolTypes
.fixed
,
576 worker
: WorkerTypes
.thread
,
578 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
579 minSize
: numberOfWorkers
,
580 maxSize
: numberOfWorkers
,
581 workerNodes
: numberOfWorkers
,
582 idleWorkerNodes
: numberOfWorkers
,
589 pool
= new DynamicClusterPool(
590 Math
.floor(numberOfWorkers
/ 2),
592 './tests/worker-files/cluster/testWorker.js'
594 expect(pool
.info
).toStrictEqual({
596 type
: PoolTypes
.dynamic
,
597 worker
: WorkerTypes
.cluster
,
599 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
600 minSize
: Math
.floor(numberOfWorkers
/ 2),
601 maxSize
: numberOfWorkers
,
602 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
603 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
612 it('Verify that pool worker tasks usage are initialized', async () => {
613 const pool
= new FixedClusterPool(
615 './tests/worker-files/cluster/testWorker.js'
617 for (const workerNode
of pool
.workerNodes
) {
618 expect(workerNode
.usage
).toStrictEqual({
628 history
: expect
.any(CircularArray
)
631 history
: expect
.any(CircularArray
)
635 history
: expect
.any(CircularArray
)
638 history
: expect
.any(CircularArray
)
646 it('Verify that pool worker tasks queue are initialized', async () => {
647 let pool
= new FixedClusterPool(
649 './tests/worker-files/cluster/testWorker.js'
651 for (const workerNode
of pool
.workerNodes
) {
652 expect(workerNode
.tasksQueue
).toBeDefined()
653 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
654 expect(workerNode
.tasksQueue
.size
).toBe(0)
655 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
658 pool
= new DynamicThreadPool(
659 Math
.floor(numberOfWorkers
/ 2),
661 './tests/worker-files/thread/testWorker.js'
663 for (const workerNode
of pool
.workerNodes
) {
664 expect(workerNode
.tasksQueue
).toBeDefined()
665 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
666 expect(workerNode
.tasksQueue
.size
).toBe(0)
667 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
671 it('Verify that pool worker info are initialized', async () => {
672 let pool
= new FixedClusterPool(
674 './tests/worker-files/cluster/testWorker.js'
676 for (const workerNode
of pool
.workerNodes
) {
677 expect(workerNode
.info
).toStrictEqual({
678 id
: expect
.any(Number
),
679 type
: WorkerTypes
.cluster
,
685 pool
= new DynamicThreadPool(
686 Math
.floor(numberOfWorkers
/ 2),
688 './tests/worker-files/thread/testWorker.js'
690 for (const workerNode
of pool
.workerNodes
) {
691 expect(workerNode
.info
).toStrictEqual({
692 id
: expect
.any(Number
),
693 type
: WorkerTypes
.thread
,
700 it('Verify that pool execute() arguments are checked', async () => {
701 const pool
= new FixedClusterPool(
703 './tests/worker-files/cluster/testWorker.js'
705 await
expect(pool
.execute(undefined, 0)).rejects
.toThrowError(
706 new TypeError('name argument must be a string')
708 await
expect(pool
.execute(undefined, '')).rejects
.toThrowError(
709 new TypeError('name argument must not be an empty string')
711 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
712 new TypeError('transferList argument must be an array')
714 await
expect(pool
.execute(undefined, 'unknown')).rejects
.toBe(
715 "Task function 'unknown' not found"
718 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
719 new Error('Cannot execute a task on destroyed pool')
723 it('Verify that pool worker tasks usage are computed', async () => {
724 const pool
= new FixedClusterPool(
726 './tests/worker-files/cluster/testWorker.js'
728 const promises
= new Set()
729 const maxMultiplier
= 2
730 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
731 promises
.add(pool
.execute())
733 for (const workerNode
of pool
.workerNodes
) {
734 expect(workerNode
.usage
).toStrictEqual({
737 executing
: maxMultiplier
,
744 history
: expect
.any(CircularArray
)
747 history
: expect
.any(CircularArray
)
751 history
: expect
.any(CircularArray
)
754 history
: expect
.any(CircularArray
)
759 await Promise
.all(promises
)
760 for (const workerNode
of pool
.workerNodes
) {
761 expect(workerNode
.usage
).toStrictEqual({
763 executed
: maxMultiplier
,
771 history
: expect
.any(CircularArray
)
774 history
: expect
.any(CircularArray
)
778 history
: expect
.any(CircularArray
)
781 history
: expect
.any(CircularArray
)
789 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
790 const pool
= new DynamicThreadPool(
791 Math
.floor(numberOfWorkers
/ 2),
793 './tests/worker-files/thread/testWorker.js'
795 const promises
= new Set()
796 const maxMultiplier
= 2
797 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
798 promises
.add(pool
.execute())
800 await Promise
.all(promises
)
801 for (const workerNode
of pool
.workerNodes
) {
802 expect(workerNode
.usage
).toStrictEqual({
804 executed
: expect
.any(Number
),
812 history
: expect
.any(CircularArray
)
815 history
: expect
.any(CircularArray
)
819 history
: expect
.any(CircularArray
)
822 history
: expect
.any(CircularArray
)
826 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
827 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
828 numberOfWorkers
* maxMultiplier
830 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
831 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
832 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
833 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
835 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
836 for (const workerNode
of pool
.workerNodes
) {
837 expect(workerNode
.usage
).toStrictEqual({
847 history
: expect
.any(CircularArray
)
850 history
: expect
.any(CircularArray
)
854 history
: expect
.any(CircularArray
)
857 history
: expect
.any(CircularArray
)
861 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
862 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
863 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
864 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
869 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
870 const pool
= new DynamicClusterPool(
871 Math
.floor(numberOfWorkers
/ 2),
873 './tests/worker-files/cluster/testWorker.js'
877 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
881 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
882 expect(poolReady
).toBe(1)
883 expect(poolInfo
).toStrictEqual({
885 type
: PoolTypes
.dynamic
,
886 worker
: WorkerTypes
.cluster
,
888 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
889 minSize
: expect
.any(Number
),
890 maxSize
: expect
.any(Number
),
891 workerNodes
: expect
.any(Number
),
892 idleWorkerNodes
: expect
.any(Number
),
893 busyWorkerNodes
: expect
.any(Number
),
894 executedTasks
: expect
.any(Number
),
895 executingTasks
: expect
.any(Number
),
896 failedTasks
: expect
.any(Number
)
901 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
902 const pool
= new FixedThreadPool(
904 './tests/worker-files/thread/testWorker.js'
906 const promises
= new Set()
909 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
913 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
914 promises
.add(pool
.execute())
916 await Promise
.all(promises
)
917 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
918 // So it is triggered numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
919 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
920 expect(poolInfo
).toStrictEqual({
922 type
: PoolTypes
.fixed
,
923 worker
: WorkerTypes
.thread
,
924 ready
: expect
.any(Boolean
),
925 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
926 minSize
: expect
.any(Number
),
927 maxSize
: expect
.any(Number
),
928 workerNodes
: expect
.any(Number
),
929 idleWorkerNodes
: expect
.any(Number
),
930 busyWorkerNodes
: expect
.any(Number
),
931 executedTasks
: expect
.any(Number
),
932 executingTasks
: expect
.any(Number
),
933 failedTasks
: expect
.any(Number
)
938 it("Verify that pool event emitter 'full' event can register a callback", async () => {
939 const pool
= new DynamicThreadPool(
940 Math
.floor(numberOfWorkers
/ 2),
942 './tests/worker-files/thread/testWorker.js'
944 const promises
= new Set()
947 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
951 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
952 promises
.add(pool
.execute())
954 await Promise
.all(promises
)
955 expect(poolFull
).toBe(1)
956 expect(poolInfo
).toStrictEqual({
958 type
: PoolTypes
.dynamic
,
959 worker
: WorkerTypes
.thread
,
960 ready
: expect
.any(Boolean
),
961 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
962 minSize
: expect
.any(Number
),
963 maxSize
: expect
.any(Number
),
964 workerNodes
: expect
.any(Number
),
965 idleWorkerNodes
: expect
.any(Number
),
966 busyWorkerNodes
: expect
.any(Number
),
967 executedTasks
: expect
.any(Number
),
968 executingTasks
: expect
.any(Number
),
969 failedTasks
: expect
.any(Number
)
974 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
975 const pool
= new FixedThreadPool(
977 './tests/worker-files/thread/testWorker.js',
979 enableTasksQueue
: true
982 sinon
.stub(pool
, 'hasBackPressure').returns(true)
983 const promises
= new Set()
984 let poolBackPressure
= 0
986 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
990 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
991 promises
.add(pool
.execute())
993 await Promise
.all(promises
)
994 expect(poolBackPressure
).toBe(1)
995 expect(poolInfo
).toStrictEqual({
997 type
: PoolTypes
.fixed
,
998 worker
: WorkerTypes
.thread
,
999 ready
: expect
.any(Boolean
),
1000 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1001 minSize
: expect
.any(Number
),
1002 maxSize
: expect
.any(Number
),
1003 workerNodes
: expect
.any(Number
),
1004 idleWorkerNodes
: expect
.any(Number
),
1005 busyWorkerNodes
: expect
.any(Number
),
1006 executedTasks
: expect
.any(Number
),
1007 executingTasks
: expect
.any(Number
),
1008 maxQueuedTasks
: expect
.any(Number
),
1009 queuedTasks
: expect
.any(Number
),
1011 stolenTasks
: expect
.any(Number
),
1012 failedTasks
: expect
.any(Number
)
1014 expect(pool
.hasBackPressure
.called
).toBe(true)
1015 await pool
.destroy()
1018 it('Verify that listTaskFunctions() is working', async () => {
1019 const dynamicThreadPool
= new DynamicThreadPool(
1020 Math
.floor(numberOfWorkers
/ 2),
1022 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1024 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1025 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
1027 'jsonIntegerSerialization',
1031 const fixedClusterPool
= new FixedClusterPool(
1033 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1035 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1036 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
1038 'jsonIntegerSerialization',
1044 it('Verify that multiple task functions worker is working', async () => {
1045 const pool
= new DynamicClusterPool(
1046 Math
.floor(numberOfWorkers
/ 2),
1048 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1050 const data
= { n
: 10 }
1051 const result0
= await pool
.execute(data
)
1052 expect(result0
).toStrictEqual({ ok
: 1 })
1053 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1054 expect(result1
).toStrictEqual({ ok
: 1 })
1055 const result2
= await pool
.execute(data
, 'factorial')
1056 expect(result2
).toBe(3628800)
1057 const result3
= await pool
.execute(data
, 'fibonacci')
1058 expect(result3
).toBe(55)
1059 expect(pool
.info
.executingTasks
).toBe(0)
1060 expect(pool
.info
.executedTasks
).toBe(4)
1061 for (const workerNode
of pool
.workerNodes
) {
1062 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
1064 'jsonIntegerSerialization',
1068 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1069 for (const name
of pool
.listTaskFunctions()) {
1070 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1072 executed
: expect
.any(Number
),
1073 executing
: expect
.any(Number
),
1079 history
: expect
.any(CircularArray
)
1082 history
: expect
.any(CircularArray
)
1086 history
: expect
.any(CircularArray
)
1089 history
: expect
.any(CircularArray
)
1094 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executing
1095 ).toBeGreaterThanOrEqual(0)