1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed cluster pool test suite', () => {
7 const numberOfWorkers
= 6
8 const tasksConcurrency
= 2
9 const pool
= new FixedClusterPool(
11 './tests/worker-files/cluster/testWorker.js',
13 errorHandler
: (e
) => console
.error(e
)
16 const queuePool
= new FixedClusterPool(
18 './tests/worker-files/cluster/testWorker.js',
20 enableTasksQueue
: true,
22 concurrency
: tasksConcurrency
24 errorHandler
: (e
) => console
.error(e
)
27 const emptyPool
= new FixedClusterPool(
29 './tests/worker-files/cluster/emptyWorker.js',
30 { exitHandler
: () => console
.info('empty pool worker exited') }
32 const echoPool
= new FixedClusterPool(
34 './tests/worker-files/cluster/echoWorker.js'
36 const errorPool
= new FixedClusterPool(
38 './tests/worker-files/cluster/errorWorker.js',
40 errorHandler
: (e
) => console
.error(e
)
43 const asyncErrorPool
= new FixedClusterPool(
45 './tests/worker-files/cluster/asyncErrorWorker.js',
47 errorHandler
: (e
) => console
.error(e
)
50 const asyncPool
= new FixedClusterPool(
52 './tests/worker-files/cluster/asyncWorker.js'
55 after('Destroy all pools', async () => {
56 // We need to clean up the resources after our test
57 await echoPool
.destroy()
58 await asyncPool
.destroy()
59 await errorPool
.destroy()
60 await asyncErrorPool
.destroy()
61 await emptyPool
.destroy()
62 await queuePool
.destroy()
65 it('Verify that the function is executed in a worker cluster', async () => {
66 let result
= await pool
.execute({
67 function: TaskFunctions
.fibonacci
69 expect(result
).toBe(75025)
70 result
= await pool
.execute({
71 function: TaskFunctions
.factorial
73 expect(result
).toBe(9.33262154439441e157
)
76 it('Verify that is possible to invoke the execute() method without input', async () => {
77 const result
= await pool
.execute()
78 expect(result
).toStrictEqual({ ok
: 1 })
81 it("Verify that 'ready' event is emitted", async () => {
82 const pool1
= new FixedClusterPool(
84 './tests/worker-files/cluster/testWorker.js',
86 errorHandler
: (e
) => console
.error(e
)
90 pool1
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
91 await
waitPoolEvents(pool1
, PoolEvents
.ready
, 1)
92 expect(poolReady
).toBe(1)
95 it("Verify that 'busy' event is emitted", async () => {
96 const promises
= new Set()
98 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
99 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
100 promises
.add(pool
.execute())
102 await Promise
.all(promises
)
103 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
104 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
105 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
108 it('Verify that tasks queuing is working', async () => {
109 const promises
= new Set()
110 const maxMultiplier
= 3 // Must be greater than tasksConcurrency
111 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
112 promises
.add(queuePool
.execute())
114 expect(promises
.size
).toBe(numberOfWorkers
* maxMultiplier
)
115 for (const workerNode
of queuePool
.workerNodes
) {
116 expect(workerNode
.usage
.tasks
.executing
).toBeGreaterThanOrEqual(0)
117 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
118 queuePool
.opts
.tasksQueueOptions
.concurrency
120 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
121 expect(workerNode
.usage
.tasks
.queued
).toBe(
122 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
124 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
125 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
128 expect(queuePool
.info
.executingTasks
).toBe(
129 numberOfWorkers
* queuePool
.opts
.tasksQueueOptions
.concurrency
131 expect(queuePool
.info
.queuedTasks
).toBe(
133 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
135 expect(queuePool
.info
.maxQueuedTasks
).toBe(
137 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
139 expect(queuePool
.info
.backPressure
).toBe(false)
140 await Promise
.all(promises
)
141 for (const workerNode
of queuePool
.workerNodes
) {
142 expect(workerNode
.usage
.tasks
.executing
).toBe(0)
143 expect(workerNode
.usage
.tasks
.executed
).toBe(maxMultiplier
)
144 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
145 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
146 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
151 it('Verify that is possible to have a worker that return undefined', async () => {
152 const result
= await emptyPool
.execute()
153 expect(result
).toBeUndefined()
156 it('Verify that data are sent to the worker correctly', async () => {
157 const data
= { f
: 10 }
158 const result
= await echoPool
.execute(data
)
159 expect(result
).toStrictEqual(data
)
162 it('Verify that error handling is working properly:sync', async () => {
163 const data
= { f
: 10 }
165 errorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
170 await errorPool
.execute(data
)
174 expect(inError
).toBeDefined()
175 expect(typeof inError
=== 'string').toBe(true)
176 expect(inError
).toBe('Error Message from ClusterWorker')
177 expect(taskError
).toStrictEqual({
179 message
: 'Error Message from ClusterWorker',
183 errorPool
.workerNodes
.some(
184 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
189 it('Verify that error handling is working properly:async', async () => {
190 const data
= { f
: 10 }
192 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
197 await asyncErrorPool
.execute(data
)
201 expect(inError
).toBeDefined()
202 expect(typeof inError
=== 'string').toBe(true)
203 expect(inError
).toBe('Error Message from ClusterWorker:async')
204 expect(taskError
).toStrictEqual({
206 message
: 'Error Message from ClusterWorker:async',
210 asyncErrorPool
.workerNodes
.some(
211 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
216 it('Verify that async function is working properly', async () => {
217 const data
= { f
: 10 }
218 const startTime
= performance
.now()
219 const result
= await asyncPool
.execute(data
)
220 const usedTime
= performance
.now() - startTime
221 expect(result
).toStrictEqual(data
)
222 expect(usedTime
).toBeGreaterThanOrEqual(2000)
225 it('Shutdown test', async () => {
226 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfWorkers
)
228 pool
.emitter
.on(PoolEvents
.destroy
, () => ++poolDestroy
)
230 const numberOfExitEvents
= await exitPromise
231 expect(numberOfExitEvents
).toBe(numberOfWorkers
)
232 expect(poolDestroy
).toBe(1)
235 it('Verify that cluster pool options are checked', async () => {
236 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
237 let pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
)
238 expect(pool1
.opts
.env
).toBeUndefined()
239 expect(pool1
.opts
.settings
).toBeUndefined()
240 await pool1
.destroy()
241 pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
, {
242 env
: { TEST
: 'test' },
243 settings
: { args
: ['--use', 'http'], silent
: true }
245 expect(pool1
.opts
.env
).toStrictEqual({ TEST
: 'test' })
246 expect(pool1
.opts
.settings
).toStrictEqual({
247 args
: ['--use', 'http'],
250 expect({ ...pool1
.opts
.settings
, exec
: workerFilePath
}).toStrictEqual({
251 args
: ['--use', 'http'],
255 await pool1
.destroy()
258 it('Should work even without opts in input', async () => {
259 const pool1
= new FixedClusterPool(
261 './tests/worker-files/cluster/testWorker.js'
263 const res
= await pool1
.execute()
264 expect(res
).toStrictEqual({ ok
: 1 })
265 // We need to clean up the resources after our test
266 await pool1
.destroy()
269 it('Verify that a pool with zero worker fails', async () => {
272 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
273 ).toThrowError('Cannot instantiate a fixed pool with zero worker')