refactor: use PoolEvents enum
[poolifier.git] / tests / pools / cluster / fixed.test.mjs
CommitLineData
a074ffee 1import { expect } from 'expect'
ded253e2 2
d35e5717 3import { FixedClusterPool, PoolEvents } from '../../../lib/index.cjs'
ded253e2 4import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
d35e5717
JB
5import { TaskFunctions } from '../../test-types.cjs'
6import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
325f50bc 7
079de991 8describe('Fixed cluster pool test suite', () => {
56d11670 9 const numberOfWorkers = 8
4e377863 10 const tasksConcurrency = 2
e1ffb94f
JB
11 const pool = new FixedClusterPool(
12 numberOfWorkers,
d35e5717 13 './tests/worker-files/cluster/testWorker.cjs',
e1ffb94f 14 {
041dc05b 15 errorHandler: e => console.error(e)
e1ffb94f
JB
16 }
17 )
594bfb84
JB
18 const queuePool = new FixedClusterPool(
19 numberOfWorkers,
d35e5717 20 './tests/worker-files/cluster/testWorker.cjs',
594bfb84
JB
21 {
22 enableTasksQueue: true,
23 tasksQueueOptions: {
4e377863 24 concurrency: tasksConcurrency
594bfb84 25 },
041dc05b 26 errorHandler: e => console.error(e)
594bfb84
JB
27 }
28 )
e1ffb94f
JB
29 const emptyPool = new FixedClusterPool(
30 numberOfWorkers,
d35e5717 31 './tests/worker-files/cluster/emptyWorker.cjs',
73bfd59d 32 { exitHandler: () => console.info('empty pool worker exited') }
e1ffb94f
JB
33 )
34 const echoPool = new FixedClusterPool(
35 numberOfWorkers,
d35e5717 36 './tests/worker-files/cluster/echoWorker.cjs'
e1ffb94f
JB
37 )
38 const errorPool = new FixedClusterPool(
39 numberOfWorkers,
d35e5717 40 './tests/worker-files/cluster/errorWorker.cjs',
e1ffb94f 41 {
041dc05b 42 errorHandler: e => console.error(e)
e1ffb94f
JB
43 }
44 )
45 const asyncErrorPool = new FixedClusterPool(
46 numberOfWorkers,
d35e5717 47 './tests/worker-files/cluster/asyncErrorWorker.cjs',
e1ffb94f 48 {
041dc05b 49 errorHandler: e => console.error(e)
e1ffb94f
JB
50 }
51 )
52 const asyncPool = new FixedClusterPool(
53 numberOfWorkers,
d35e5717 54 './tests/worker-files/cluster/asyncWorker.cjs'
e1ffb94f
JB
55 )
56
8bc77620
APA
57 after('Destroy all pools', async () => {
58 // We need to clean up the resources after our test
59 await echoPool.destroy()
60 await asyncPool.destroy()
61 await errorPool.destroy()
62 await asyncErrorPool.destroy()
63 await emptyPool.destroy()
594bfb84 64 await queuePool.destroy()
8bc77620
APA
65 })
66
325f50bc 67 it('Verify that the function is executed in a worker cluster', async () => {
6db75ad9 68 let result = await pool.execute({
dbca3be9 69 function: TaskFunctions.fibonacci
6db75ad9 70 })
024daf59 71 expect(result).toBe(75025)
6db75ad9 72 result = await pool.execute({
dbca3be9 73 function: TaskFunctions.factorial
6db75ad9 74 })
70a4f5ea 75 expect(result).toBe(9.33262154439441e157)
325f50bc
S
76 })
77
318d4156 78 it('Verify that is possible to invoke the execute() method without input', async () => {
325f50bc 79 const result = await pool.execute()
30b963d4 80 expect(result).toStrictEqual({ ok: 1 })
325f50bc
S
81 })
82
1dcbfefc 83 it("Verify that 'ready' event is emitted", async () => {
0fe39c97 84 const pool = new FixedClusterPool(
079de991 85 numberOfWorkers,
d35e5717 86 './tests/worker-files/cluster/testWorker.cjs',
079de991 87 {
041dc05b 88 errorHandler: e => console.error(e)
079de991
JB
89 }
90 )
c726f66c 91 expect(pool.emitter.eventNames()).toStrictEqual([])
079de991 92 let poolReady = 0
0fe39c97
JB
93 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
94 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 95 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
216541b6 96 expect(poolReady).toBe(1)
0fe39c97 97 await pool.destroy()
216541b6
JB
98 })
99
94407def
JB
100 it("Verify that 'busy' event is emitted", async () => {
101 const promises = new Set()
b04bd0f1 102 expect(pool.emitter.eventNames()).toStrictEqual([])
7c0ba920 103 let poolBusy = 0
aee46736 104 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
c726f66c 105 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
7c0ba920 106 for (let i = 0; i < numberOfWorkers * 2; i++) {
94407def 107 promises.add(pool.execute())
7c0ba920 108 }
94407def 109 await Promise.all(promises)
14916bf9
JB
110 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
111 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
112 expect(poolBusy).toBe(numberOfWorkers + 1)
7c0ba920
JB
113 })
114
594bfb84 115 it('Verify that tasks queuing is working', async () => {
d3d4b67d 116 const promises = new Set()
4e377863 117 const maxMultiplier = 3 // Must be greater than tasksConcurrency
594bfb84 118 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
d3d4b67d 119 promises.add(queuePool.execute())
594bfb84 120 }
d3d4b67d 121 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
594bfb84 122 for (const workerNode of queuePool.workerNodes) {
5bb5be17 123 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
f59e1027 124 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
594bfb84
JB
125 queuePool.opts.tasksQueueOptions.concurrency
126 )
f59e1027 127 expect(workerNode.usage.tasks.executed).toBe(0)
4e377863
JB
128 expect(workerNode.usage.tasks.queued).toBe(
129 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
130 )
131 expect(workerNode.usage.tasks.maxQueued).toBe(
132 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
133 )
463226a4 134 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
56d11670 135 expect(workerNode.usage.tasks.stolen).toBe(0)
594bfb84 136 }
56d11670 137 expect(queuePool.info.executedTasks).toBe(0)
4e377863
JB
138 expect(queuePool.info.executingTasks).toBe(
139 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
140 )
6b27d407 141 expect(queuePool.info.queuedTasks).toBe(
4e377863
JB
142 numberOfWorkers *
143 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
6b27d407
JB
144 )
145 expect(queuePool.info.maxQueuedTasks).toBe(
4e377863
JB
146 numberOfWorkers *
147 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
d3d4b67d 148 )
a1763c54 149 expect(queuePool.info.backPressure).toBe(false)
56d11670 150 expect(queuePool.info.stolenTasks).toBe(0)
594bfb84
JB
151 await Promise.all(promises)
152 for (const workerNode of queuePool.workerNodes) {
a6b3272b
JB
153 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
154 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
155 numberOfWorkers * maxMultiplier
156 )
4e377863 157 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
f59e1027 158 expect(workerNode.usage.tasks.queued).toBe(0)
4e377863
JB
159 expect(workerNode.usage.tasks.maxQueued).toBe(
160 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
161 )
463226a4
JB
162 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
163 0
164 )
165 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
166 numberOfWorkers * maxMultiplier
167 )
56d11670
JB
168 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
169 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
170 numberOfWorkers * maxMultiplier
171 )
594bfb84 172 }
56d11670
JB
173 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
174 expect(queuePool.info.backPressure).toBe(false)
175 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
176 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
177 numberOfWorkers * maxMultiplier
178 )
594bfb84
JB
179 })
180
325f50bc
S
181 it('Verify that is possible to have a worker that return undefined', async () => {
182 const result = await emptyPool.execute()
6db75ad9 183 expect(result).toBeUndefined()
325f50bc
S
184 })
185
186 it('Verify that data are sent to the worker correctly', async () => {
187 const data = { f: 10 }
188 const result = await echoPool.execute(data)
e1ffb94f 189 expect(result).toStrictEqual(data)
325f50bc
S
190 })
191
192 it('Verify that error handling is working properly:sync', async () => {
193 const data = { f: 10 }
c726f66c 194 expect(errorPool.emitter.eventNames()).toStrictEqual([])
d46660cd 195 let taskError
041dc05b 196 errorPool.emitter.on(PoolEvents.taskError, e => {
d46660cd
JB
197 taskError = e
198 })
c726f66c 199 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
325f50bc
S
200 let inError
201 try {
202 await errorPool.execute(data)
203 } catch (e) {
204 inError = e
205 }
206 expect(inError).toBeDefined()
8620fb25 207 expect(typeof inError === 'string').toBe(true)
985d0e79
JB
208 expect(inError).toBe('Error Message from ClusterWorker')
209 expect(taskError).toStrictEqual({
6cd5248f 210 name: DEFAULT_TASK_NAME,
985d0e79
JB
211 message: 'Error Message from ClusterWorker',
212 data
213 })
18482cec
JB
214 expect(
215 errorPool.workerNodes.some(
041dc05b 216 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
217 )
218 ).toBe(true)
325f50bc
S
219 })
220
221 it('Verify that error handling is working properly:async', async () => {
222 const data = { f: 10 }
c726f66c 223 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
4f0b85b3 224 let taskError
041dc05b 225 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
4f0b85b3
JB
226 taskError = e
227 })
c726f66c
JB
228 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
229 PoolEvents.taskError
230 ])
325f50bc
S
231 let inError
232 try {
233 await asyncErrorPool.execute(data)
234 } catch (e) {
235 inError = e
236 }
237 expect(inError).toBeDefined()
8620fb25 238 expect(typeof inError === 'string').toBe(true)
985d0e79
JB
239 expect(inError).toBe('Error Message from ClusterWorker:async')
240 expect(taskError).toStrictEqual({
6cd5248f 241 name: DEFAULT_TASK_NAME,
985d0e79
JB
242 message: 'Error Message from ClusterWorker:async',
243 data
244 })
18482cec
JB
245 expect(
246 asyncErrorPool.workerNodes.some(
041dc05b 247 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
248 )
249 ).toBe(true)
325f50bc
S
250 })
251
252 it('Verify that async function is working properly', async () => {
253 const data = { f: 10 }
15e5141f 254 const startTime = performance.now()
325f50bc 255 const result = await asyncPool.execute(data)
15e5141f 256 const usedTime = performance.now() - startTime
e1ffb94f 257 expect(result).toStrictEqual(data)
325f50bc
S
258 expect(usedTime).toBeGreaterThanOrEqual(2000)
259 })
260
261 it('Shutdown test', async () => {
bac873bd 262 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
c726f66c 263 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
ef3891a3
JB
264 let poolDestroy = 0
265 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
c726f66c
JB
266 expect(pool.emitter.eventNames()).toStrictEqual([
267 PoolEvents.busy,
268 PoolEvents.destroy
269 ])
45dbbb14 270 await pool.destroy()
bdacc2d2 271 const numberOfExitEvents = await exitPromise
bb9423b7 272 expect(pool.started).toBe(false)
a621172f
JB
273 expect(pool.emitter.eventNames()).toStrictEqual([
274 PoolEvents.busy,
275 PoolEvents.destroy
276 ])
55082af9 277 expect(pool.readyEventEmitted).toBe(false)
bb9423b7 278 expect(pool.workerNodes.length).toBe(0)
bdacc2d2 279 expect(numberOfExitEvents).toBe(numberOfWorkers)
ef3891a3 280 expect(poolDestroy).toBe(1)
325f50bc
S
281 })
282
1a76932b 283 it('Verify that cluster pool options are checked', async () => {
d35e5717 284 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
0fe39c97
JB
285 let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
286 expect(pool.opts.env).toBeUndefined()
287 expect(pool.opts.settings).toBeUndefined()
288 await pool.destroy()
289 pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
1a76932b
JB
290 env: { TEST: 'test' },
291 settings: { args: ['--use', 'http'], silent: true }
292 })
0fe39c97
JB
293 expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
294 expect(pool.opts.settings).toStrictEqual({
1a76932b
JB
295 args: ['--use', 'http'],
296 silent: true
297 })
0fe39c97 298 expect({ ...pool.opts.settings, exec: workerFilePath }).toStrictEqual({
1a76932b
JB
299 args: ['--use', 'http'],
300 silent: true,
301 exec: workerFilePath
302 })
0fe39c97 303 await pool.destroy()
1a76932b
JB
304 })
305
325f50bc 306 it('Should work even without opts in input', async () => {
d35e5717 307 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
7d27ec0d 308 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
0fe39c97 309 const res = await pool.execute()
30b963d4 310 expect(res).toStrictEqual({ ok: 1 })
8bc77620 311 // We need to clean up the resources after our test
0fe39c97 312 await pool.destroy()
325f50bc 313 })
8d3782fa 314
7d27ec0d 315 it('Verify destroyWorkerNode()', async () => {
d35e5717 316 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
7d27ec0d 317 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
52a23942 318 const workerNodeKey = 0
7d27ec0d 319 let disconnectEvent = 0
52a23942 320 pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
7d27ec0d
JB
321 ++disconnectEvent
322 })
323 let exitEvent = 0
52a23942 324 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
7d27ec0d
JB
325 ++exitEvent
326 })
52a23942 327 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
7d27ec0d
JB
328 expect(disconnectEvent).toBe(1)
329 expect(exitEvent).toBe(1)
330 expect(pool.workerNodes.length).toBe(numberOfWorkers - 1)
331 await pool.destroy()
332 })
333
1cc6e9ef 334 it('Verify that a pool with zero worker fails', () => {
8d3782fa
JB
335 expect(
336 () =>
d35e5717 337 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.cjs')
948faff7 338 ).toThrow('Cannot instantiate a fixed pool with zero worker')
8d3782fa 339 })
325f50bc 340})