1 const { expect
} = require('expect')
2 const sinon
= require('sinon')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Deque
} = require('../../../lib/deque')
15 const { DEFAULT_TASK_NAME
} = require('../../../lib/utils')
16 const { version
} = require('../../../package.json')
17 const { waitPoolEvents
} = require('../../test-utils')
19 describe('Abstract pool test suite', () => {
20 const numberOfWorkers
= 2
21 class StubPoolWithIsMain
extends FixedThreadPool
{
31 it('Simulate pool creation from a non main thread/process', () => {
34 new StubPoolWithIsMain(
36 './tests/worker-files/thread/testWorker.js',
38 errorHandler
: (e
) => console
.error(e
)
43 'Cannot start a pool from a worker with the same type as the pool'
48 it('Verify that pool statuses properties are set', async () => {
49 const pool
= new FixedThreadPool(
51 './tests/worker-files/thread/testWorker.js'
53 expect(pool
.starting
).toBe(false)
54 expect(pool
.started
).toBe(true)
56 expect(pool
.started
).toBe(false)
59 it('Verify that filePath is checked', () => {
60 const expectedError
= new Error(
61 'Please specify a file with a worker implementation'
63 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
66 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
69 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
72 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
76 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
77 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
80 it('Verify that numberOfWorkers is checked', () => {
81 expect(() => new FixedThreadPool()).toThrowError(
83 'Cannot instantiate a pool without specifying the number of workers'
88 it('Verify that a negative number of workers is checked', () => {
91 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
94 'Cannot instantiate a pool with a negative number of workers'
99 it('Verify that a non integer number of workers is checked', () => {
102 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
105 'Cannot instantiate a pool with a non safe integer number of workers'
110 it('Verify that dynamic pool sizing is checked', () => {
113 new DynamicClusterPool(
116 './tests/worker-files/cluster/testWorker.js'
120 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
125 new DynamicThreadPool(
128 './tests/worker-files/thread/testWorker.js'
132 'Cannot instantiate a pool with a non safe integer number of workers'
137 new DynamicClusterPool(
140 './tests/worker-files/cluster/testWorker.js'
144 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
149 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
152 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
157 new DynamicClusterPool(
160 './tests/worker-files/cluster/testWorker.js'
164 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
169 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
172 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
177 it('Verify that pool options are checked', async () => {
178 let pool
= new FixedThreadPool(
180 './tests/worker-files/thread/testWorker.js'
182 expect(pool
.emitter
).toBeDefined()
183 expect(pool
.opts
.enableEvents
).toBe(true)
184 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
185 expect(pool
.opts
.enableTasksQueue
).toBe(false)
186 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
187 expect(pool
.opts
.workerChoiceStrategy
).toBe(
188 WorkerChoiceStrategies
.ROUND_ROBIN
190 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
192 runTime
: { median
: false },
193 waitTime
: { median
: false },
194 elu
: { median
: false }
196 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
198 runTime
: { median
: false },
199 waitTime
: { median
: false },
200 elu
: { median
: false }
202 expect(pool
.opts
.messageHandler
).toBeUndefined()
203 expect(pool
.opts
.errorHandler
).toBeUndefined()
204 expect(pool
.opts
.onlineHandler
).toBeUndefined()
205 expect(pool
.opts
.exitHandler
).toBeUndefined()
207 const testHandler
= () => console
.info('test handler executed')
208 pool
= new FixedThreadPool(
210 './tests/worker-files/thread/testWorker.js',
212 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
213 workerChoiceStrategyOptions
: {
214 runTime
: { median
: true },
215 weights
: { 0: 300, 1: 200 }
218 restartWorkerOnError
: false,
219 enableTasksQueue
: true,
220 tasksQueueOptions
: { concurrency
: 2 },
221 messageHandler
: testHandler
,
222 errorHandler
: testHandler
,
223 onlineHandler
: testHandler
,
224 exitHandler
: testHandler
227 expect(pool
.emitter
).toBeUndefined()
228 expect(pool
.opts
.enableEvents
).toBe(false)
229 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
230 expect(pool
.opts
.enableTasksQueue
).toBe(true)
231 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
235 expect(pool
.opts
.workerChoiceStrategy
).toBe(
236 WorkerChoiceStrategies
.LEAST_USED
238 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
240 runTime
: { median
: true },
241 waitTime
: { median
: false },
242 elu
: { median
: false },
243 weights
: { 0: 300, 1: 200 }
245 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
247 runTime
: { median
: true },
248 waitTime
: { median
: false },
249 elu
: { median
: false },
250 weights
: { 0: 300, 1: 200 }
252 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
253 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
254 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
255 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
259 it('Verify that pool options are validated', async () => {
264 './tests/worker-files/thread/testWorker.js',
266 workerChoiceStrategy
: 'invalidStrategy'
270 new Error("Invalid worker choice strategy 'invalidStrategy'")
276 './tests/worker-files/thread/testWorker.js',
278 workerChoiceStrategyOptions
: {
279 choiceRetries
: 'invalidChoiceRetries'
285 'Invalid worker choice strategy options: choice retries must be an integer'
292 './tests/worker-files/thread/testWorker.js',
294 workerChoiceStrategyOptions
: {
301 "Invalid worker choice strategy options: choice retries '-1' must be greater or equal than zero"
308 './tests/worker-files/thread/testWorker.js',
310 workerChoiceStrategyOptions
: { weights
: {} }
315 'Invalid worker choice strategy options: must have a weight for each worker node'
322 './tests/worker-files/thread/testWorker.js',
324 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
329 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
336 './tests/worker-files/thread/testWorker.js',
338 enableTasksQueue
: true,
339 tasksQueueOptions
: 'invalidTasksQueueOptions'
343 new TypeError('Invalid tasks queue options: must be a plain object')
349 './tests/worker-files/thread/testWorker.js',
351 enableTasksQueue
: true,
352 tasksQueueOptions
: { concurrency
: 0 }
357 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
364 './tests/worker-files/thread/testWorker.js',
366 enableTasksQueue
: true,
367 tasksQueueOptions
: { concurrency
: -1 }
372 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
379 './tests/worker-files/thread/testWorker.js',
381 enableTasksQueue
: true,
382 tasksQueueOptions
: { concurrency
: 0.2 }
386 new TypeError('Invalid worker node tasks concurrency: must be an integer')
392 './tests/worker-files/thread/testWorker.js',
394 enableTasksQueue
: true,
395 tasksQueueOptions
: { queueMaxSize
: 2 }
400 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
407 './tests/worker-files/thread/testWorker.js',
409 enableTasksQueue
: true,
410 tasksQueueOptions
: { size
: 0 }
415 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
422 './tests/worker-files/thread/testWorker.js',
424 enableTasksQueue
: true,
425 tasksQueueOptions
: { size
: -1 }
430 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
437 './tests/worker-files/thread/testWorker.js',
439 enableTasksQueue
: true,
440 tasksQueueOptions
: { size
: 0.2 }
444 new TypeError('Invalid worker node tasks queue size: must be an integer')
448 it('Verify that pool worker choice strategy options can be set', async () => {
449 const pool
= new FixedThreadPool(
451 './tests/worker-files/thread/testWorker.js',
452 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
454 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
456 runTime
: { median
: false },
457 waitTime
: { median
: false },
458 elu
: { median
: false }
460 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
462 runTime
: { median
: false },
463 waitTime
: { median
: false },
464 elu
: { median
: false }
466 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
467 .workerChoiceStrategies
) {
468 expect(workerChoiceStrategy
.opts
).toStrictEqual({
470 runTime
: { median
: false },
471 waitTime
: { median
: false },
472 elu
: { median
: false }
476 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
494 pool
.setWorkerChoiceStrategyOptions({
495 runTime
: { median
: true },
496 elu
: { median
: true }
498 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
500 runTime
: { median
: true },
501 waitTime
: { median
: false },
502 elu
: { median
: true }
504 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
506 runTime
: { median
: true },
507 waitTime
: { median
: false },
508 elu
: { median
: true }
510 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
511 .workerChoiceStrategies
) {
512 expect(workerChoiceStrategy
.opts
).toStrictEqual({
514 runTime
: { median
: true },
515 waitTime
: { median
: false },
516 elu
: { median
: true }
520 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
538 pool
.setWorkerChoiceStrategyOptions({
539 runTime
: { median
: false },
540 elu
: { median
: false }
542 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
544 runTime
: { median
: false },
545 waitTime
: { median
: false },
546 elu
: { median
: false }
548 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
550 runTime
: { median
: false },
551 waitTime
: { median
: false },
552 elu
: { median
: false }
554 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
555 .workerChoiceStrategies
) {
556 expect(workerChoiceStrategy
.opts
).toStrictEqual({
558 runTime
: { median
: false },
559 waitTime
: { median
: false },
560 elu
: { median
: false }
564 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
583 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
586 'Invalid worker choice strategy options: must be a plain object'
590 pool
.setWorkerChoiceStrategyOptions({
591 choiceRetries
: 'invalidChoiceRetries'
595 'Invalid worker choice strategy options: choice retries must be an integer'
599 pool
.setWorkerChoiceStrategyOptions({ choiceRetries
: -1 })
602 "Invalid worker choice strategy options: choice retries '-1' must be greater or equal than zero"
606 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
609 'Invalid worker choice strategy options: must have a weight for each worker node'
613 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
616 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
622 it('Verify that pool tasks queue can be enabled/disabled', async () => {
623 const pool
= new FixedThreadPool(
625 './tests/worker-files/thread/testWorker.js'
627 expect(pool
.opts
.enableTasksQueue
).toBe(false)
628 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
629 pool
.enableTasksQueue(true)
630 expect(pool
.opts
.enableTasksQueue
).toBe(true)
631 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
635 pool
.enableTasksQueue(true, { concurrency
: 2 })
636 expect(pool
.opts
.enableTasksQueue
).toBe(true)
637 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
641 pool
.enableTasksQueue(false)
642 expect(pool
.opts
.enableTasksQueue
).toBe(false)
643 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
647 it('Verify that pool tasks queue options can be set', async () => {
648 const pool
= new FixedThreadPool(
650 './tests/worker-files/thread/testWorker.js',
651 { enableTasksQueue
: true }
653 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
657 pool
.setTasksQueueOptions({ concurrency
: 2 })
658 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
663 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
665 new TypeError('Invalid tasks queue options: must be a plain object')
667 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
669 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
672 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
674 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
677 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
678 new TypeError('Invalid worker node tasks concurrency: must be an integer')
680 expect(() => pool
.setTasksQueueOptions({ queueMaxSize
: 2 })).toThrowError(
682 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
685 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
687 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
690 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
692 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
695 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
696 new TypeError('Invalid worker node tasks queue size: must be an integer')
701 it('Verify that pool info is set', async () => {
702 let pool
= new FixedThreadPool(
704 './tests/worker-files/thread/testWorker.js'
706 expect(pool
.info
).toStrictEqual({
708 type
: PoolTypes
.fixed
,
709 worker
: WorkerTypes
.thread
,
711 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
712 minSize
: numberOfWorkers
,
713 maxSize
: numberOfWorkers
,
714 workerNodes
: numberOfWorkers
,
715 idleWorkerNodes
: numberOfWorkers
,
722 pool
= new DynamicClusterPool(
723 Math
.floor(numberOfWorkers
/ 2),
725 './tests/worker-files/cluster/testWorker.js'
727 expect(pool
.info
).toStrictEqual({
729 type
: PoolTypes
.dynamic
,
730 worker
: WorkerTypes
.cluster
,
732 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
733 minSize
: Math
.floor(numberOfWorkers
/ 2),
734 maxSize
: numberOfWorkers
,
735 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
736 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
745 it('Verify that pool worker tasks usage are initialized', async () => {
746 const pool
= new FixedClusterPool(
748 './tests/worker-files/cluster/testWorker.js'
750 for (const workerNode
of pool
.workerNodes
) {
751 expect(workerNode
.usage
).toStrictEqual({
761 history
: expect
.any(CircularArray
)
764 history
: expect
.any(CircularArray
)
768 history
: expect
.any(CircularArray
)
771 history
: expect
.any(CircularArray
)
779 it('Verify that pool worker tasks queue are initialized', async () => {
780 let pool
= new FixedClusterPool(
782 './tests/worker-files/cluster/testWorker.js'
784 for (const workerNode
of pool
.workerNodes
) {
785 expect(workerNode
.tasksQueue
).toBeDefined()
786 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
787 expect(workerNode
.tasksQueue
.size
).toBe(0)
788 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
791 pool
= new DynamicThreadPool(
792 Math
.floor(numberOfWorkers
/ 2),
794 './tests/worker-files/thread/testWorker.js'
796 for (const workerNode
of pool
.workerNodes
) {
797 expect(workerNode
.tasksQueue
).toBeDefined()
798 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
799 expect(workerNode
.tasksQueue
.size
).toBe(0)
800 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
804 it('Verify that pool worker info are initialized', async () => {
805 let pool
= new FixedClusterPool(
807 './tests/worker-files/cluster/testWorker.js'
809 for (const workerNode
of pool
.workerNodes
) {
810 expect(workerNode
.info
).toStrictEqual({
811 id
: expect
.any(Number
),
812 type
: WorkerTypes
.cluster
,
818 pool
= new DynamicThreadPool(
819 Math
.floor(numberOfWorkers
/ 2),
821 './tests/worker-files/thread/testWorker.js'
823 for (const workerNode
of pool
.workerNodes
) {
824 expect(workerNode
.info
).toStrictEqual({
825 id
: expect
.any(Number
),
826 type
: WorkerTypes
.thread
,
833 it('Verify that pool execute() arguments are checked', async () => {
834 const pool
= new FixedClusterPool(
836 './tests/worker-files/cluster/testWorker.js'
838 await
expect(pool
.execute(undefined, 0)).rejects
.toThrowError(
839 new TypeError('name argument must be a string')
841 await
expect(pool
.execute(undefined, '')).rejects
.toThrowError(
842 new TypeError('name argument must not be an empty string')
844 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
845 new TypeError('transferList argument must be an array')
847 await
expect(pool
.execute(undefined, 'unknown')).rejects
.toBe(
848 "Task function 'unknown' not found"
851 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
852 new Error('Cannot execute a task on destroyed pool')
856 it('Verify that pool worker tasks usage are computed', async () => {
857 const pool
= new FixedClusterPool(
859 './tests/worker-files/cluster/testWorker.js'
861 const promises
= new Set()
862 const maxMultiplier
= 2
863 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
864 promises
.add(pool
.execute())
866 for (const workerNode
of pool
.workerNodes
) {
867 expect(workerNode
.usage
).toStrictEqual({
870 executing
: maxMultiplier
,
877 history
: expect
.any(CircularArray
)
880 history
: expect
.any(CircularArray
)
884 history
: expect
.any(CircularArray
)
887 history
: expect
.any(CircularArray
)
892 await Promise
.all(promises
)
893 for (const workerNode
of pool
.workerNodes
) {
894 expect(workerNode
.usage
).toStrictEqual({
896 executed
: maxMultiplier
,
904 history
: expect
.any(CircularArray
)
907 history
: expect
.any(CircularArray
)
911 history
: expect
.any(CircularArray
)
914 history
: expect
.any(CircularArray
)
922 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
923 const pool
= new DynamicThreadPool(
924 Math
.floor(numberOfWorkers
/ 2),
926 './tests/worker-files/thread/testWorker.js'
928 const promises
= new Set()
929 const maxMultiplier
= 2
930 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
931 promises
.add(pool
.execute())
933 await Promise
.all(promises
)
934 for (const workerNode
of pool
.workerNodes
) {
935 expect(workerNode
.usage
).toStrictEqual({
937 executed
: expect
.any(Number
),
945 history
: expect
.any(CircularArray
)
948 history
: expect
.any(CircularArray
)
952 history
: expect
.any(CircularArray
)
955 history
: expect
.any(CircularArray
)
959 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
960 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
961 numberOfWorkers
* maxMultiplier
963 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
964 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
965 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
966 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
968 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
969 for (const workerNode
of pool
.workerNodes
) {
970 expect(workerNode
.usage
).toStrictEqual({
980 history
: expect
.any(CircularArray
)
983 history
: expect
.any(CircularArray
)
987 history
: expect
.any(CircularArray
)
990 history
: expect
.any(CircularArray
)
994 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
995 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
996 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
997 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1002 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1003 const pool
= new DynamicClusterPool(
1004 Math
.floor(numberOfWorkers
/ 2),
1006 './tests/worker-files/cluster/testWorker.js'
1010 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
1014 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
1015 expect(poolReady
).toBe(1)
1016 expect(poolInfo
).toStrictEqual({
1018 type
: PoolTypes
.dynamic
,
1019 worker
: WorkerTypes
.cluster
,
1021 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1022 minSize
: expect
.any(Number
),
1023 maxSize
: expect
.any(Number
),
1024 workerNodes
: expect
.any(Number
),
1025 idleWorkerNodes
: expect
.any(Number
),
1026 busyWorkerNodes
: expect
.any(Number
),
1027 executedTasks
: expect
.any(Number
),
1028 executingTasks
: expect
.any(Number
),
1029 failedTasks
: expect
.any(Number
)
1031 await pool
.destroy()
1034 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1035 const pool
= new FixedThreadPool(
1037 './tests/worker-files/thread/testWorker.js'
1039 const promises
= new Set()
1042 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
1046 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1047 promises
.add(pool
.execute())
1049 await Promise
.all(promises
)
1050 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1051 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1052 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
1053 expect(poolInfo
).toStrictEqual({
1055 type
: PoolTypes
.fixed
,
1056 worker
: WorkerTypes
.thread
,
1057 ready
: expect
.any(Boolean
),
1058 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1059 minSize
: expect
.any(Number
),
1060 maxSize
: expect
.any(Number
),
1061 workerNodes
: expect
.any(Number
),
1062 idleWorkerNodes
: expect
.any(Number
),
1063 busyWorkerNodes
: expect
.any(Number
),
1064 executedTasks
: expect
.any(Number
),
1065 executingTasks
: expect
.any(Number
),
1066 failedTasks
: expect
.any(Number
)
1068 await pool
.destroy()
1071 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1072 const pool
= new DynamicThreadPool(
1073 Math
.floor(numberOfWorkers
/ 2),
1075 './tests/worker-files/thread/testWorker.js'
1077 const promises
= new Set()
1080 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
1084 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1085 promises
.add(pool
.execute())
1087 await Promise
.all(promises
)
1088 expect(poolFull
).toBe(1)
1089 expect(poolInfo
).toStrictEqual({
1091 type
: PoolTypes
.dynamic
,
1092 worker
: WorkerTypes
.thread
,
1093 ready
: expect
.any(Boolean
),
1094 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1095 minSize
: expect
.any(Number
),
1096 maxSize
: expect
.any(Number
),
1097 workerNodes
: expect
.any(Number
),
1098 idleWorkerNodes
: expect
.any(Number
),
1099 busyWorkerNodes
: expect
.any(Number
),
1100 executedTasks
: expect
.any(Number
),
1101 executingTasks
: expect
.any(Number
),
1102 failedTasks
: expect
.any(Number
)
1104 await pool
.destroy()
1107 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1108 const pool
= new FixedThreadPool(
1110 './tests/worker-files/thread/testWorker.js',
1112 enableTasksQueue
: true
1115 sinon
.stub(pool
, 'hasBackPressure').returns(true)
1116 const promises
= new Set()
1117 let poolBackPressure
= 0
1119 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
1123 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
1124 promises
.add(pool
.execute())
1126 await Promise
.all(promises
)
1127 expect(poolBackPressure
).toBe(1)
1128 expect(poolInfo
).toStrictEqual({
1130 type
: PoolTypes
.fixed
,
1131 worker
: WorkerTypes
.thread
,
1132 ready
: expect
.any(Boolean
),
1133 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1134 minSize
: expect
.any(Number
),
1135 maxSize
: expect
.any(Number
),
1136 workerNodes
: expect
.any(Number
),
1137 idleWorkerNodes
: expect
.any(Number
),
1138 busyWorkerNodes
: expect
.any(Number
),
1139 executedTasks
: expect
.any(Number
),
1140 executingTasks
: expect
.any(Number
),
1141 maxQueuedTasks
: expect
.any(Number
),
1142 queuedTasks
: expect
.any(Number
),
1144 stolenTasks
: expect
.any(Number
),
1145 failedTasks
: expect
.any(Number
)
1147 expect(pool
.hasBackPressure
.called
).toBe(true)
1148 await pool
.destroy()
1151 it('Verify that listTaskFunctions() is working', async () => {
1152 const dynamicThreadPool
= new DynamicThreadPool(
1153 Math
.floor(numberOfWorkers
/ 2),
1155 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1157 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1158 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
1160 'jsonIntegerSerialization',
1164 const fixedClusterPool
= new FixedClusterPool(
1166 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1168 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1169 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
1171 'jsonIntegerSerialization',
1177 it('Verify that multiple task functions worker is working', async () => {
1178 const pool
= new DynamicClusterPool(
1179 Math
.floor(numberOfWorkers
/ 2),
1181 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1183 const data
= { n
: 10 }
1184 const result0
= await pool
.execute(data
)
1185 expect(result0
).toStrictEqual({ ok
: 1 })
1186 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1187 expect(result1
).toStrictEqual({ ok
: 1 })
1188 const result2
= await pool
.execute(data
, 'factorial')
1189 expect(result2
).toBe(3628800)
1190 const result3
= await pool
.execute(data
, 'fibonacci')
1191 expect(result3
).toBe(55)
1192 expect(pool
.info
.executingTasks
).toBe(0)
1193 expect(pool
.info
.executedTasks
).toBe(4)
1194 for (const workerNode
of pool
.workerNodes
) {
1195 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
1197 'jsonIntegerSerialization',
1201 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1202 for (const name
of pool
.listTaskFunctions()) {
1203 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1205 executed
: expect
.any(Number
),
1206 executing
: expect
.any(Number
),
1212 history
: expect
.any(CircularArray
)
1215 history
: expect
.any(CircularArray
)
1219 history
: expect
.any(CircularArray
)
1222 history
: expect
.any(CircularArray
)
1227 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executing
1228 ).toBeGreaterThanOrEqual(0)