chore: use `biome` instead of `rome`
[poolifier.git] / tests / pools / cluster / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedClusterPool, PoolEvents } = require('../../../lib')
dbca3be9 3const { TaskFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
6cd5248f 5const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
325f50bc 6
079de991 7describe('Fixed cluster pool test suite', () => {
// Suite-wide fixtures: each pool below is created once and shared by the tests.
const numberOfWorkers = 8
const tasksConcurrency = 2
// One handler instance reused by every pool that logs worker errors.
const logWorkerError = (e) => console.error(e)

// General-purpose pool; the 'Shutdown test' case is the one that destroys it.
const pool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  { errorHandler: logWorkerError }
)
// Same worker file, with the tasks queue enabled.
const queuePool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: tasksConcurrency },
    errorHandler: logWorkerError
  }
)
const emptyPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/emptyWorker.js',
  { exitHandler: () => console.info('empty pool worker exited') }
)
const echoPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/echoWorker.js'
)
const errorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/errorWorker.js',
  { errorHandler: logWorkerError }
)
const asyncErrorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncErrorWorker.js',
  { errorHandler: logWorkerError }
)
const asyncPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncWorker.js'
)
55
8bc77620
APA
after('Destroy all pools', async () => {
  // Release the suite-wide pools one at a time once every test has run.
  // `pool` is not listed here: the 'Shutdown test' case destroys it.
  const suitePools = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const suitePool of suitePools) {
    await suitePool.destroy()
  }
})
65
it('Verify that the function is executed in a worker cluster', async () => {
  // Run the fibonacci task, then the factorial task, on the shared pool.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
76
it('Verify that is possible to invoke the execute() method without input', async () => {
  // Calling execute() with no argument must still resolve with the worker answer.
  await expect(pool.execute()).resolves.toStrictEqual({ ok: 1 })
})
81
it("Verify that 'ready' event is emitted", async () => {
  // Dedicated pool (shadows the suite-wide `pool`) so the listener is attached
  // to a freshly created instance.
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: (e) => console.error(e)
    }
  )
  try {
    let poolReady = 0
    pool.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(poolReady).toBe(1)
  } finally {
    // Always release the workers, even when the wait or the assertion fails,
    // so a failing test does not leave cluster workers behind.
    await pool.destroy()
  }
})
96
94407def
JB
it("Verify that 'busy' event is emitted", async () => {
  let busyEventCount = 0
  pool.emitter.on(PoolEvents.busy, () => {
    ++busyEventCount
  })
  // Submit twice as many tasks as there are workers, all at once.
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
})
109
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const totalTasks = numberOfWorkers * maxMultiplier
  const pendingTasks = new Set()
  for (let submitted = 0; submitted < totalTasks; submitted++) {
    pendingTasks.add(queuePool.execute())
  }
  expect(pendingTasks.size).toBe(totalTasks)

  const { concurrency } = queuePool.opts.tasksQueueOptions
  const queuedPerWorker = maxMultiplier - concurrency

  // State right after submission, before any task has been awaited.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBe(queuedPerWorker)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.stolen).toBe(0)
  }
  const infoWhileRunning = queuePool.info
  expect(infoWhileRunning.executedTasks).toBe(0)
  expect(infoWhileRunning.executingTasks).toBe(numberOfWorkers * concurrency)
  expect(infoWhileRunning.queuedTasks).toBe(numberOfWorkers * queuedPerWorker)
  expect(infoWhileRunning.maxQueuedTasks).toBe(
    numberOfWorkers * queuedPerWorker
  )
  expect(infoWhileRunning.backPressure).toBe(false)
  expect(infoWhileRunning.stolenTasks).toBe(0)

  await Promise.all(pendingTasks)

  // State once every submitted task has settled.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(totalTasks)
    expect(tasks.executed).toBe(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(tasks.stolen).toBeLessThanOrEqual(totalTasks)
  }
  const infoWhenDone = queuePool.info
  expect(infoWhenDone.executedTasks).toBe(totalTasks)
  expect(infoWhenDone.backPressure).toBe(false)
  expect(infoWhenDone.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(infoWhenDone.stolenTasks).toBeLessThanOrEqual(totalTasks)
})
168
325f50bc
S
it('Verify that is possible to have a worker that return undefined', async () => {
  // The task run by emptyPool is expected to resolve with no value.
  const workerAnswer = await emptyPool.execute()
  expect(workerAnswer).toBeUndefined()
})
173
it('Verify that data are sent to the worker correctly', async () => {
  // The value resolved by echoPool must be structurally equal to what was sent.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
179
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }
  // Capture the payload of the pool `taskError` event.
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, (error) => {
    emittedTaskError = error
  })
  // Capture the rejection reason of the task promise.
  let rejection
  try {
    await errorPool.execute(data)
  } catch (error) {
    rejection = error
  }
  expect(rejection).toBeDefined()
  expect(typeof rejection).toBe('string')
  expect(rejection).toBe('Error Message from ClusterWorker')
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: 'Error Message from ClusterWorker',
    data
  })
  // At least one worker node must have recorded exactly one failed task.
  const hasFailedTask = errorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
