test: improve event emitter tests
[poolifier.git] / tests / pools / cluster / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedClusterPool, PoolEvents } = require('../../../lib')
dbca3be9 3const { TaskFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
6cd5248f 5const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
325f50bc 6
079de991 7describe('Fixed cluster pool test suite', () => {
// Sizing shared by every pool created in this suite.
const numberOfWorkers = 8
const tasksConcurrency = 2
// One logging error handler reused by all pools that report worker errors.
const logError = e => console.error(e)

// General purpose pool; it is destroyed by the 'Shutdown test' case, not by the after() hook.
const pool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  { errorHandler: logError }
)
// Same worker file, with the tasks queue enabled and a per-worker concurrency limit.
const queuePool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: tasksConcurrency },
    errorHandler: logError
  }
)
// Pool used by the 'return undefined' case; logs a line each time a worker exits.
const emptyPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/emptyWorker.js',
  {
    exitHandler: () => console.info('empty pool worker exited')
  }
)
// Pool used to check that task data round-trips through a worker.
const echoPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/echoWorker.js'
)
// Pools used by the sync and async error handling cases.
const errorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/errorWorker.js',
  { errorHandler: logError }
)
const asyncErrorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncErrorWorker.js',
  { errorHandler: logError }
)
// Pool used by the async task function case.
const asyncPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncWorker.js'
)
55
after('Destroy all pools', async () => {
  // Release the shared pools sequentially once the suite is done.
  // `pool` is not listed here: the 'Shutdown test' case destroys it.
  const sharedPools = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const sharedPool of sharedPools) {
    await sharedPool.destroy()
  }
})
65
it('Verify that the function is executed in a worker cluster', async () => {
  // Run each task function of the test worker once and pin its result.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
76
it('Verify that is possible to invoke the execute() method without input', async () => {
  // execute() is called with no argument at all.
  expect(await pool.execute()).toStrictEqual({ ok: 1 })
})
81
it("Verify that 'ready' event is emitted", async () => {
  // A dedicated pool is needed so the listener can be attached before the event is awaited.
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    expect(pool.emitter.eventNames()).toStrictEqual([])
    let poolReady = 0
    pool.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
    expect(poolReady).toBe(1)
  } finally {
    // Destroy the pool even when an assertion above throws, so a failing
    // test does not leave worker processes running.
    await pool.destroy()
  }
})
98
it("Verify that 'busy' event is emitted", async () => {
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyEventCount = 0
  // The listener is left registered on purpose: the 'Shutdown test' case
  // expects to still find it on the emitter.
  pool.emitter.on(PoolEvents.busy, () => ++busyEventCount)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of tasks submitted at once
  // reaches the number of fixed pool workers, so it fires numberOfWorkers + 1
  // times while numberOfWorkers * 2 tasks are submitted to the fixed pool.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
})
113
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const { concurrency } = queuePool.opts.tasksQueueOptions
  const totalTasks = numberOfWorkers * maxMultiplier
  // Tasks each worker has to queue once its concurrency slots are taken.
  const queuedPerWorker = maxMultiplier - concurrency

  const promises = new Set()
  for (let submitted = 0; submitted < totalTasks; submitted++) {
    promises.add(queuePool.execute())
  }
  expect(promises.size).toBe(totalTasks)

  // Per-worker counters right after submission, before any task is awaited.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(usage.tasks.executed).toBe(0)
    expect(usage.tasks.queued).toBe(queuedPerWorker)
    expect(usage.tasks.maxQueued).toBe(queuedPerWorker)
    expect(usage.tasks.stolen).toBe(0)
  }
  // Pool-wide counters for the same moment.
  expect(queuePool.info.executedTasks).toBe(0)
  expect(queuePool.info.executingTasks).toBe(numberOfWorkers * concurrency)
  expect(queuePool.info.queuedTasks).toBe(numberOfWorkers * queuedPerWorker)
  expect(queuePool.info.maxQueuedTasks).toBe(
    numberOfWorkers * queuedPerWorker
  )
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBe(0)

  await Promise.all(promises)

  // Per-worker counters once every submitted task has settled.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executing).toBeLessThanOrEqual(totalTasks)
    expect(usage.tasks.executed).toBe(maxMultiplier)
    expect(usage.tasks.queued).toBe(0)
    expect(usage.tasks.maxQueued).toBe(queuedPerWorker)
    expect(usage.tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.stolen).toBeLessThanOrEqual(totalTasks)
  }
  expect(queuePool.info.executedTasks).toBe(totalTasks)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(totalTasks)
})
172
it('Verify that is possible to have a worker that return undefined', async () => {
  // The task result coming back from the empty worker pool must be undefined.
  expect(await emptyPool.execute()).toBeUndefined()
})
177
it('Verify that data are sent to the worker correctly', async () => {
  // The echo pool must hand back a structure strictly equal to what was sent.
  const payload = { f: 10 }
  expect(await echoPool.execute(payload)).toStrictEqual(payload)
})
183
it('Verify that error handling is working properly:sync', async () => {
  const payload = { f: 10 }
  const expectedMessage = 'Error Message from ClusterWorker'
  expect(errorPool.emitter.eventNames()).toStrictEqual([])
  // Capture what the pool emits on its taskError event.
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, e => {
    emittedTaskError = e
  })
  expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
  // Capture the rejection value of the task itself.
  let caughtError
  try {
    await errorPool.execute(payload)
  } catch (e) {
    caughtError = e
  }
  expect(caughtError).toBeDefined()
  expect(typeof caughtError).toBe('string')
  expect(caughtError).toBe(expectedMessage)
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: expectedMessage,
    data: payload
  })
  // At least one worker node must have recorded exactly one failed task.
  const hasFailedTask = errorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
