1 const { expect
} = require('expect')
2 const sinon
= require('sinon')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Deque
} = require('../../../lib/deque')
15 const { DEFAULT_TASK_NAME
} = require('../../../lib/utils')
16 const { version
} = require('../../../package.json')
17 const { waitPoolEvents
} = require('../../test-utils')
19 describe('Abstract pool test suite', () => {
20 const numberOfWorkers
= 2
21 class StubPoolWithIsMain
extends FixedThreadPool
{
31 it('Simulate pool creation from a non main thread/process', () => {
34 new StubPoolWithIsMain(
36 './tests/worker-files/thread/testWorker.js',
38 errorHandler
: (e
) => console
.error(e
)
43 'Cannot start a pool from a worker with the same type as the pool'
48 it('Verify that pool statuses properties are set', async () => {
49 const pool
= new FixedThreadPool(
51 './tests/worker-files/thread/testWorker.js'
53 expect(pool
.starting
).toBe(false)
54 expect(pool
.started
).toBe(true)
58 it('Verify that filePath is checked', () => {
59 const expectedError
= new Error(
60 'Please specify a file with a worker implementation'
62 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
65 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
68 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
71 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
75 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
76 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
79 it('Verify that numberOfWorkers is checked', () => {
80 expect(() => new FixedThreadPool()).toThrowError(
82 'Cannot instantiate a pool without specifying the number of workers'
87 it('Verify that a negative number of workers is checked', () => {
90 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
93 'Cannot instantiate a pool with a negative number of workers'
98 it('Verify that a non integer number of workers is checked', () => {
101 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
104 'Cannot instantiate a pool with a non safe integer number of workers'
109 it('Verify that dynamic pool sizing is checked', () => {
112 new DynamicClusterPool(
115 './tests/worker-files/cluster/testWorker.js'
119 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
124 new DynamicThreadPool(
127 './tests/worker-files/thread/testWorker.js'
131 'Cannot instantiate a pool with a non safe integer number of workers'
136 new DynamicClusterPool(
139 './tests/worker-files/cluster/testWorker.js'
143 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
148 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
151 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
156 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
159 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
164 new DynamicClusterPool(
167 './tests/worker-files/cluster/testWorker.js'
171 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
176 it('Verify that pool options are checked', async () => {
177 let pool
= new FixedThreadPool(
179 './tests/worker-files/thread/testWorker.js'
181 expect(pool
.emitter
).toBeDefined()
182 expect(pool
.opts
.enableEvents
).toBe(true)
183 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
184 expect(pool
.opts
.enableTasksQueue
).toBe(false)
185 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
186 expect(pool
.opts
.workerChoiceStrategy
).toBe(
187 WorkerChoiceStrategies
.ROUND_ROBIN
189 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
191 runTime
: { median
: false },
192 waitTime
: { median
: false },
193 elu
: { median
: false }
195 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
197 runTime
: { median
: false },
198 waitTime
: { median
: false },
199 elu
: { median
: false }
201 expect(pool
.opts
.messageHandler
).toBeUndefined()
202 expect(pool
.opts
.errorHandler
).toBeUndefined()
203 expect(pool
.opts
.onlineHandler
).toBeUndefined()
204 expect(pool
.opts
.exitHandler
).toBeUndefined()
206 const testHandler
= () => console
.info('test handler executed')
207 pool
= new FixedThreadPool(
209 './tests/worker-files/thread/testWorker.js',
211 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
212 workerChoiceStrategyOptions
: {
213 runTime
: { median
: true },
214 weights
: { 0: 300, 1: 200 }
217 restartWorkerOnError
: false,
218 enableTasksQueue
: true,
219 tasksQueueOptions
: { concurrency
: 2 },
220 messageHandler
: testHandler
,
221 errorHandler
: testHandler
,
222 onlineHandler
: testHandler
,
223 exitHandler
: testHandler
226 expect(pool
.emitter
).toBeUndefined()
227 expect(pool
.opts
.enableEvents
).toBe(false)
228 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
229 expect(pool
.opts
.enableTasksQueue
).toBe(true)
230 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
234 expect(pool
.opts
.workerChoiceStrategy
).toBe(
235 WorkerChoiceStrategies
.LEAST_USED
237 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
239 runTime
: { median
: true },
240 waitTime
: { median
: false },
241 elu
: { median
: false },
242 weights
: { 0: 300, 1: 200 }
244 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
246 runTime
: { median
: true },
247 waitTime
: { median
: false },
248 elu
: { median
: false },
249 weights
: { 0: 300, 1: 200 }
251 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
252 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
253 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
254 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
258 it('Verify that pool options are validated', async () => {
263 './tests/worker-files/thread/testWorker.js',
265 workerChoiceStrategy
: 'invalidStrategy'
269 new Error("Invalid worker choice strategy 'invalidStrategy'")
275 './tests/worker-files/thread/testWorker.js',
277 workerChoiceStrategyOptions
: {
278 retries
: 'invalidChoiceRetries'
284 'Invalid worker choice strategy options: retries must be an integer'
291 './tests/worker-files/thread/testWorker.js',
293 workerChoiceStrategyOptions
: {
300 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
307 './tests/worker-files/thread/testWorker.js',
309 workerChoiceStrategyOptions
: { weights
: {} }
314 'Invalid worker choice strategy options: must have a weight for each worker node'
321 './tests/worker-files/thread/testWorker.js',
323 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
328 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
335 './tests/worker-files/thread/testWorker.js',
337 enableTasksQueue
: true,
338 tasksQueueOptions
: 'invalidTasksQueueOptions'
342 new TypeError('Invalid tasks queue options: must be a plain object')
348 './tests/worker-files/thread/testWorker.js',
350 enableTasksQueue
: true,
351 tasksQueueOptions
: { concurrency
: 0 }
356 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
363 './tests/worker-files/thread/testWorker.js',
365 enableTasksQueue
: true,
366 tasksQueueOptions
: { concurrency
: -1 }
371 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
378 './tests/worker-files/thread/testWorker.js',
380 enableTasksQueue
: true,
381 tasksQueueOptions
: { concurrency
: 0.2 }
385 new TypeError('Invalid worker node tasks concurrency: must be an integer')
391 './tests/worker-files/thread/testWorker.js',
393 enableTasksQueue
: true,
394 tasksQueueOptions
: { queueMaxSize
: 2 }
399 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
406 './tests/worker-files/thread/testWorker.js',
408 enableTasksQueue
: true,
409 tasksQueueOptions
: { size
: 0 }
414 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
421 './tests/worker-files/thread/testWorker.js',
423 enableTasksQueue
: true,
424 tasksQueueOptions
: { size
: -1 }
429 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
436 './tests/worker-files/thread/testWorker.js',
438 enableTasksQueue
: true,
439 tasksQueueOptions
: { size
: 0.2 }
443 new TypeError('Invalid worker node tasks queue size: must be an integer')
447 it('Verify that pool worker choice strategy options can be set', async () => {
448 const pool
= new FixedThreadPool(
450 './tests/worker-files/thread/testWorker.js',
451 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
453 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
455 runTime
: { median
: false },
456 waitTime
: { median
: false },
457 elu
: { median
: false }
459 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
461 runTime
: { median
: false },
462 waitTime
: { median
: false },
463 elu
: { median
: false }
465 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
466 .workerChoiceStrategies
) {
467 expect(workerChoiceStrategy
.opts
).toStrictEqual({
469 runTime
: { median
: false },
470 waitTime
: { median
: false },
471 elu
: { median
: false }
475 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
493 pool
.setWorkerChoiceStrategyOptions({
494 runTime
: { median
: true },
495 elu
: { median
: true }
497 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
499 runTime
: { median
: true },
500 waitTime
: { median
: false },
501 elu
: { median
: true }
503 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
505 runTime
: { median
: true },
506 waitTime
: { median
: false },
507 elu
: { median
: true }
509 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
510 .workerChoiceStrategies
) {
511 expect(workerChoiceStrategy
.opts
).toStrictEqual({
513 runTime
: { median
: true },
514 waitTime
: { median
: false },
515 elu
: { median
: true }
519 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
537 pool
.setWorkerChoiceStrategyOptions({
538 runTime
: { median
: false },
539 elu
: { median
: false }
541 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
543 runTime
: { median
: false },
544 waitTime
: { median
: false },
545 elu
: { median
: false }
547 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
549 runTime
: { median
: false },
550 waitTime
: { median
: false },
551 elu
: { median
: false }
553 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
554 .workerChoiceStrategies
) {
555 expect(workerChoiceStrategy
.opts
).toStrictEqual({
557 runTime
: { median
: false },
558 waitTime
: { median
: false },
559 elu
: { median
: false }
563 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
582 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
585 'Invalid worker choice strategy options: must be a plain object'
589 pool
.setWorkerChoiceStrategyOptions({
590 retries
: 'invalidChoiceRetries'
594 'Invalid worker choice strategy options: retries must be an integer'
598 pool
.setWorkerChoiceStrategyOptions({ retries
: -1 })
601 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
605 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
608 'Invalid worker choice strategy options: must have a weight for each worker node'
612 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
615 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
621 it('Verify that pool tasks queue can be enabled/disabled', async () => {
622 const pool
= new FixedThreadPool(
624 './tests/worker-files/thread/testWorker.js'
626 expect(pool
.opts
.enableTasksQueue
).toBe(false)
627 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
628 pool
.enableTasksQueue(true)
629 expect(pool
.opts
.enableTasksQueue
).toBe(true)
630 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
634 pool
.enableTasksQueue(true, { concurrency
: 2 })
635 expect(pool
.opts
.enableTasksQueue
).toBe(true)
636 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
640 pool
.enableTasksQueue(false)
641 expect(pool
.opts
.enableTasksQueue
).toBe(false)
642 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
646 it('Verify that pool tasks queue options can be set', async () => {
647 const pool
= new FixedThreadPool(
649 './tests/worker-files/thread/testWorker.js',
650 { enableTasksQueue
: true }
652 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
656 pool
.setTasksQueueOptions({ concurrency
: 2 })
657 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
662 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
664 new TypeError('Invalid tasks queue options: must be a plain object')
666 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
668 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
671 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
673 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
676 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
677 new TypeError('Invalid worker node tasks concurrency: must be an integer')
679 expect(() => pool
.setTasksQueueOptions({ queueMaxSize
: 2 })).toThrowError(
681 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
684 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
686 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
689 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
691 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
694 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
695 new TypeError('Invalid worker node tasks queue size: must be an integer')
700 it('Verify that pool info is set', async () => {
701 let pool
= new FixedThreadPool(
703 './tests/worker-files/thread/testWorker.js'
705 expect(pool
.info
).toStrictEqual({
707 type
: PoolTypes
.fixed
,
708 worker
: WorkerTypes
.thread
,
710 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
711 minSize
: numberOfWorkers
,
712 maxSize
: numberOfWorkers
,
713 workerNodes
: numberOfWorkers
,
714 idleWorkerNodes
: numberOfWorkers
,
721 pool
= new DynamicClusterPool(
722 Math
.floor(numberOfWorkers
/ 2),
724 './tests/worker-files/cluster/testWorker.js'
726 expect(pool
.info
).toStrictEqual({
728 type
: PoolTypes
.dynamic
,
729 worker
: WorkerTypes
.cluster
,
731 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
732 minSize
: Math
.floor(numberOfWorkers
/ 2),
733 maxSize
: numberOfWorkers
,
734 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
735 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
744 it('Verify that pool worker tasks usage are initialized', async () => {
745 const pool
= new FixedClusterPool(
747 './tests/worker-files/cluster/testWorker.js'
749 for (const workerNode
of pool
.workerNodes
) {
750 expect(workerNode
.usage
).toStrictEqual({
760 history
: expect
.any(CircularArray
)
763 history
: expect
.any(CircularArray
)
767 history
: expect
.any(CircularArray
)
770 history
: expect
.any(CircularArray
)
778 it('Verify that pool worker tasks queue are initialized', async () => {
779 let pool
= new FixedClusterPool(
781 './tests/worker-files/cluster/testWorker.js'
783 for (const workerNode
of pool
.workerNodes
) {
784 expect(workerNode
.tasksQueue
).toBeDefined()
785 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
786 expect(workerNode
.tasksQueue
.size
).toBe(0)
787 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
790 pool
= new DynamicThreadPool(
791 Math
.floor(numberOfWorkers
/ 2),
793 './tests/worker-files/thread/testWorker.js'
795 for (const workerNode
of pool
.workerNodes
) {
796 expect(workerNode
.tasksQueue
).toBeDefined()
797 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
798 expect(workerNode
.tasksQueue
.size
).toBe(0)
799 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
804 it('Verify that pool worker info are initialized', async () => {
805 let pool
= new FixedClusterPool(
807 './tests/worker-files/cluster/testWorker.js'
809 for (const workerNode
of pool
.workerNodes
) {
810 expect(workerNode
.info
).toStrictEqual({
811 id
: expect
.any(Number
),
812 type
: WorkerTypes
.cluster
,
818 pool
= new DynamicThreadPool(
819 Math
.floor(numberOfWorkers
/ 2),
821 './tests/worker-files/thread/testWorker.js'
823 for (const workerNode
of pool
.workerNodes
) {
824 expect(workerNode
.info
).toStrictEqual({
825 id
: expect
.any(Number
),
826 type
: WorkerTypes
.thread
,
834 it('Verify that pool execute() arguments are checked', async () => {
835 const pool
= new FixedClusterPool(
837 './tests/worker-files/cluster/testWorker.js'
839 await
expect(pool
.execute(undefined, 0)).rejects
.toThrowError(
840 new TypeError('name argument must be a string')
842 await
expect(pool
.execute(undefined, '')).rejects
.toThrowError(
843 new TypeError('name argument must not be an empty string')
845 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
846 new TypeError('transferList argument must be an array')
848 await
expect(pool
.execute(undefined, 'unknown')).rejects
.toBe(
849 "Task function 'unknown' not found"
852 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
853 new Error('Cannot execute a task on destroyed pool')
857 it('Verify that pool worker tasks usage are computed', async () => {
858 const pool
= new FixedClusterPool(
860 './tests/worker-files/cluster/testWorker.js'
862 const promises
= new Set()
863 const maxMultiplier
= 2
864 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
865 promises
.add(pool
.execute())
867 for (const workerNode
of pool
.workerNodes
) {
868 expect(workerNode
.usage
).toStrictEqual({
871 executing
: maxMultiplier
,
878 history
: expect
.any(CircularArray
)
881 history
: expect
.any(CircularArray
)
885 history
: expect
.any(CircularArray
)
888 history
: expect
.any(CircularArray
)
893 await Promise
.all(promises
)
894 for (const workerNode
of pool
.workerNodes
) {
895 expect(workerNode
.usage
).toStrictEqual({
897 executed
: maxMultiplier
,
905 history
: expect
.any(CircularArray
)
908 history
: expect
.any(CircularArray
)
912 history
: expect
.any(CircularArray
)
915 history
: expect
.any(CircularArray
)
923 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
924 const pool
= new DynamicThreadPool(
925 Math
.floor(numberOfWorkers
/ 2),
927 './tests/worker-files/thread/testWorker.js'
929 const promises
= new Set()
930 const maxMultiplier
= 2
931 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
932 promises
.add(pool
.execute())
934 await Promise
.all(promises
)
935 for (const workerNode
of pool
.workerNodes
) {
936 expect(workerNode
.usage
).toStrictEqual({
938 executed
: expect
.any(Number
),
946 history
: expect
.any(CircularArray
)
949 history
: expect
.any(CircularArray
)
953 history
: expect
.any(CircularArray
)
956 history
: expect
.any(CircularArray
)
960 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
961 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
962 numberOfWorkers
* maxMultiplier
964 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
965 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
966 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
967 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
969 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
970 for (const workerNode
of pool
.workerNodes
) {
971 expect(workerNode
.usage
).toStrictEqual({
981 history
: expect
.any(CircularArray
)
984 history
: expect
.any(CircularArray
)
988 history
: expect
.any(CircularArray
)
991 history
: expect
.any(CircularArray
)
995 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
996 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
997 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
998 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1000 await pool
.destroy()
1003 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1004 const pool
= new DynamicClusterPool(
1005 Math
.floor(numberOfWorkers
/ 2),
1007 './tests/worker-files/cluster/testWorker.js'
1011 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
1015 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
1016 expect(poolReady
).toBe(1)
1017 expect(poolInfo
).toStrictEqual({
1019 type
: PoolTypes
.dynamic
,
1020 worker
: WorkerTypes
.cluster
,
1022 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1023 minSize
: expect
.any(Number
),
1024 maxSize
: expect
.any(Number
),
1025 workerNodes
: expect
.any(Number
),
1026 idleWorkerNodes
: expect
.any(Number
),
1027 busyWorkerNodes
: expect
.any(Number
),
1028 executedTasks
: expect
.any(Number
),
1029 executingTasks
: expect
.any(Number
),
1030 failedTasks
: expect
.any(Number
)
1032 await pool
.destroy()
1035 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1036 const pool
= new FixedThreadPool(
1038 './tests/worker-files/thread/testWorker.js'
1040 const promises
= new Set()
1043 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
1047 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1048 promises
.add(pool
.execute())
1050 await Promise
.all(promises
)
1051 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1052 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1053 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
1054 expect(poolInfo
).toStrictEqual({
1056 type
: PoolTypes
.fixed
,
1057 worker
: WorkerTypes
.thread
,
1058 ready
: expect
.any(Boolean
),
1059 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1060 minSize
: expect
.any(Number
),
1061 maxSize
: expect
.any(Number
),
1062 workerNodes
: expect
.any(Number
),
1063 idleWorkerNodes
: expect
.any(Number
),
1064 busyWorkerNodes
: expect
.any(Number
),
1065 executedTasks
: expect
.any(Number
),
1066 executingTasks
: expect
.any(Number
),
1067 failedTasks
: expect
.any(Number
)
1069 await pool
.destroy()
1072 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1073 const pool
= new DynamicThreadPool(
1074 Math
.floor(numberOfWorkers
/ 2),
1076 './tests/worker-files/thread/testWorker.js'
1078 const promises
= new Set()
1081 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
1085 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1086 promises
.add(pool
.execute())
1088 await Promise
.all(promises
)
1089 expect(poolFull
).toBe(1)
1090 expect(poolInfo
).toStrictEqual({
1092 type
: PoolTypes
.dynamic
,
1093 worker
: WorkerTypes
.thread
,
1094 ready
: expect
.any(Boolean
),
1095 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1096 minSize
: expect
.any(Number
),
1097 maxSize
: expect
.any(Number
),
1098 workerNodes
: expect
.any(Number
),
1099 idleWorkerNodes
: expect
.any(Number
),
1100 busyWorkerNodes
: expect
.any(Number
),
1101 executedTasks
: expect
.any(Number
),
1102 executingTasks
: expect
.any(Number
),
1103 failedTasks
: expect
.any(Number
)
1105 await pool
.destroy()
1108 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1109 const pool
= new FixedThreadPool(
1111 './tests/worker-files/thread/testWorker.js',
1113 enableTasksQueue
: true
1116 sinon
.stub(pool
, 'hasBackPressure').returns(true)
1117 const promises
= new Set()
1118 let poolBackPressure
= 0
1120 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
1124 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
1125 promises
.add(pool
.execute())
1127 await Promise
.all(promises
)
1128 expect(poolBackPressure
).toBe(1)
1129 expect(poolInfo
).toStrictEqual({
1131 type
: PoolTypes
.fixed
,
1132 worker
: WorkerTypes
.thread
,
1133 ready
: expect
.any(Boolean
),
1134 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1135 minSize
: expect
.any(Number
),
1136 maxSize
: expect
.any(Number
),
1137 workerNodes
: expect
.any(Number
),
1138 idleWorkerNodes
: expect
.any(Number
),
1139 busyWorkerNodes
: expect
.any(Number
),
1140 executedTasks
: expect
.any(Number
),
1141 executingTasks
: expect
.any(Number
),
1142 maxQueuedTasks
: expect
.any(Number
),
1143 queuedTasks
: expect
.any(Number
),
1145 stolenTasks
: expect
.any(Number
),
1146 failedTasks
: expect
.any(Number
)
1148 expect(pool
.hasBackPressure
.called
).toBe(true)
1149 await pool
.destroy()
1152 it('Verify that listTaskFunctions() is working', async () => {
1153 const dynamicThreadPool
= new DynamicThreadPool(
1154 Math
.floor(numberOfWorkers
/ 2),
1156 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1158 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1159 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
1161 'jsonIntegerSerialization',
1165 const fixedClusterPool
= new FixedClusterPool(
1167 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1169 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1170 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
1172 'jsonIntegerSerialization',
1176 await dynamicThreadPool
.destroy()
1177 await fixedClusterPool
.destroy()
1180 it('Verify that multiple task functions worker is working', async () => {
1181 const pool
= new DynamicClusterPool(
1182 Math
.floor(numberOfWorkers
/ 2),
1184 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1186 const data
= { n
: 10 }
1187 const result0
= await pool
.execute(data
)
1188 expect(result0
).toStrictEqual({ ok
: 1 })
1189 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1190 expect(result1
).toStrictEqual({ ok
: 1 })
1191 const result2
= await pool
.execute(data
, 'factorial')
1192 expect(result2
).toBe(3628800)
1193 const result3
= await pool
.execute(data
, 'fibonacci')
1194 expect(result3
).toBe(55)
1195 expect(pool
.info
.executingTasks
).toBe(0)
1196 expect(pool
.info
.executedTasks
).toBe(4)
1197 for (const workerNode
of pool
.workerNodes
) {
1198 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
1200 'jsonIntegerSerialization',
1204 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1205 for (const name
of pool
.listTaskFunctions()) {
1206 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1208 executed
: expect
.any(Number
),
1209 executing
: expect
.any(Number
),
1215 history
: expect
.any(CircularArray
)
1218 history
: expect
.any(CircularArray
)
1222 history
: expect
.any(CircularArray
)
1225 history
: expect
.any(CircularArray
)
1230 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executing
1231 ).toBeGreaterThanOrEqual(0)
1234 await pool
.destroy()