1 const { EventEmitter
} = require('events')
2 const { expect
} = require('expect')
3 const sinon
= require('sinon')
11 WorkerChoiceStrategies
,
13 } = require('../../../lib')
14 const { CircularArray
} = require('../../../lib/circular-array')
15 const { Deque
} = require('../../../lib/deque')
16 const { DEFAULT_TASK_NAME
} = require('../../../lib/utils')
17 const { version
} = require('../../../package.json')
18 const { waitPoolEvents
} = require('../../test-utils')
20 describe('Abstract pool test suite', () => {
21 const numberOfWorkers
= 2
22 class StubPoolWithIsMain
extends FixedThreadPool
{
32 it('Simulate pool creation from a non main thread/process', () => {
35 new StubPoolWithIsMain(
37 './tests/worker-files/thread/testWorker.js',
39 errorHandler
: (e
) => console
.error(e
)
44 'Cannot start a pool from a worker with the same type as the pool'
49 it('Verify that pool statuses properties are set', async () => {
50 const pool
= new FixedThreadPool(
52 './tests/worker-files/thread/testWorker.js'
54 expect(pool
.starting
).toBe(false)
55 expect(pool
.started
).toBe(true)
59 it('Verify that filePath is checked', () => {
60 const expectedError
= new Error(
61 'Please specify a file with a worker implementation'
63 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
66 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
69 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
72 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
76 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
77 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
80 it('Verify that numberOfWorkers is checked', () => {
81 expect(() => new FixedThreadPool()).toThrowError(
83 'Cannot instantiate a pool without specifying the number of workers'
88 it('Verify that a negative number of workers is checked', () => {
91 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
94 'Cannot instantiate a pool with a negative number of workers'
99 it('Verify that a non integer number of workers is checked', () => {
102 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
105 'Cannot instantiate a pool with a non safe integer number of workers'
110 it('Verify that dynamic pool sizing is checked', () => {
113 new DynamicClusterPool(
116 './tests/worker-files/cluster/testWorker.js'
120 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
125 new DynamicThreadPool(
128 './tests/worker-files/thread/testWorker.js'
132 'Cannot instantiate a pool with a non safe integer number of workers'
137 new DynamicClusterPool(
140 './tests/worker-files/cluster/testWorker.js'
144 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
149 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
152 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
157 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
160 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
165 new DynamicClusterPool(
168 './tests/worker-files/cluster/testWorker.js'
172 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
177 it('Verify that pool options are checked', async () => {
178 let pool
= new FixedThreadPool(
180 './tests/worker-files/thread/testWorker.js'
182 expect(pool
.emitter
).toBeInstanceOf(EventEmitter
)
183 expect(pool
.opts
.enableEvents
).toBe(true)
184 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
185 expect(pool
.opts
.enableTasksQueue
).toBe(false)
186 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
187 expect(pool
.opts
.workerChoiceStrategy
).toBe(
188 WorkerChoiceStrategies
.ROUND_ROBIN
190 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
192 runTime
: { median
: false },
193 waitTime
: { median
: false },
194 elu
: { median
: false }
196 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
198 runTime
: { median
: false },
199 waitTime
: { median
: false },
200 elu
: { median
: false }
202 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
203 .workerChoiceStrategies
) {
204 expect(workerChoiceStrategy
.opts
).toStrictEqual({
206 runTime
: { median
: false },
207 waitTime
: { median
: false },
208 elu
: { median
: false }
211 expect(pool
.opts
.messageHandler
).toBeUndefined()
212 expect(pool
.opts
.errorHandler
).toBeUndefined()
213 expect(pool
.opts
.onlineHandler
).toBeUndefined()
214 expect(pool
.opts
.exitHandler
).toBeUndefined()
216 const testHandler
= () => console
.info('test handler executed')
217 pool
= new FixedThreadPool(
219 './tests/worker-files/thread/testWorker.js',
221 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
222 workerChoiceStrategyOptions
: {
223 runTime
: { median
: true },
224 weights
: { 0: 300, 1: 200 }
227 restartWorkerOnError
: false,
228 enableTasksQueue
: true,
229 tasksQueueOptions
: { concurrency
: 2 },
230 messageHandler
: testHandler
,
231 errorHandler
: testHandler
,
232 onlineHandler
: testHandler
,
233 exitHandler
: testHandler
236 expect(pool
.emitter
).toBeUndefined()
237 expect(pool
.opts
.enableEvents
).toBe(false)
238 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
239 expect(pool
.opts
.enableTasksQueue
).toBe(true)
240 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
244 expect(pool
.opts
.workerChoiceStrategy
).toBe(
245 WorkerChoiceStrategies
.LEAST_USED
247 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
249 runTime
: { median
: true },
250 waitTime
: { median
: false },
251 elu
: { median
: false },
252 weights
: { 0: 300, 1: 200 }
254 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
256 runTime
: { median
: true },
257 waitTime
: { median
: false },
258 elu
: { median
: false },
259 weights
: { 0: 300, 1: 200 }
261 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
262 .workerChoiceStrategies
) {
263 expect(workerChoiceStrategy
.opts
).toStrictEqual({
265 runTime
: { median
: true },
266 waitTime
: { median
: false },
267 elu
: { median
: false },
268 weights
: { 0: 300, 1: 200 }
271 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
272 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
273 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
274 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
278 it('Verify that pool options are validated', async () => {
283 './tests/worker-files/thread/testWorker.js',
285 workerChoiceStrategy
: 'invalidStrategy'
289 new Error("Invalid worker choice strategy 'invalidStrategy'")
295 './tests/worker-files/thread/testWorker.js',
297 workerChoiceStrategyOptions
: {
298 retries
: 'invalidChoiceRetries'
304 'Invalid worker choice strategy options: retries must be an integer'
311 './tests/worker-files/thread/testWorker.js',
313 workerChoiceStrategyOptions
: {
320 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
327 './tests/worker-files/thread/testWorker.js',
329 workerChoiceStrategyOptions
: { weights
: {} }
334 'Invalid worker choice strategy options: must have a weight for each worker node'
341 './tests/worker-files/thread/testWorker.js',
343 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
348 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
355 './tests/worker-files/thread/testWorker.js',
357 enableTasksQueue
: true,
358 tasksQueueOptions
: 'invalidTasksQueueOptions'
362 new TypeError('Invalid tasks queue options: must be a plain object')
368 './tests/worker-files/thread/testWorker.js',
370 enableTasksQueue
: true,
371 tasksQueueOptions
: { concurrency
: 0 }
376 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
383 './tests/worker-files/thread/testWorker.js',
385 enableTasksQueue
: true,
386 tasksQueueOptions
: { concurrency
: -1 }
391 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
398 './tests/worker-files/thread/testWorker.js',
400 enableTasksQueue
: true,
401 tasksQueueOptions
: { concurrency
: 0.2 }
405 new TypeError('Invalid worker node tasks concurrency: must be an integer')
411 './tests/worker-files/thread/testWorker.js',
413 enableTasksQueue
: true,
414 tasksQueueOptions
: { queueMaxSize
: 2 }
419 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
426 './tests/worker-files/thread/testWorker.js',
428 enableTasksQueue
: true,
429 tasksQueueOptions
: { size
: 0 }
434 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
441 './tests/worker-files/thread/testWorker.js',
443 enableTasksQueue
: true,
444 tasksQueueOptions
: { size
: -1 }
449 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
456 './tests/worker-files/thread/testWorker.js',
458 enableTasksQueue
: true,
459 tasksQueueOptions
: { size
: 0.2 }
463 new TypeError('Invalid worker node tasks queue size: must be an integer')
467 it('Verify that pool worker choice strategy options can be set', async () => {
468 const pool
= new FixedThreadPool(
470 './tests/worker-files/thread/testWorker.js',
471 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
473 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
475 runTime
: { median
: false },
476 waitTime
: { median
: false },
477 elu
: { median
: false }
479 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
481 runTime
: { median
: false },
482 waitTime
: { median
: false },
483 elu
: { median
: false }
485 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
486 .workerChoiceStrategies
) {
487 expect(workerChoiceStrategy
.opts
).toStrictEqual({
489 runTime
: { median
: false },
490 waitTime
: { median
: false },
491 elu
: { median
: false }
495 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
513 pool
.setWorkerChoiceStrategyOptions({
514 runTime
: { median
: true },
515 elu
: { median
: true }
517 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
519 runTime
: { median
: true },
520 waitTime
: { median
: false },
521 elu
: { median
: true }
523 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
525 runTime
: { median
: true },
526 waitTime
: { median
: false },
527 elu
: { median
: true }
529 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
530 .workerChoiceStrategies
) {
531 expect(workerChoiceStrategy
.opts
).toStrictEqual({
533 runTime
: { median
: true },
534 waitTime
: { median
: false },
535 elu
: { median
: true }
539 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
557 pool
.setWorkerChoiceStrategyOptions({
558 runTime
: { median
: false },
559 elu
: { median
: false }
561 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
563 runTime
: { median
: false },
564 waitTime
: { median
: false },
565 elu
: { median
: false }
567 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
569 runTime
: { median
: false },
570 waitTime
: { median
: false },
571 elu
: { median
: false }
573 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
574 .workerChoiceStrategies
) {
575 expect(workerChoiceStrategy
.opts
).toStrictEqual({
577 runTime
: { median
: false },
578 waitTime
: { median
: false },
579 elu
: { median
: false }
583 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
602 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
605 'Invalid worker choice strategy options: must be a plain object'
609 pool
.setWorkerChoiceStrategyOptions({
610 retries
: 'invalidChoiceRetries'
614 'Invalid worker choice strategy options: retries must be an integer'
618 pool
.setWorkerChoiceStrategyOptions({ retries
: -1 })
621 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
625 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
628 'Invalid worker choice strategy options: must have a weight for each worker node'
632 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
635 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
641 it('Verify that pool tasks queue can be enabled/disabled', async () => {
642 const pool
= new FixedThreadPool(
644 './tests/worker-files/thread/testWorker.js'
646 expect(pool
.opts
.enableTasksQueue
).toBe(false)
647 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
648 pool
.enableTasksQueue(true)
649 expect(pool
.opts
.enableTasksQueue
).toBe(true)
650 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
654 pool
.enableTasksQueue(true, { concurrency
: 2 })
655 expect(pool
.opts
.enableTasksQueue
).toBe(true)
656 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
660 pool
.enableTasksQueue(false)
661 expect(pool
.opts
.enableTasksQueue
).toBe(false)
662 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
666 it('Verify that pool tasks queue options can be set', async () => {
667 const pool
= new FixedThreadPool(
669 './tests/worker-files/thread/testWorker.js',
670 { enableTasksQueue
: true }
672 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
676 pool
.setTasksQueueOptions({ concurrency
: 2 })
677 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
682 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
684 new TypeError('Invalid tasks queue options: must be a plain object')
686 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
688 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
691 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
693 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
696 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
697 new TypeError('Invalid worker node tasks concurrency: must be an integer')
699 expect(() => pool
.setTasksQueueOptions({ queueMaxSize
: 2 })).toThrowError(
701 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
704 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
706 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
709 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
711 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
714 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
715 new TypeError('Invalid worker node tasks queue size: must be an integer')
720 it('Verify that pool info is set', async () => {
721 let pool
= new FixedThreadPool(
723 './tests/worker-files/thread/testWorker.js'
725 expect(pool
.info
).toStrictEqual({
727 type
: PoolTypes
.fixed
,
728 worker
: WorkerTypes
.thread
,
730 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
731 minSize
: numberOfWorkers
,
732 maxSize
: numberOfWorkers
,
733 workerNodes
: numberOfWorkers
,
734 idleWorkerNodes
: numberOfWorkers
,
741 pool
= new DynamicClusterPool(
742 Math
.floor(numberOfWorkers
/ 2),
744 './tests/worker-files/cluster/testWorker.js'
746 expect(pool
.info
).toStrictEqual({
748 type
: PoolTypes
.dynamic
,
749 worker
: WorkerTypes
.cluster
,
751 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
752 minSize
: Math
.floor(numberOfWorkers
/ 2),
753 maxSize
: numberOfWorkers
,
754 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
755 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
764 it('Verify that pool worker tasks usage are initialized', async () => {
765 const pool
= new FixedClusterPool(
767 './tests/worker-files/cluster/testWorker.js'
769 for (const workerNode
of pool
.workerNodes
) {
770 expect(workerNode
.usage
).toStrictEqual({
780 history
: new CircularArray()
783 history
: new CircularArray()
787 history
: new CircularArray()
790 history
: new CircularArray()
798 it('Verify that pool worker tasks queue are initialized', async () => {
799 let pool
= new FixedClusterPool(
801 './tests/worker-files/cluster/testWorker.js'
803 for (const workerNode
of pool
.workerNodes
) {
804 expect(workerNode
.tasksQueue
).toBeDefined()
805 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
806 expect(workerNode
.tasksQueue
.size
).toBe(0)
807 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
810 pool
= new DynamicThreadPool(
811 Math
.floor(numberOfWorkers
/ 2),
813 './tests/worker-files/thread/testWorker.js'
815 for (const workerNode
of pool
.workerNodes
) {
816 expect(workerNode
.tasksQueue
).toBeDefined()
817 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
818 expect(workerNode
.tasksQueue
.size
).toBe(0)
819 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
824 it('Verify that pool worker info are initialized', async () => {
825 let pool
= new FixedClusterPool(
827 './tests/worker-files/cluster/testWorker.js'
829 for (const workerNode
of pool
.workerNodes
) {
830 expect(workerNode
.info
).toStrictEqual({
831 id
: expect
.any(Number
),
832 type
: WorkerTypes
.cluster
,
838 pool
= new DynamicThreadPool(
839 Math
.floor(numberOfWorkers
/ 2),
841 './tests/worker-files/thread/testWorker.js'
843 for (const workerNode
of pool
.workerNodes
) {
844 expect(workerNode
.info
).toStrictEqual({
845 id
: expect
.any(Number
),
846 type
: WorkerTypes
.thread
,
854 it('Verify that pool execute() arguments are checked', async () => {
855 const pool
= new FixedClusterPool(
857 './tests/worker-files/cluster/testWorker.js'
859 await
expect(pool
.execute(undefined, 0)).rejects
.toThrowError(
860 new TypeError('name argument must be a string')
862 await
expect(pool
.execute(undefined, '')).rejects
.toThrowError(
863 new TypeError('name argument must not be an empty string')
865 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
866 new TypeError('transferList argument must be an array')
868 await
expect(pool
.execute(undefined, 'unknown')).rejects
.toBe(
869 "Task function 'unknown' not found"
872 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
873 new Error('Cannot execute a task on destroyed pool')
877 it('Verify that pool worker tasks usage are computed', async () => {
878 const pool
= new FixedClusterPool(
880 './tests/worker-files/cluster/testWorker.js'
882 const promises
= new Set()
883 const maxMultiplier
= 2
884 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
885 promises
.add(pool
.execute())
887 for (const workerNode
of pool
.workerNodes
) {
888 expect(workerNode
.usage
).toStrictEqual({
891 executing
: maxMultiplier
,
898 history
: expect
.any(CircularArray
)
901 history
: expect
.any(CircularArray
)
905 history
: expect
.any(CircularArray
)
908 history
: expect
.any(CircularArray
)
913 await Promise
.all(promises
)
914 for (const workerNode
of pool
.workerNodes
) {
915 expect(workerNode
.usage
).toStrictEqual({
917 executed
: maxMultiplier
,
925 history
: expect
.any(CircularArray
)
928 history
: expect
.any(CircularArray
)
932 history
: expect
.any(CircularArray
)
935 history
: expect
.any(CircularArray
)
943 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
944 const pool
= new DynamicThreadPool(
945 Math
.floor(numberOfWorkers
/ 2),
947 './tests/worker-files/thread/testWorker.js'
949 const promises
= new Set()
950 const maxMultiplier
= 2
951 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
952 promises
.add(pool
.execute())
954 await Promise
.all(promises
)
955 for (const workerNode
of pool
.workerNodes
) {
956 expect(workerNode
.usage
).toStrictEqual({
958 executed
: expect
.any(Number
),
966 history
: expect
.any(CircularArray
)
969 history
: expect
.any(CircularArray
)
973 history
: expect
.any(CircularArray
)
976 history
: expect
.any(CircularArray
)
980 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
981 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
982 numberOfWorkers
* maxMultiplier
984 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
985 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
986 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
987 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
989 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
990 for (const workerNode
of pool
.workerNodes
) {
991 expect(workerNode
.usage
).toStrictEqual({
1001 history
: expect
.any(CircularArray
)
1004 history
: expect
.any(CircularArray
)
1008 history
: expect
.any(CircularArray
)
1011 history
: expect
.any(CircularArray
)
1015 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
1016 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
1017 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
1018 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1020 await pool
.destroy()
1023 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1024 const pool
= new DynamicClusterPool(
1025 Math
.floor(numberOfWorkers
/ 2),
1027 './tests/worker-files/cluster/testWorker.js'
1031 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
1035 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
1036 expect(poolReady
).toBe(1)
1037 expect(poolInfo
).toStrictEqual({
1039 type
: PoolTypes
.dynamic
,
1040 worker
: WorkerTypes
.cluster
,
1042 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1043 minSize
: expect
.any(Number
),
1044 maxSize
: expect
.any(Number
),
1045 workerNodes
: expect
.any(Number
),
1046 idleWorkerNodes
: expect
.any(Number
),
1047 busyWorkerNodes
: expect
.any(Number
),
1048 executedTasks
: expect
.any(Number
),
1049 executingTasks
: expect
.any(Number
),
1050 failedTasks
: expect
.any(Number
)
1052 await pool
.destroy()
1055 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1056 const pool
= new FixedThreadPool(
1058 './tests/worker-files/thread/testWorker.js'
1060 const promises
= new Set()
1063 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
1067 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1068 promises
.add(pool
.execute())
1070 await Promise
.all(promises
)
1071 // The `busy` event is triggered each time the number of tasks submitted at once reaches the number of fixed pool workers.
1072 // So it is emitted numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1073 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
1074 expect(poolInfo
).toStrictEqual({
1076 type
: PoolTypes
.fixed
,
1077 worker
: WorkerTypes
.thread
,
1078 ready
: expect
.any(Boolean
),
1079 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1080 minSize
: expect
.any(Number
),
1081 maxSize
: expect
.any(Number
),
1082 workerNodes
: expect
.any(Number
),
1083 idleWorkerNodes
: expect
.any(Number
),
1084 busyWorkerNodes
: expect
.any(Number
),
1085 executedTasks
: expect
.any(Number
),
1086 executingTasks
: expect
.any(Number
),
1087 failedTasks
: expect
.any(Number
)
1089 await pool
.destroy()
1092 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1093 const pool
= new DynamicThreadPool(
1094 Math
.floor(numberOfWorkers
/ 2),
1096 './tests/worker-files/thread/testWorker.js'
1098 const promises
= new Set()
1101 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
1105 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1106 promises
.add(pool
.execute())
1108 await Promise
.all(promises
)
1109 expect(poolFull
).toBe(1)
1110 expect(poolInfo
).toStrictEqual({
1112 type
: PoolTypes
.dynamic
,
1113 worker
: WorkerTypes
.thread
,
1114 ready
: expect
.any(Boolean
),
1115 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1116 minSize
: expect
.any(Number
),
1117 maxSize
: expect
.any(Number
),
1118 workerNodes
: expect
.any(Number
),
1119 idleWorkerNodes
: expect
.any(Number
),
1120 busyWorkerNodes
: expect
.any(Number
),
1121 executedTasks
: expect
.any(Number
),
1122 executingTasks
: expect
.any(Number
),
1123 failedTasks
: expect
.any(Number
)
1125 await pool
.destroy()
1128 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1129 const pool
= new FixedThreadPool(
1131 './tests/worker-files/thread/testWorker.js',
1133 enableTasksQueue
: true
1136 sinon
.stub(pool
, 'hasBackPressure').returns(true)
1137 const promises
= new Set()
1138 let poolBackPressure
= 0
1140 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
1144 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
1145 promises
.add(pool
.execute())
1147 await Promise
.all(promises
)
1148 expect(poolBackPressure
).toBe(1)
1149 expect(poolInfo
).toStrictEqual({
1151 type
: PoolTypes
.fixed
,
1152 worker
: WorkerTypes
.thread
,
1153 ready
: expect
.any(Boolean
),
1154 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1155 minSize
: expect
.any(Number
),
1156 maxSize
: expect
.any(Number
),
1157 workerNodes
: expect
.any(Number
),
1158 idleWorkerNodes
: expect
.any(Number
),
1159 busyWorkerNodes
: expect
.any(Number
),
1160 executedTasks
: expect
.any(Number
),
1161 executingTasks
: expect
.any(Number
),
1162 maxQueuedTasks
: expect
.any(Number
),
1163 queuedTasks
: expect
.any(Number
),
1165 stolenTasks
: expect
.any(Number
),
1166 failedTasks
: expect
.any(Number
)
1168 expect(pool
.hasBackPressure
.called
).toBe(true)
1169 await pool
.destroy()
1172 it('Verify that listTaskFunctions() is working', async () => {
1173 const dynamicThreadPool
= new DynamicThreadPool(
1174 Math
.floor(numberOfWorkers
/ 2),
1176 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1178 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1179 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
1181 'jsonIntegerSerialization',
1185 const fixedClusterPool
= new FixedClusterPool(
1187 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1189 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1190 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
1192 'jsonIntegerSerialization',
1196 await dynamicThreadPool
.destroy()
1197 await fixedClusterPool
.destroy()
1200 it('Verify that multiple task functions worker is working', async () => {
1201 const pool
= new DynamicClusterPool(
1202 Math
.floor(numberOfWorkers
/ 2),
1204 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1206 const data
= { n
: 10 }
1207 const result0
= await pool
.execute(data
)
1208 expect(result0
).toStrictEqual({ ok
: 1 })
1209 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1210 expect(result1
).toStrictEqual({ ok
: 1 })
1211 const result2
= await pool
.execute(data
, 'factorial')
1212 expect(result2
).toBe(3628800)
1213 const result3
= await pool
.execute(data
, 'fibonacci')
1214 expect(result3
).toBe(55)
1215 expect(pool
.info
.executingTasks
).toBe(0)
1216 expect(pool
.info
.executedTasks
).toBe(4)
1217 for (const workerNode
of pool
.workerNodes
) {
1218 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
1220 'jsonIntegerSerialization',
1224 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1225 for (const name
of pool
.listTaskFunctions()) {
1226 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1228 executed
: expect
.any(Number
),
1235 history
: expect
.any(CircularArray
)
1238 history
: expect
.any(CircularArray
)
1242 history
: expect
.any(CircularArray
)
1245 history
: expect
.any(CircularArray
)
1250 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executed
1251 ).toBeGreaterThan(0)
1254 workerNode
.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME
)
1256 workerNode
.getTaskFunctionWorkerUsage(workerNode
.info
.taskFunctions
[1])
1259 await pool
.destroy()
1262 it('Verify sendKillMessageToWorker()', async () => {
1263 const pool
= new DynamicClusterPool(
1264 Math
.floor(numberOfWorkers
/ 2),
1266 './tests/worker-files/cluster/testWorker.js'
1268 const workerNodeKey
= 0
1270 pool
.sendKillMessageToWorker(
1272 pool
.workerNodes
[workerNodeKey
].info
.id
1274 ).resolves
.toBeUndefined()
1275 await pool
.destroy()