212
it('Verify that error handling is working properly:async', async () => {
  const payload = { f: 10 }
  const expectedMessage = 'Error Message from ClusterWorker:async'
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
  // Capture what the pool emits on its taskError event.
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
    emittedTaskError = e
  })
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
    PoolEvents.taskError
  ])
  // Capture the rejection value of the task itself.
  let caughtError
  try {
    await asyncErrorPool.execute(payload)
  } catch (e) {
    caughtError = e
  }
  expect(caughtError).toBeDefined()
  expect(typeof caughtError).toBe('string')
  expect(caughtError).toBe(expectedMessage)
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: expectedMessage,
    data: payload
  })
  // At least one worker node must have recorded exactly one failed task.
  const hasFailedTask = asyncErrorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
243
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  // Time the round trip with performance.now(), which reports milliseconds.
  const startedAt = performance.now()
  const result = await asyncPool.execute(payload)
  const elapsed = performance.now() - startedAt
  expect(result).toStrictEqual(payload)
  // The task must not settle in less than 2000 ms.
  expect(elapsed).toBeGreaterThanOrEqual(2000)
})
252
it('Shutdown test', async () => {
  // Start waiting for the worker 'exit' events before the pool is destroyed.
  const workersExited = waitWorkerEvents(pool, 'exit', numberOfWorkers)
  // The only listener present at this point is the one added by the 'busy' event case.
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  let destroyEventCount = 0
  pool.emitter.on(PoolEvents.destroy, () => ++destroyEventCount)
  const expectedEventNames = [PoolEvents.busy, PoolEvents.destroy]
  expect(pool.emitter.eventNames()).toStrictEqual(expectedEventNames)
  await pool.destroy()
  const numberOfExitEvents = await workersExited
  expect(pool.started).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  expect(numberOfExitEvents).toBe(numberOfWorkers)
  expect(destroyEventCount).toBe(1)
})
269
it('Verify that cluster pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'

  // Without options, neither `env` nor `settings` is set on the pool.
  let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    expect(pool.opts.env).toBeUndefined()
    expect(pool.opts.settings).toBeUndefined()
  } finally {
    // Destroy the pool even when an assertion throws, so no worker is leaked.
    await pool.destroy()
  }

  // With options, `env` and `settings` are kept as given.
  pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
    env: { TEST: 'test' },
    settings: { args: ['--use', 'http'], silent: true }
  })
  try {
    expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
    expect(pool.opts.settings).toStrictEqual({
      args: ['--use', 'http'],
      silent: true
    })
    expect({ ...pool.opts.settings, exec: workerFilePath }).toStrictEqual({
      args: ['--use', 'http'],
      silent: true,
      exec: workerFilePath
    })
  } finally {
    await pool.destroy()
  }
})
292
it('Should work even without opts in input', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  // Only the two mandatory constructor arguments are passed.
  const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    const res = await pool.execute()
    expect(res).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, including when the
    // task rejects or the assertion throws.
    await pool.destroy()
  }
})
8d3782fa 301
it('Verify destroyWorkerNode()', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    const workerNodeKey = 0
    // Count the events of the worker about to be destroyed.
    let disconnectEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
      ++disconnectEvent
    })
    let exitEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('exit', () => {
      ++exitEvent
    })
    await expect(
      pool.destroyWorkerNode(workerNodeKey)
    ).resolves.toBeUndefined()
    expect(disconnectEvent).toBe(1)
    expect(exitEvent).toBe(1)
    // Exactly one worker node must have been removed from the pool.
    expect(pool.workerNodes.length).toBe(numberOfWorkers - 1)
  } finally {
    // Destroy the remaining workers even when an assertion above throws.
    await pool.destroy()
  }
})
320
// Nothing is awaited in this case, so the callback is a plain synchronous function.
it('Verify that a pool with zero worker fails', () => {
  // The constructor call is wrapped in a function so the matcher can catch the throw.
  // `toThrow` is the canonical matcher; `toThrowError` is only a deprecated alias of it.
  expect(
    () =>
      new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
  ).toThrow('Cannot instantiate a fixed pool with zero worker')
})
325f50bc 327})