206
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }
  // Capture the payload of the pool `taskError` event.
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, (error) => {
    emittedTaskError = error
  })
  // Capture the rejection reason of the task promise.
  let rejection
  try {
    await asyncErrorPool.execute(data)
  } catch (error) {
    rejection = error
  }
  expect(rejection).toBeDefined()
  expect(typeof rejection).toBe('string')
  expect(rejection).toBe('Error Message from ClusterWorker:async')
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: 'Error Message from ClusterWorker:async',
    data
  })
  // At least one worker node must have recorded exactly one failed task.
  const hasFailedTask = asyncErrorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
233
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  // Measure how long the round trip through asyncPool takes.
  const startedAt = performance.now()
  const answer = await asyncPool.execute(payload)
  const elapsedMs = performance.now() - startedAt
  expect(answer).toStrictEqual(payload)
  expect(elapsedMs).toBeGreaterThanOrEqual(2000)
})
242
it('Shutdown test', async () => {
  // Start waiting for the worker 'exit' events before triggering the shutdown.
  const workersExited = waitWorkerEvents(pool, 'exit', numberOfWorkers)
  let destroyEventCount = 0
  pool.emitter.on(PoolEvents.destroy, () => {
    ++destroyEventCount
  })
  await pool.destroy()
  const exitEventCount = await workersExited
  expect(pool.started).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  expect(exitEventCount).toBe(numberOfWorkers)
  expect(destroyEventCount).toBe(1)
})
254
1a76932b
JB
it('Verify that cluster pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  // Without options, neither `env` nor `settings` is set on the pool.
  let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    expect(pool.opts.env).toBeUndefined()
    expect(pool.opts.settings).toBeUndefined()
  } finally {
    // Destroy the pool even if an assertion fails, so no worker is leaked.
    await pool.destroy()
  }
  // With options, `env` and `settings` are kept as given.
  pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
    env: { TEST: 'test' },
    settings: { args: ['--use', 'http'], silent: true }
  })
  try {
    expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
    expect(pool.opts.settings).toStrictEqual({
      args: ['--use', 'http'],
      silent: true
    })
    expect({ ...pool.opts.settings, exec: workerFilePath }).toStrictEqual({
      args: ['--use', 'http'],
      silent: true,
      exec: workerFilePath
    })
  } finally {
    // Destroy the pool even if an assertion fails, so no worker is leaked.
    await pool.destroy()
  }
})
277
it('Should work even without opts in input', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  // Pool built with only the mandatory constructor arguments.
  const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    const res = await pool.execute()
    expect(res).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, including when the
    // task rejects or the assertion fails.
    await pool.destroy()
  }
})
8d3782fa 286
7d27ec0d
JB
it('Verify destroyWorkerNode()', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    const workerNodeKey = 0
    // Count the events emitted by the worker that is about to be destroyed.
    let disconnectEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
      ++disconnectEvent
    })
    let exitEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('exit', () => {
      ++exitEvent
    })
    await expect(
      pool.destroyWorkerNode(workerNodeKey)
    ).resolves.toBeUndefined()
    expect(disconnectEvent).toBe(1)
    expect(exitEvent).toBe(1)
    // The destroyed worker node must have been removed from the pool.
    expect(pool.workerNodes.length).toBe(numberOfWorkers - 1)
  } finally {
    // Release the remaining workers even when an assertion above fails.
    await pool.destroy()
  }
})
305
8d3782fa
JB
it('Verify that a pool with zero worker fails', () => {
  // The constructor is expected to throw synchronously, so it is wrapped in a
  // function for the matcher to call. `toThrow` is the canonical matcher name;
  // `toThrowError` is only a deprecated alias of it.
  expect(
    () =>
      new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
  ).toThrow('Cannot instantiate a fixed pool with zero worker')
})
325f50bc 312})