1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed cluster pool test suite', () => {
7 const numberOfWorkers
= 6
8 const tasksConcurrency
= 2
9 const pool
= new FixedClusterPool(
11 './tests/worker-files/cluster/testWorker.js',
13 errorHandler
: (e
) => console
.error(e
)
16 const queuePool
= new FixedClusterPool(
18 './tests/worker-files/cluster/testWorker.js',
20 enableTasksQueue
: true,
22 concurrency
: tasksConcurrency
24 errorHandler
: (e
) => console
.error(e
)
27 const emptyPool
= new FixedClusterPool(
29 './tests/worker-files/cluster/emptyWorker.js',
30 { exitHandler
: () => console
.info('empty pool worker exited') }
32 const echoPool
= new FixedClusterPool(
34 './tests/worker-files/cluster/echoWorker.js'
36 const errorPool
= new FixedClusterPool(
38 './tests/worker-files/cluster/errorWorker.js',
40 errorHandler
: (e
) => console
.error(e
)
43 const asyncErrorPool
= new FixedClusterPool(
45 './tests/worker-files/cluster/asyncErrorWorker.js',
47 errorHandler
: (e
) => console
.error(e
)
50 const asyncPool
= new FixedClusterPool(
52 './tests/worker-files/cluster/asyncWorker.js'
55 after('Destroy all pools', async () => {
56 // We need to clean up the resources after our test
57 await echoPool
.destroy()
58 await asyncPool
.destroy()
59 await errorPool
.destroy()
60 await asyncErrorPool
.destroy()
61 await emptyPool
.destroy()
62 await queuePool
.destroy()
65 it('Verify that the function is executed in a worker cluster', async () => {
66 let result
= await pool
.execute({
67 function: TaskFunctions
.fibonacci
69 expect(result
).toBe(75025)
70 result
= await pool
.execute({
71 function: TaskFunctions
.factorial
73 expect(result
).toBe(9.33262154439441e157
)
76 it('Verify that is possible to invoke the execute() method without input', async () => {
77 const result
= await pool
.execute()
78 expect(result
).toStrictEqual({ ok
: 1 })
81 it("Verify that 'ready' event is emitted", async () => {
82 const pool1
= new FixedClusterPool(
84 './tests/worker-files/cluster/testWorker.js',
86 errorHandler
: (e
) => console
.error(e
)
90 pool1
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
91 await
waitPoolEvents(pool1
, PoolEvents
.ready
, 1)
92 expect(poolReady
).toBe(1)
95 it("Verify that 'busy' event is emitted", async () => {
96 const promises
= new Set()
98 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
99 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
100 promises
.add(pool
.execute())
102 await Promise
.all(promises
)
103 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
104 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
105 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
108 it('Verify that tasks queuing is working', async () => {
109 const promises
= new Set()
110 const maxMultiplier
= 3 // Must be greater than tasksConcurrency
111 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
112 promises
.add(queuePool
.execute())
114 expect(promises
.size
).toBe(numberOfWorkers
* maxMultiplier
)
115 for (const workerNode
of queuePool
.workerNodes
) {
116 expect(workerNode
.usage
.tasks
.executing
).toBeGreaterThanOrEqual(0)
117 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
118 queuePool
.opts
.tasksQueueOptions
.concurrency
120 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
121 expect(workerNode
.usage
.tasks
.queued
).toBe(
122 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
124 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
125 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
128 expect(queuePool
.info
.executingTasks
).toBe(
129 numberOfWorkers
* queuePool
.opts
.tasksQueueOptions
.concurrency
131 expect(queuePool
.info
.queuedTasks
).toBe(
133 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
135 expect(queuePool
.info
.maxQueuedTasks
).toBe(
137 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
139 expect(queuePool
.info
.backPressure
).toBe(false)
140 await Promise
.all(promises
)
141 for (const workerNode
of queuePool
.workerNodes
) {
142 expect(workerNode
.usage
.tasks
.executing
).toBeGreaterThanOrEqual(0)
143 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
144 numberOfWorkers
* maxMultiplier
146 expect(workerNode
.usage
.tasks
.executed
).toBe(maxMultiplier
)
147 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
148 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
149 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
154 it('Verify that is possible to have a worker that return undefined', async () => {
155 const result
= await emptyPool
.execute()
156 expect(result
).toBeUndefined()
159 it('Verify that data are sent to the worker correctly', async () => {
160 const data
= { f
: 10 }
161 const result
= await echoPool
.execute(data
)
162 expect(result
).toStrictEqual(data
)
165 it('Verify that error handling is working properly:sync', async () => {
166 const data
= { f
: 10 }
168 errorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
173 await errorPool
.execute(data
)
177 expect(inError
).toBeDefined()
178 expect(typeof inError
=== 'string').toBe(true)
179 expect(inError
).toBe('Error Message from ClusterWorker')
180 expect(taskError
).toStrictEqual({
182 message
: 'Error Message from ClusterWorker',
186 errorPool
.workerNodes
.some(
187 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
192 it('Verify that error handling is working properly:async', async () => {
193 const data
= { f
: 10 }
195 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
200 await asyncErrorPool
.execute(data
)
204 expect(inError
).toBeDefined()
205 expect(typeof inError
=== 'string').toBe(true)
206 expect(inError
).toBe('Error Message from ClusterWorker:async')
207 expect(taskError
).toStrictEqual({
209 message
: 'Error Message from ClusterWorker:async',
213 asyncErrorPool
.workerNodes
.some(
214 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
219 it('Verify that async function is working properly', async () => {
220 const data
= { f
: 10 }
221 const startTime
= performance
.now()
222 const result
= await asyncPool
.execute(data
)
223 const usedTime
= performance
.now() - startTime
224 expect(result
).toStrictEqual(data
)
225 expect(usedTime
).toBeGreaterThanOrEqual(2000)
228 it('Shutdown test', async () => {
229 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfWorkers
)
231 pool
.emitter
.on(PoolEvents
.destroy
, () => ++poolDestroy
)
233 const numberOfExitEvents
= await exitPromise
234 expect(numberOfExitEvents
).toBe(numberOfWorkers
)
235 expect(poolDestroy
).toBe(1)
238 it('Verify that cluster pool options are checked', async () => {
239 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
240 let pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
)
241 expect(pool1
.opts
.env
).toBeUndefined()
242 expect(pool1
.opts
.settings
).toBeUndefined()
243 await pool1
.destroy()
244 pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
, {
245 env
: { TEST
: 'test' },
246 settings
: { args
: ['--use', 'http'], silent
: true }
248 expect(pool1
.opts
.env
).toStrictEqual({ TEST
: 'test' })
249 expect(pool1
.opts
.settings
).toStrictEqual({
250 args
: ['--use', 'http'],
253 expect({ ...pool1
.opts
.settings
, exec
: workerFilePath
}).toStrictEqual({
254 args
: ['--use', 'http'],
258 await pool1
.destroy()
261 it('Should work even without opts in input', async () => {
262 const pool1
= new FixedClusterPool(
264 './tests/worker-files/cluster/testWorker.js'
266 const res
= await pool1
.execute()
267 expect(res
).toStrictEqual({ ok
: 1 })
268 // We need to clean up the resources after our test
269 await pool1
.destroy()
272 it('Verify that a pool with zero worker fails', async () => {
275 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
276 ).toThrowError('Cannot instantiate a fixed pool with zero worker')