1 const { expect
} = require('expect')
2 const sinon
= require('sinon')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Queue
} = require('../../../lib/queue')
15 const { version
} = require('../../../package.json')
16 const { waitPoolEvents
} = require('../../test-utils')
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers
= 2
20 class StubPoolWithIsMain
extends FixedThreadPool
{
26 it('Simulate pool creation from a non main thread/process', () => {
29 new StubPoolWithIsMain(
31 './tests/worker-files/thread/testWorker.js',
33 errorHandler
: (e
) => console
.error(e
)
37 'Cannot start a pool from a worker with the same type as the pool'
41 it('Verify that filePath is checked', () => {
42 const expectedError
= new Error(
43 'Please specify a file with a worker implementation'
45 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
48 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
51 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
54 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
58 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
59 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
62 it('Verify that numberOfWorkers is checked', () => {
63 expect(() => new FixedThreadPool()).toThrowError(
64 'Cannot instantiate a pool without specifying the number of workers'
68 it('Verify that a negative number of workers is checked', () => {
71 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
74 'Cannot instantiate a pool with a negative number of workers'
79 it('Verify that a non integer number of workers is checked', () => {
82 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
85 'Cannot instantiate a pool with a non safe integer number of workers'
90 it('Verify that dynamic pool sizing is checked', () => {
93 new DynamicClusterPool(
96 './tests/worker-files/cluster/testWorker.js'
100 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
105 new DynamicThreadPool(
108 './tests/worker-files/thread/testWorker.js'
112 'Cannot instantiate a pool with a non safe integer number of workers'
117 new DynamicClusterPool(
120 './tests/worker-files/cluster/testWorker.js'
124 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
129 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
132 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
137 new DynamicClusterPool(
140 './tests/worker-files/cluster/testWorker.js'
144 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
149 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
152 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
157 it('Verify that pool options are checked', async () => {
158 let pool
= new FixedThreadPool(
160 './tests/worker-files/thread/testWorker.js'
162 expect(pool
.emitter
).toBeDefined()
163 expect(pool
.opts
.enableEvents
).toBe(true)
164 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
165 expect(pool
.opts
.enableTasksQueue
).toBe(false)
166 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
167 expect(pool
.opts
.workerChoiceStrategy
).toBe(
168 WorkerChoiceStrategies
.ROUND_ROBIN
170 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
172 runTime
: { median
: false },
173 waitTime
: { median
: false },
174 elu
: { median
: false }
176 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
178 runTime
: { median
: false },
179 waitTime
: { median
: false },
180 elu
: { median
: false }
182 expect(pool
.opts
.messageHandler
).toBeUndefined()
183 expect(pool
.opts
.errorHandler
).toBeUndefined()
184 expect(pool
.opts
.onlineHandler
).toBeUndefined()
185 expect(pool
.opts
.exitHandler
).toBeUndefined()
187 const testHandler
= () => console
.info('test handler executed')
188 pool
= new FixedThreadPool(
190 './tests/worker-files/thread/testWorker.js',
192 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
193 workerChoiceStrategyOptions
: {
194 runTime
: { median
: true },
195 weights
: { 0: 300, 1: 200 }
198 restartWorkerOnError
: false,
199 enableTasksQueue
: true,
200 tasksQueueOptions
: { concurrency
: 2 },
201 messageHandler
: testHandler
,
202 errorHandler
: testHandler
,
203 onlineHandler
: testHandler
,
204 exitHandler
: testHandler
207 expect(pool
.emitter
).toBeUndefined()
208 expect(pool
.opts
.enableEvents
).toBe(false)
209 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
210 expect(pool
.opts
.enableTasksQueue
).toBe(true)
211 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
212 expect(pool
.opts
.workerChoiceStrategy
).toBe(
213 WorkerChoiceStrategies
.LEAST_USED
215 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
217 runTime
: { median
: true },
218 waitTime
: { median
: false },
219 elu
: { median
: false },
220 weights
: { 0: 300, 1: 200 }
222 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
224 runTime
: { median
: true },
225 waitTime
: { median
: false },
226 elu
: { median
: false },
227 weights
: { 0: 300, 1: 200 }
229 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
230 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
231 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
232 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
236 it('Verify that pool options are validated', async () => {
241 './tests/worker-files/thread/testWorker.js',
243 workerChoiceStrategy
: 'invalidStrategy'
247 new Error("Invalid worker choice strategy 'invalidStrategy'")
253 './tests/worker-files/thread/testWorker.js',
255 workerChoiceStrategyOptions
: { weights
: {} }
260 'Invalid worker choice strategy options: must have a weight for each worker node'
267 './tests/worker-files/thread/testWorker.js',
269 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
274 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
281 './tests/worker-files/thread/testWorker.js',
283 enableTasksQueue
: true,
284 tasksQueueOptions
: { concurrency
: 0 }
289 'Invalid worker tasks concurrency: 0 is a negative integer or zero'
296 './tests/worker-files/thread/testWorker.js',
298 enableTasksQueue
: true,
299 tasksQueueOptions
: 'invalidTasksQueueOptions'
303 new TypeError('Invalid tasks queue options: must be a plain object')
309 './tests/worker-files/thread/testWorker.js',
311 enableTasksQueue
: true,
312 tasksQueueOptions
: { concurrency
: 0.2 }
316 new TypeError('Invalid worker tasks concurrency: must be an integer')
320 it('Verify that pool worker choice strategy options can be set', async () => {
321 const pool
= new FixedThreadPool(
323 './tests/worker-files/thread/testWorker.js',
324 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
326 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
328 runTime
: { median
: false },
329 waitTime
: { median
: false },
330 elu
: { median
: false }
332 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
334 runTime
: { median
: false },
335 waitTime
: { median
: false },
336 elu
: { median
: false }
338 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
339 .workerChoiceStrategies
) {
340 expect(workerChoiceStrategy
.opts
).toStrictEqual({
342 runTime
: { median
: false },
343 waitTime
: { median
: false },
344 elu
: { median
: false }
348 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
366 pool
.setWorkerChoiceStrategyOptions({
367 runTime
: { median
: true },
368 elu
: { median
: true }
370 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
372 runTime
: { median
: true },
373 waitTime
: { median
: false },
374 elu
: { median
: true }
376 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
378 runTime
: { median
: true },
379 waitTime
: { median
: false },
380 elu
: { median
: true }
382 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
383 .workerChoiceStrategies
) {
384 expect(workerChoiceStrategy
.opts
).toStrictEqual({
386 runTime
: { median
: true },
387 waitTime
: { median
: false },
388 elu
: { median
: true }
392 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
410 pool
.setWorkerChoiceStrategyOptions({
411 runTime
: { median
: false },
412 elu
: { median
: false }
414 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
416 runTime
: { median
: false },
417 waitTime
: { median
: false },
418 elu
: { median
: false }
420 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
422 runTime
: { median
: false },
423 waitTime
: { median
: false },
424 elu
: { median
: false }
426 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
427 .workerChoiceStrategies
) {
428 expect(workerChoiceStrategy
.opts
).toStrictEqual({
430 runTime
: { median
: false },
431 waitTime
: { median
: false },
432 elu
: { median
: false }
436 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
455 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
458 'Invalid worker choice strategy options: must be a plain object'
462 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
465 'Invalid worker choice strategy options: must have a weight for each worker node'
469 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
472 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
478 it('Verify that pool tasks queue can be enabled/disabled', async () => {
479 const pool
= new FixedThreadPool(
481 './tests/worker-files/thread/testWorker.js'
483 expect(pool
.opts
.enableTasksQueue
).toBe(false)
484 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
485 pool
.enableTasksQueue(true)
486 expect(pool
.opts
.enableTasksQueue
).toBe(true)
487 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
488 pool
.enableTasksQueue(true, { concurrency
: 2 })
489 expect(pool
.opts
.enableTasksQueue
).toBe(true)
490 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
491 pool
.enableTasksQueue(false)
492 expect(pool
.opts
.enableTasksQueue
).toBe(false)
493 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
497 it('Verify that pool tasks queue options can be set', async () => {
498 const pool
= new FixedThreadPool(
500 './tests/worker-files/thread/testWorker.js',
501 { enableTasksQueue
: true }
503 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
504 pool
.setTasksQueueOptions({ concurrency
: 2 })
505 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
507 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
509 new TypeError('Invalid tasks queue options: must be a plain object')
511 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
513 'Invalid worker tasks concurrency: 0 is a negative integer or zero'
516 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
518 'Invalid worker tasks concurrency: -1 is a negative integer or zero'
521 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
522 new TypeError('Invalid worker tasks concurrency: must be an integer')
527 it('Verify that pool info is set', async () => {
528 let pool
= new FixedThreadPool(
530 './tests/worker-files/thread/testWorker.js'
532 expect(pool
.info
).toStrictEqual({
534 type
: PoolTypes
.fixed
,
535 worker
: WorkerTypes
.thread
,
537 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
538 minSize
: numberOfWorkers
,
539 maxSize
: numberOfWorkers
,
540 workerNodes
: numberOfWorkers
,
541 idleWorkerNodes
: numberOfWorkers
,
548 pool
= new DynamicClusterPool(
549 Math
.floor(numberOfWorkers
/ 2),
551 './tests/worker-files/cluster/testWorker.js'
553 expect(pool
.info
).toStrictEqual({
555 type
: PoolTypes
.dynamic
,
556 worker
: WorkerTypes
.cluster
,
558 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
559 minSize
: Math
.floor(numberOfWorkers
/ 2),
560 maxSize
: numberOfWorkers
,
561 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
562 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
571 it('Verify that pool worker tasks usage are initialized', async () => {
572 const pool
= new FixedClusterPool(
574 './tests/worker-files/cluster/testWorker.js'
576 for (const workerNode
of pool
.workerNodes
) {
577 expect(workerNode
.usage
).toStrictEqual({
586 history
: expect
.any(CircularArray
)
589 history
: expect
.any(CircularArray
)
593 history
: expect
.any(CircularArray
)
596 history
: expect
.any(CircularArray
)
604 it('Verify that pool worker tasks queue are initialized', async () => {
605 let pool
= new FixedClusterPool(
607 './tests/worker-files/cluster/testWorker.js'
609 for (const workerNode
of pool
.workerNodes
) {
610 expect(workerNode
.tasksQueue
).toBeDefined()
611 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
612 expect(workerNode
.tasksQueue
.size
).toBe(0)
613 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
616 pool
= new DynamicThreadPool(
617 Math
.floor(numberOfWorkers
/ 2),
619 './tests/worker-files/thread/testWorker.js'
621 for (const workerNode
of pool
.workerNodes
) {
622 expect(workerNode
.tasksQueue
).toBeDefined()
623 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
624 expect(workerNode
.tasksQueue
.size
).toBe(0)
625 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
629 it('Verify that pool worker info are initialized', async () => {
630 let pool
= new FixedClusterPool(
632 './tests/worker-files/cluster/testWorker.js'
634 for (const workerNode
of pool
.workerNodes
) {
635 expect(workerNode
.info
).toStrictEqual({
636 id
: expect
.any(Number
),
637 type
: WorkerTypes
.cluster
,
643 pool
= new DynamicThreadPool(
644 Math
.floor(numberOfWorkers
/ 2),
646 './tests/worker-files/thread/testWorker.js'
648 for (const workerNode
of pool
.workerNodes
) {
649 expect(workerNode
.info
).toStrictEqual({
650 id
: expect
.any(Number
),
651 type
: WorkerTypes
.thread
,
658 it('Verify that pool worker tasks usage are computed', async () => {
659 const pool
= new FixedClusterPool(
661 './tests/worker-files/cluster/testWorker.js'
663 const promises
= new Set()
664 const maxMultiplier
= 2
665 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
666 promises
.add(pool
.execute())
668 for (const workerNode
of pool
.workerNodes
) {
669 expect(workerNode
.usage
).toStrictEqual({
672 executing
: maxMultiplier
,
678 history
: expect
.any(CircularArray
)
681 history
: expect
.any(CircularArray
)
685 history
: expect
.any(CircularArray
)
688 history
: expect
.any(CircularArray
)
693 await Promise
.all(promises
)
694 for (const workerNode
of pool
.workerNodes
) {
695 expect(workerNode
.usage
).toStrictEqual({
697 executed
: maxMultiplier
,
704 history
: expect
.any(CircularArray
)
707 history
: expect
.any(CircularArray
)
711 history
: expect
.any(CircularArray
)
714 history
: expect
.any(CircularArray
)
722 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
723 const pool
= new DynamicThreadPool(
724 Math
.floor(numberOfWorkers
/ 2),
726 './tests/worker-files/thread/testWorker.js'
728 const promises
= new Set()
729 const maxMultiplier
= 2
730 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
731 promises
.add(pool
.execute())
733 await Promise
.all(promises
)
734 for (const workerNode
of pool
.workerNodes
) {
735 expect(workerNode
.usage
).toStrictEqual({
737 executed
: expect
.any(Number
),
744 history
: expect
.any(CircularArray
)
747 history
: expect
.any(CircularArray
)
751 history
: expect
.any(CircularArray
)
754 history
: expect
.any(CircularArray
)
758 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
759 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
760 numberOfWorkers
* maxMultiplier
762 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
763 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
764 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
765 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
767 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
768 for (const workerNode
of pool
.workerNodes
) {
769 expect(workerNode
.usage
).toStrictEqual({
778 history
: expect
.any(CircularArray
)
781 history
: expect
.any(CircularArray
)
785 history
: expect
.any(CircularArray
)
788 history
: expect
.any(CircularArray
)
792 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
793 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
794 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
795 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
800 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
801 const pool
= new DynamicClusterPool(
802 Math
.floor(numberOfWorkers
/ 2),
804 './tests/worker-files/cluster/testWorker.js'
808 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
812 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
813 expect(poolReady
).toBe(1)
814 expect(poolInfo
).toStrictEqual({
816 type
: PoolTypes
.dynamic
,
817 worker
: WorkerTypes
.cluster
,
819 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
820 minSize
: expect
.any(Number
),
821 maxSize
: expect
.any(Number
),
822 workerNodes
: expect
.any(Number
),
823 idleWorkerNodes
: expect
.any(Number
),
824 busyWorkerNodes
: expect
.any(Number
),
825 executedTasks
: expect
.any(Number
),
826 executingTasks
: expect
.any(Number
),
827 failedTasks
: expect
.any(Number
)
832 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
833 const pool
= new FixedThreadPool(
835 './tests/worker-files/thread/testWorker.js'
837 const promises
= new Set()
840 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
844 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
845 promises
.add(pool
.execute())
847 await Promise
.all(promises
)
848 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
849 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
850 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
851 expect(poolInfo
).toStrictEqual({
853 type
: PoolTypes
.fixed
,
854 worker
: WorkerTypes
.thread
,
855 ready
: expect
.any(Boolean
),
856 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
857 minSize
: expect
.any(Number
),
858 maxSize
: expect
.any(Number
),
859 workerNodes
: expect
.any(Number
),
860 idleWorkerNodes
: expect
.any(Number
),
861 busyWorkerNodes
: expect
.any(Number
),
862 executedTasks
: expect
.any(Number
),
863 executingTasks
: expect
.any(Number
),
864 failedTasks
: expect
.any(Number
)
869 it("Verify that pool event emitter 'full' event can register a callback", async () => {
870 const pool
= new DynamicThreadPool(
871 Math
.floor(numberOfWorkers
/ 2),
873 './tests/worker-files/thread/testWorker.js'
875 const promises
= new Set()
878 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
882 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
883 promises
.add(pool
.execute())
885 await Promise
.all(promises
)
886 expect(poolFull
).toBe(1)
887 expect(poolInfo
).toStrictEqual({
889 type
: PoolTypes
.dynamic
,
890 worker
: WorkerTypes
.thread
,
891 ready
: expect
.any(Boolean
),
892 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
893 minSize
: expect
.any(Number
),
894 maxSize
: expect
.any(Number
),
895 workerNodes
: expect
.any(Number
),
896 idleWorkerNodes
: expect
.any(Number
),
897 busyWorkerNodes
: expect
.any(Number
),
898 executedTasks
: expect
.any(Number
),
899 executingTasks
: expect
.any(Number
),
900 failedTasks
: expect
.any(Number
)
905 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
906 const pool
= new FixedThreadPool(
908 './tests/worker-files/thread/testWorker.js',
910 enableTasksQueue
: true
913 sinon
.stub(pool
, 'hasBackPressure').returns(true)
914 const promises
= new Set()
915 let poolBackPressure
= 0
917 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
921 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
922 promises
.add(pool
.execute())
924 await Promise
.all(promises
)
925 expect(poolBackPressure
).toBe(2)
926 expect(poolInfo
).toStrictEqual({
928 type
: PoolTypes
.fixed
,
929 worker
: WorkerTypes
.thread
,
930 ready
: expect
.any(Boolean
),
931 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
932 minSize
: expect
.any(Number
),
933 maxSize
: expect
.any(Number
),
934 workerNodes
: expect
.any(Number
),
935 idleWorkerNodes
: expect
.any(Number
),
936 busyWorkerNodes
: expect
.any(Number
),
937 executedTasks
: expect
.any(Number
),
938 executingTasks
: expect
.any(Number
),
939 maxQueuedTasks
: expect
.any(Number
),
940 queuedTasks
: expect
.any(Number
),
942 failedTasks
: expect
.any(Number
)
944 expect(pool
.hasBackPressure
.called
).toBe(true)
948 it('Verify that listTaskFunctions() is working', async () => {
949 const dynamicThreadPool
= new DynamicThreadPool(
950 Math
.floor(numberOfWorkers
/ 2),
952 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
954 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
955 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
957 'jsonIntegerSerialization',
961 const fixedClusterPool
= new FixedClusterPool(
963 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
965 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
966 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
968 'jsonIntegerSerialization',
974 it('Verify that multiple task functions worker is working', async () => {
975 const pool
= new DynamicClusterPool(
976 Math
.floor(numberOfWorkers
/ 2),
978 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
980 const data
= { n
: 10 }
981 const result0
= await pool
.execute(data
)
982 expect(result0
).toStrictEqual({ ok
: 1 })
983 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
984 expect(result1
).toStrictEqual({ ok
: 1 })
985 const result2
= await pool
.execute(data
, 'factorial')
986 expect(result2
).toBe(3628800)
987 const result3
= await pool
.execute(data
, 'fibonacci')
988 expect(result3
).toBe(55)
989 expect(pool
.info
.executingTasks
).toBe(0)
990 expect(pool
.info
.executedTasks
).toBe(4)
991 for (const workerNode
of pool
.workerNodes
) {
992 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
994 'jsonIntegerSerialization',
998 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
999 for (const name
of pool
.listTaskFunctions()) {
1000 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1002 executed
: expect
.any(Number
),
1003 executing
: expect
.any(Number
),
1008 history
: expect
.any(CircularArray
)
1011 history
: expect
.any(CircularArray
)
1015 history
: expect
.any(CircularArray
)
1018 history
: expect
.any(CircularArray
)
1023 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executing
1024 ).toBeGreaterThanOrEqual(0)