1 const { expect
} = require('expect')
2 const sinon
= require('sinon')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Queue
} = require('../../../lib/queue')
15 const { version
} = require('../../../package.json')
16 const { waitPoolEvents
} = require('../../test-utils')
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers
= 2
20 class StubPoolWithIsMain
extends FixedThreadPool
{
26 it('Simulate pool creation from a non main thread/process', () => {
29 new StubPoolWithIsMain(
31 './tests/worker-files/thread/testWorker.js',
33 errorHandler
: (e
) => console
.error(e
)
38 'Cannot start a pool from a worker with the same type as the pool'
43 it('Verify that filePath is checked', () => {
44 const expectedError
= new Error(
45 'Please specify a file with a worker implementation'
47 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
50 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
53 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
56 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
60 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
61 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
64 it('Verify that numberOfWorkers is checked', () => {
65 expect(() => new FixedThreadPool()).toThrowError(
67 'Cannot instantiate a pool without specifying the number of workers'
72 it('Verify that a negative number of workers is checked', () => {
75 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
78 'Cannot instantiate a pool with a negative number of workers'
83 it('Verify that a non integer number of workers is checked', () => {
86 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
89 'Cannot instantiate a pool with a non safe integer number of workers'
94 it('Verify that dynamic pool sizing is checked', () => {
97 new DynamicClusterPool(
100 './tests/worker-files/cluster/testWorker.js'
104 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
109 new DynamicThreadPool(
112 './tests/worker-files/thread/testWorker.js'
116 'Cannot instantiate a pool with a non safe integer number of workers'
121 new DynamicClusterPool(
124 './tests/worker-files/cluster/testWorker.js'
128 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
133 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
136 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
141 new DynamicClusterPool(
144 './tests/worker-files/cluster/testWorker.js'
148 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
153 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
156 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
161 it('Verify that pool options are checked', async () => {
162 let pool
= new FixedThreadPool(
164 './tests/worker-files/thread/testWorker.js'
166 expect(pool
.emitter
).toBeDefined()
167 expect(pool
.opts
.enableEvents
).toBe(true)
168 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
169 expect(pool
.opts
.enableTasksQueue
).toBe(false)
170 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
171 expect(pool
.opts
.workerChoiceStrategy
).toBe(
172 WorkerChoiceStrategies
.ROUND_ROBIN
174 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
176 runTime
: { median
: false },
177 waitTime
: { median
: false },
178 elu
: { median
: false }
180 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
182 runTime
: { median
: false },
183 waitTime
: { median
: false },
184 elu
: { median
: false }
186 expect(pool
.opts
.messageHandler
).toBeUndefined()
187 expect(pool
.opts
.errorHandler
).toBeUndefined()
188 expect(pool
.opts
.onlineHandler
).toBeUndefined()
189 expect(pool
.opts
.exitHandler
).toBeUndefined()
191 const testHandler
= () => console
.info('test handler executed')
192 pool
= new FixedThreadPool(
194 './tests/worker-files/thread/testWorker.js',
196 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
197 workerChoiceStrategyOptions
: {
198 runTime
: { median
: true },
199 weights
: { 0: 300, 1: 200 }
202 restartWorkerOnError
: false,
203 enableTasksQueue
: true,
204 tasksQueueOptions
: { concurrency
: 2 },
205 messageHandler
: testHandler
,
206 errorHandler
: testHandler
,
207 onlineHandler
: testHandler
,
208 exitHandler
: testHandler
211 expect(pool
.emitter
).toBeUndefined()
212 expect(pool
.opts
.enableEvents
).toBe(false)
213 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
214 expect(pool
.opts
.enableTasksQueue
).toBe(true)
215 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
216 expect(pool
.opts
.workerChoiceStrategy
).toBe(
217 WorkerChoiceStrategies
.LEAST_USED
219 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
221 runTime
: { median
: true },
222 waitTime
: { median
: false },
223 elu
: { median
: false },
224 weights
: { 0: 300, 1: 200 }
226 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
228 runTime
: { median
: true },
229 waitTime
: { median
: false },
230 elu
: { median
: false },
231 weights
: { 0: 300, 1: 200 }
233 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
234 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
235 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
236 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
240 it('Verify that pool options are validated', async () => {
245 './tests/worker-files/thread/testWorker.js',
247 workerChoiceStrategy
: 'invalidStrategy'
251 new Error("Invalid worker choice strategy 'invalidStrategy'")
257 './tests/worker-files/thread/testWorker.js',
259 workerChoiceStrategyOptions
: { weights
: {} }
264 'Invalid worker choice strategy options: must have a weight for each worker node'
271 './tests/worker-files/thread/testWorker.js',
273 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
278 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
285 './tests/worker-files/thread/testWorker.js',
287 enableTasksQueue
: true,
288 tasksQueueOptions
: { concurrency
: 0 }
293 'Invalid worker tasks concurrency: 0 is a negative integer or zero'
300 './tests/worker-files/thread/testWorker.js',
302 enableTasksQueue
: true,
303 tasksQueueOptions
: 'invalidTasksQueueOptions'
307 new TypeError('Invalid tasks queue options: must be a plain object')
313 './tests/worker-files/thread/testWorker.js',
315 enableTasksQueue
: true,
316 tasksQueueOptions
: { concurrency
: 0.2 }
320 new TypeError('Invalid worker tasks concurrency: must be an integer')
324 it('Verify that pool worker choice strategy options can be set', async () => {
325 const pool
= new FixedThreadPool(
327 './tests/worker-files/thread/testWorker.js',
328 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
330 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
332 runTime
: { median
: false },
333 waitTime
: { median
: false },
334 elu
: { median
: false }
336 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
338 runTime
: { median
: false },
339 waitTime
: { median
: false },
340 elu
: { median
: false }
342 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
343 .workerChoiceStrategies
) {
344 expect(workerChoiceStrategy
.opts
).toStrictEqual({
346 runTime
: { median
: false },
347 waitTime
: { median
: false },
348 elu
: { median
: false }
352 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
370 pool
.setWorkerChoiceStrategyOptions({
371 runTime
: { median
: true },
372 elu
: { median
: true }
374 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
376 runTime
: { median
: true },
377 waitTime
: { median
: false },
378 elu
: { median
: true }
380 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
382 runTime
: { median
: true },
383 waitTime
: { median
: false },
384 elu
: { median
: true }
386 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
387 .workerChoiceStrategies
) {
388 expect(workerChoiceStrategy
.opts
).toStrictEqual({
390 runTime
: { median
: true },
391 waitTime
: { median
: false },
392 elu
: { median
: true }
396 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
414 pool
.setWorkerChoiceStrategyOptions({
415 runTime
: { median
: false },
416 elu
: { median
: false }
418 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
420 runTime
: { median
: false },
421 waitTime
: { median
: false },
422 elu
: { median
: false }
424 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
426 runTime
: { median
: false },
427 waitTime
: { median
: false },
428 elu
: { median
: false }
430 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
431 .workerChoiceStrategies
) {
432 expect(workerChoiceStrategy
.opts
).toStrictEqual({
434 runTime
: { median
: false },
435 waitTime
: { median
: false },
436 elu
: { median
: false }
440 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
459 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
462 'Invalid worker choice strategy options: must be a plain object'
466 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
469 'Invalid worker choice strategy options: must have a weight for each worker node'
473 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
476 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
482 it('Verify that pool tasks queue can be enabled/disabled', async () => {
483 const pool
= new FixedThreadPool(
485 './tests/worker-files/thread/testWorker.js'
487 expect(pool
.opts
.enableTasksQueue
).toBe(false)
488 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
489 pool
.enableTasksQueue(true)
490 expect(pool
.opts
.enableTasksQueue
).toBe(true)
491 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
492 pool
.enableTasksQueue(true, { concurrency
: 2 })
493 expect(pool
.opts
.enableTasksQueue
).toBe(true)
494 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
495 pool
.enableTasksQueue(false)
496 expect(pool
.opts
.enableTasksQueue
).toBe(false)
497 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
501 it('Verify that pool tasks queue options can be set', async () => {
502 const pool
= new FixedThreadPool(
504 './tests/worker-files/thread/testWorker.js',
505 { enableTasksQueue
: true }
507 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
508 pool
.setTasksQueueOptions({ concurrency
: 2 })
509 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
511 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
513 new TypeError('Invalid tasks queue options: must be a plain object')
515 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
517 'Invalid worker tasks concurrency: 0 is a negative integer or zero'
520 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
522 'Invalid worker tasks concurrency: -1 is a negative integer or zero'
525 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
526 new TypeError('Invalid worker tasks concurrency: must be an integer')
531 it('Verify that pool info is set', async () => {
532 let pool
= new FixedThreadPool(
534 './tests/worker-files/thread/testWorker.js'
536 expect(pool
.info
).toStrictEqual({
538 type
: PoolTypes
.fixed
,
539 worker
: WorkerTypes
.thread
,
541 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
542 minSize
: numberOfWorkers
,
543 maxSize
: numberOfWorkers
,
544 workerNodes
: numberOfWorkers
,
545 idleWorkerNodes
: numberOfWorkers
,
552 pool
= new DynamicClusterPool(
553 Math
.floor(numberOfWorkers
/ 2),
555 './tests/worker-files/cluster/testWorker.js'
557 expect(pool
.info
).toStrictEqual({
559 type
: PoolTypes
.dynamic
,
560 worker
: WorkerTypes
.cluster
,
562 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
563 minSize
: Math
.floor(numberOfWorkers
/ 2),
564 maxSize
: numberOfWorkers
,
565 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
566 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
575 it('Verify that pool worker tasks usage are initialized', async () => {
576 const pool
= new FixedClusterPool(
578 './tests/worker-files/cluster/testWorker.js'
580 for (const workerNode
of pool
.workerNodes
) {
581 expect(workerNode
.usage
).toStrictEqual({
590 history
: expect
.any(CircularArray
)
593 history
: expect
.any(CircularArray
)
597 history
: expect
.any(CircularArray
)
600 history
: expect
.any(CircularArray
)
608 it('Verify that pool worker tasks queue are initialized', async () => {
609 let pool
= new FixedClusterPool(
611 './tests/worker-files/cluster/testWorker.js'
613 for (const workerNode
of pool
.workerNodes
) {
614 expect(workerNode
.tasksQueue
).toBeDefined()
615 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
616 expect(workerNode
.tasksQueue
.size
).toBe(0)
617 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
620 pool
= new DynamicThreadPool(
621 Math
.floor(numberOfWorkers
/ 2),
623 './tests/worker-files/thread/testWorker.js'
625 for (const workerNode
of pool
.workerNodes
) {
626 expect(workerNode
.tasksQueue
).toBeDefined()
627 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
628 expect(workerNode
.tasksQueue
.size
).toBe(0)
629 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
633 it('Verify that pool worker info are initialized', async () => {
634 let pool
= new FixedClusterPool(
636 './tests/worker-files/cluster/testWorker.js'
638 for (const workerNode
of pool
.workerNodes
) {
639 expect(workerNode
.info
).toStrictEqual({
640 id
: expect
.any(Number
),
641 type
: WorkerTypes
.cluster
,
647 pool
= new DynamicThreadPool(
648 Math
.floor(numberOfWorkers
/ 2),
650 './tests/worker-files/thread/testWorker.js'
652 for (const workerNode
of pool
.workerNodes
) {
653 expect(workerNode
.info
).toStrictEqual({
654 id
: expect
.any(Number
),
655 type
: WorkerTypes
.thread
,
662 it('Verify that pool worker tasks usage are computed', async () => {
663 const pool
= new FixedClusterPool(
665 './tests/worker-files/cluster/testWorker.js'
667 const promises
= new Set()
668 const maxMultiplier
= 2
669 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
670 promises
.add(pool
.execute())
672 for (const workerNode
of pool
.workerNodes
) {
673 expect(workerNode
.usage
).toStrictEqual({
676 executing
: maxMultiplier
,
682 history
: expect
.any(CircularArray
)
685 history
: expect
.any(CircularArray
)
689 history
: expect
.any(CircularArray
)
692 history
: expect
.any(CircularArray
)
697 await Promise
.all(promises
)
698 for (const workerNode
of pool
.workerNodes
) {
699 expect(workerNode
.usage
).toStrictEqual({
701 executed
: maxMultiplier
,
708 history
: expect
.any(CircularArray
)
711 history
: expect
.any(CircularArray
)
715 history
: expect
.any(CircularArray
)
718 history
: expect
.any(CircularArray
)
726 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
727 const pool
= new DynamicThreadPool(
728 Math
.floor(numberOfWorkers
/ 2),
730 './tests/worker-files/thread/testWorker.js'
732 const promises
= new Set()
733 const maxMultiplier
= 2
734 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
735 promises
.add(pool
.execute())
737 await Promise
.all(promises
)
738 for (const workerNode
of pool
.workerNodes
) {
739 expect(workerNode
.usage
).toStrictEqual({
741 executed
: expect
.any(Number
),
748 history
: expect
.any(CircularArray
)
751 history
: expect
.any(CircularArray
)
755 history
: expect
.any(CircularArray
)
758 history
: expect
.any(CircularArray
)
762 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
763 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
764 numberOfWorkers
* maxMultiplier
766 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
767 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
768 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
769 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
771 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
772 for (const workerNode
of pool
.workerNodes
) {
773 expect(workerNode
.usage
).toStrictEqual({
782 history
: expect
.any(CircularArray
)
785 history
: expect
.any(CircularArray
)
789 history
: expect
.any(CircularArray
)
792 history
: expect
.any(CircularArray
)
796 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
797 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
798 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
799 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
804 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
805 const pool
= new DynamicClusterPool(
806 Math
.floor(numberOfWorkers
/ 2),
808 './tests/worker-files/cluster/testWorker.js'
812 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
816 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
817 expect(poolReady
).toBe(1)
818 expect(poolInfo
).toStrictEqual({
820 type
: PoolTypes
.dynamic
,
821 worker
: WorkerTypes
.cluster
,
823 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
824 minSize
: expect
.any(Number
),
825 maxSize
: expect
.any(Number
),
826 workerNodes
: expect
.any(Number
),
827 idleWorkerNodes
: expect
.any(Number
),
828 busyWorkerNodes
: expect
.any(Number
),
829 executedTasks
: expect
.any(Number
),
830 executingTasks
: expect
.any(Number
),
831 failedTasks
: expect
.any(Number
)
836 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
837 const pool
= new FixedThreadPool(
839 './tests/worker-files/thread/testWorker.js'
841 const promises
= new Set()
844 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
848 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
849 promises
.add(pool
.execute())
851 await Promise
.all(promises
)
852 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
853 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
854 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
855 expect(poolInfo
).toStrictEqual({
857 type
: PoolTypes
.fixed
,
858 worker
: WorkerTypes
.thread
,
859 ready
: expect
.any(Boolean
),
860 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
861 minSize
: expect
.any(Number
),
862 maxSize
: expect
.any(Number
),
863 workerNodes
: expect
.any(Number
),
864 idleWorkerNodes
: expect
.any(Number
),
865 busyWorkerNodes
: expect
.any(Number
),
866 executedTasks
: expect
.any(Number
),
867 executingTasks
: expect
.any(Number
),
868 failedTasks
: expect
.any(Number
)
873 it("Verify that pool event emitter 'full' event can register a callback", async () => {
874 const pool
= new DynamicThreadPool(
875 Math
.floor(numberOfWorkers
/ 2),
877 './tests/worker-files/thread/testWorker.js'
879 const promises
= new Set()
882 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
886 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
887 promises
.add(pool
.execute())
889 await Promise
.all(promises
)
890 expect(poolFull
).toBe(1)
891 expect(poolInfo
).toStrictEqual({
893 type
: PoolTypes
.dynamic
,
894 worker
: WorkerTypes
.thread
,
895 ready
: expect
.any(Boolean
),
896 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
897 minSize
: expect
.any(Number
),
898 maxSize
: expect
.any(Number
),
899 workerNodes
: expect
.any(Number
),
900 idleWorkerNodes
: expect
.any(Number
),
901 busyWorkerNodes
: expect
.any(Number
),
902 executedTasks
: expect
.any(Number
),
903 executingTasks
: expect
.any(Number
),
904 failedTasks
: expect
.any(Number
)
909 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
910 const pool
= new FixedThreadPool(
912 './tests/worker-files/thread/testWorker.js',
914 enableTasksQueue
: true
917 sinon
.stub(pool
, 'hasBackPressure').returns(true)
918 const promises
= new Set()
919 let poolBackPressure
= 0
921 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
925 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
926 promises
.add(pool
.execute())
928 await Promise
.all(promises
)
929 expect(poolBackPressure
).toBe(2)
930 expect(poolInfo
).toStrictEqual({
932 type
: PoolTypes
.fixed
,
933 worker
: WorkerTypes
.thread
,
934 ready
: expect
.any(Boolean
),
935 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
936 minSize
: expect
.any(Number
),
937 maxSize
: expect
.any(Number
),
938 workerNodes
: expect
.any(Number
),
939 idleWorkerNodes
: expect
.any(Number
),
940 busyWorkerNodes
: expect
.any(Number
),
941 executedTasks
: expect
.any(Number
),
942 executingTasks
: expect
.any(Number
),
943 maxQueuedTasks
: expect
.any(Number
),
944 queuedTasks
: expect
.any(Number
),
946 failedTasks
: expect
.any(Number
)
948 expect(pool
.hasBackPressure
.called
).toBe(true)
952 it('Verify that listTaskFunctions() is working', async () => {
953 const dynamicThreadPool
= new DynamicThreadPool(
954 Math
.floor(numberOfWorkers
/ 2),
956 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
958 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
959 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
961 'jsonIntegerSerialization',
965 const fixedClusterPool
= new FixedClusterPool(
967 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
969 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
970 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
972 'jsonIntegerSerialization',
978 it('Verify that multiple task functions worker is working', async () => {
979 const pool
= new DynamicClusterPool(
980 Math
.floor(numberOfWorkers
/ 2),
982 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
984 const data
= { n
: 10 }
985 const result0
= await pool
.execute(data
)
986 expect(result0
).toStrictEqual({ ok
: 1 })
987 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
988 expect(result1
).toStrictEqual({ ok
: 1 })
989 const result2
= await pool
.execute(data
, 'factorial')
990 expect(result2
).toBe(3628800)
991 const result3
= await pool
.execute(data
, 'fibonacci')
992 expect(result3
).toBe(55)
993 expect(pool
.info
.executingTasks
).toBe(0)
994 expect(pool
.info
.executedTasks
).toBe(4)
995 for (const workerNode
of pool
.workerNodes
) {
996 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
998 'jsonIntegerSerialization',
1002 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1003 for (const name
of pool
.listTaskFunctions()) {
1004 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1006 executed
: expect
.any(Number
),
1007 executing
: expect
.any(Number
),
1012 history
: expect
.any(CircularArray
)
1015 history
: expect
.any(CircularArray
)
1019 history
: expect
.any(CircularArray
)
1022 history
: expect
.any(CircularArray
)
1027 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executing
1028 ).toBeGreaterThanOrEqual(0)