build(deps-dev): apply updates
[poolifier.git] / tests / pools / cluster / fixed.test.mjs
CommitLineData
1fd60f0d
JB
1import cluster from 'node:cluster'
2
a074ffee 3import { expect } from 'expect'
ded253e2 4
d35e5717 5import { FixedClusterPool, PoolEvents } from '../../../lib/index.cjs'
ded253e2 6import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
d35e5717
JB
7import { TaskFunctions } from '../../test-types.cjs'
8import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
325f50bc 9
079de991 10describe('Fixed cluster pool test suite', () => {
56d11670 11 const numberOfWorkers = 8
4e377863 12 const tasksConcurrency = 2
e1ffb94f
JB
13 const pool = new FixedClusterPool(
14 numberOfWorkers,
d35e5717 15 './tests/worker-files/cluster/testWorker.cjs',
e1ffb94f 16 {
3a502712 17 errorHandler: e => console.error(e),
e1ffb94f
JB
18 }
19 )
594bfb84
JB
20 const queuePool = new FixedClusterPool(
21 numberOfWorkers,
d35e5717 22 './tests/worker-files/cluster/testWorker.cjs',
594bfb84
JB
23 {
24 enableTasksQueue: true,
25 tasksQueueOptions: {
3a502712 26 concurrency: tasksConcurrency,
594bfb84 27 },
3a502712 28 errorHandler: e => console.error(e),
594bfb84
JB
29 }
30 )
e1ffb94f
JB
31 const emptyPool = new FixedClusterPool(
32 numberOfWorkers,
d35e5717 33 './tests/worker-files/cluster/emptyWorker.cjs',
73bfd59d 34 { exitHandler: () => console.info('empty pool worker exited') }
e1ffb94f
JB
35 )
36 const echoPool = new FixedClusterPool(
37 numberOfWorkers,
d35e5717 38 './tests/worker-files/cluster/echoWorker.cjs'
e1ffb94f
JB
39 )
40 const errorPool = new FixedClusterPool(
41 numberOfWorkers,
d35e5717 42 './tests/worker-files/cluster/errorWorker.cjs',
e1ffb94f 43 {
3a502712 44 errorHandler: e => console.error(e),
e1ffb94f
JB
45 }
46 )
47 const asyncErrorPool = new FixedClusterPool(
48 numberOfWorkers,
d35e5717 49 './tests/worker-files/cluster/asyncErrorWorker.cjs',
e1ffb94f 50 {
3a502712 51 errorHandler: e => console.error(e),
e1ffb94f
JB
52 }
53 )
54 const asyncPool = new FixedClusterPool(
55 numberOfWorkers,
d35e5717 56 './tests/worker-files/cluster/asyncWorker.cjs'
e1ffb94f
JB
57 )
58
8bc77620
APA
59 after('Destroy all pools', async () => {
60 // We need to clean up the resources after our test
61 await echoPool.destroy()
62 await asyncPool.destroy()
63 await errorPool.destroy()
64 await asyncErrorPool.destroy()
65 await emptyPool.destroy()
594bfb84 66 await queuePool.destroy()
8bc77620
APA
67 })
68
325f50bc 69 it('Verify that the function is executed in a worker cluster', async () => {
6db75ad9 70 let result = await pool.execute({
3a502712 71 function: TaskFunctions.fibonacci,
6db75ad9 72 })
66f0c14c 73 expect(result).toBe(354224848179262000000)
6db75ad9 74 result = await pool.execute({
3a502712 75 function: TaskFunctions.factorial,
6db75ad9 76 })
70a4f5ea 77 expect(result).toBe(9.33262154439441e157)
325f50bc
S
78 })
79
318d4156 80 it('Verify that is possible to invoke the execute() method without input', async () => {
325f50bc 81 const result = await pool.execute()
30b963d4 82 expect(result).toStrictEqual({ ok: 1 })
325f50bc
S
83 })
84
1dcbfefc 85 it("Verify that 'ready' event is emitted", async () => {
0fe39c97 86 const pool = new FixedClusterPool(
079de991 87 numberOfWorkers,
d35e5717 88 './tests/worker-files/cluster/testWorker.cjs',
079de991 89 {
3a502712 90 errorHandler: e => console.error(e),
079de991
JB
91 }
92 )
c726f66c 93 expect(pool.emitter.eventNames()).toStrictEqual([])
079de991 94 let poolReady = 0
0fe39c97
JB
95 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
96 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 97 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
216541b6 98 expect(poolReady).toBe(1)
0fe39c97 99 await pool.destroy()
216541b6
JB
100 })
101
94407def
JB
102 it("Verify that 'busy' event is emitted", async () => {
103 const promises = new Set()
b04bd0f1 104 expect(pool.emitter.eventNames()).toStrictEqual([])
7c0ba920 105 let poolBusy = 0
aee46736 106 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
c726f66c 107 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
7c0ba920 108 for (let i = 0; i < numberOfWorkers * 2; i++) {
94407def 109 promises.add(pool.execute())
7c0ba920 110 }
94407def 111 await Promise.all(promises)
14916bf9
JB
112 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
113 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
114 expect(poolBusy).toBe(numberOfWorkers + 1)
7c0ba920
JB
115 })
116
594bfb84 117 it('Verify that tasks queuing is working', async () => {
d3d4b67d 118 const promises = new Set()
4e377863 119 const maxMultiplier = 3 // Must be greater than tasksConcurrency
594bfb84 120 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
d3d4b67d 121 promises.add(queuePool.execute())
594bfb84 122 }
d3d4b67d 123 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
594bfb84 124 for (const workerNode of queuePool.workerNodes) {
5bb5be17 125 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
f59e1027 126 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
594bfb84
JB
127 queuePool.opts.tasksQueueOptions.concurrency
128 )
f59e1027 129 expect(workerNode.usage.tasks.executed).toBe(0)
4e377863
JB
130 expect(workerNode.usage.tasks.queued).toBe(
131 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
132 )
133 expect(workerNode.usage.tasks.maxQueued).toBe(
134 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
135 )
463226a4 136 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
56d11670 137 expect(workerNode.usage.tasks.stolen).toBe(0)
594bfb84 138 }
56d11670 139 expect(queuePool.info.executedTasks).toBe(0)
4e377863
JB
140 expect(queuePool.info.executingTasks).toBe(
141 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
142 )
6b27d407 143 expect(queuePool.info.queuedTasks).toBe(
4e377863
JB
144 numberOfWorkers *
145 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
6b27d407
JB
146 )
147 expect(queuePool.info.maxQueuedTasks).toBe(
4e377863
JB
148 numberOfWorkers *
149 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
d3d4b67d 150 )
a1763c54 151 expect(queuePool.info.backPressure).toBe(false)
56d11670 152 expect(queuePool.info.stolenTasks).toBe(0)
594bfb84
JB
153 await Promise.all(promises)
154 for (const workerNode of queuePool.workerNodes) {
a6b3272b
JB
155 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
156 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
157 numberOfWorkers * maxMultiplier
158 )
4e377863 159 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
f59e1027 160 expect(workerNode.usage.tasks.queued).toBe(0)
4e377863
JB
161 expect(workerNode.usage.tasks.maxQueued).toBe(
162 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
163 )
463226a4
JB
164 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
165 0
166 )
167 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
168 numberOfWorkers * maxMultiplier
169 )
56d11670
JB
170 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
171 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
172 numberOfWorkers * maxMultiplier
173 )
594bfb84 174 }
56d11670
JB
175 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
176 expect(queuePool.info.backPressure).toBe(false)
177 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
178 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
179 numberOfWorkers * maxMultiplier
180 )
594bfb84
JB
181 })
182
325f50bc
S
183 it('Verify that is possible to have a worker that return undefined', async () => {
184 const result = await emptyPool.execute()
6db75ad9 185 expect(result).toBeUndefined()
325f50bc
S
186 })
187
188 it('Verify that data are sent to the worker correctly', async () => {
189 const data = { f: 10 }
190 const result = await echoPool.execute(data)
e1ffb94f 191 expect(result).toStrictEqual(data)
325f50bc
S
192 })
193
194 it('Verify that error handling is working properly:sync', async () => {
195 const data = { f: 10 }
c726f66c 196 expect(errorPool.emitter.eventNames()).toStrictEqual([])
d46660cd 197 let taskError
041dc05b 198 errorPool.emitter.on(PoolEvents.taskError, e => {
d46660cd
JB
199 taskError = e
200 })
c726f66c 201 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
325f50bc
S
202 let inError
203 try {
204 await errorPool.execute(data)
205 } catch (e) {
206 inError = e
207 }
208 expect(inError).toBeDefined()
8620fb25 209 expect(typeof inError === 'string').toBe(true)
985d0e79
JB
210 expect(inError).toBe('Error Message from ClusterWorker')
211 expect(taskError).toStrictEqual({
6cd5248f 212 name: DEFAULT_TASK_NAME,
985d0e79 213 message: 'Error Message from ClusterWorker',
3a502712 214 data,
985d0e79 215 })
18482cec
JB
216 expect(
217 errorPool.workerNodes.some(
041dc05b 218 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
219 )
220 ).toBe(true)
325f50bc
S
221 })
222
223 it('Verify that error handling is working properly:async', async () => {
224 const data = { f: 10 }
c726f66c 225 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
4f0b85b3 226 let taskError
041dc05b 227 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
4f0b85b3
JB
228 taskError = e
229 })
c726f66c 230 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
3a502712 231 PoolEvents.taskError,
c726f66c 232 ])
325f50bc
S
233 let inError
234 try {
235 await asyncErrorPool.execute(data)
236 } catch (e) {
237 inError = e
238 }
239 expect(inError).toBeDefined()
8620fb25 240 expect(typeof inError === 'string').toBe(true)
985d0e79
JB
241 expect(inError).toBe('Error Message from ClusterWorker:async')
242 expect(taskError).toStrictEqual({
6cd5248f 243 name: DEFAULT_TASK_NAME,
985d0e79 244 message: 'Error Message from ClusterWorker:async',
3a502712 245 data,
985d0e79 246 })
18482cec
JB
247 expect(
248 asyncErrorPool.workerNodes.some(
041dc05b 249 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
250 )
251 ).toBe(true)
325f50bc
S
252 })
253
254 it('Verify that async function is working properly', async () => {
255 const data = { f: 10 }
15e5141f 256 const startTime = performance.now()
325f50bc 257 const result = await asyncPool.execute(data)
15e5141f 258 const usedTime = performance.now() - startTime
e1ffb94f 259 expect(result).toStrictEqual(data)
325f50bc
S
260 expect(usedTime).toBeGreaterThanOrEqual(2000)
261 })
262
263 it('Shutdown test', async () => {
bac873bd 264 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
c726f66c 265 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
ef3891a3
JB
266 let poolDestroy = 0
267 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
c726f66c
JB
268 expect(pool.emitter.eventNames()).toStrictEqual([
269 PoolEvents.busy,
3a502712 270 PoolEvents.destroy,
c726f66c 271 ])
45dbbb14 272 await pool.destroy()
bdacc2d2 273 const numberOfExitEvents = await exitPromise
bb9423b7 274 expect(pool.started).toBe(false)
a621172f
JB
275 expect(pool.emitter.eventNames()).toStrictEqual([
276 PoolEvents.busy,
3a502712 277 PoolEvents.destroy,
a621172f 278 ])
55082af9 279 expect(pool.readyEventEmitted).toBe(false)
bb9423b7 280 expect(pool.workerNodes.length).toBe(0)
bdacc2d2 281 expect(numberOfExitEvents).toBe(numberOfWorkers)
ef3891a3 282 expect(poolDestroy).toBe(1)
325f50bc
S
283 })
284
1a76932b 285 it('Verify that cluster pool options are checked', async () => {
d35e5717 286 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
0fe39c97
JB
287 let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
288 expect(pool.opts.env).toBeUndefined()
289 expect(pool.opts.settings).toBeUndefined()
7cb5dbdb
JB
290 expect(cluster.settings).toMatchObject({
291 exec: workerFilePath,
3a502712 292 silent: false,
7cb5dbdb 293 })
0fe39c97
JB
294 await pool.destroy()
295 pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
1a76932b 296 env: { TEST: 'test' },
3a502712 297 settings: { args: ['--use', 'http'], silent: true },
1a76932b 298 })
0fe39c97
JB
299 expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
300 expect(pool.opts.settings).toStrictEqual({
1a76932b 301 args: ['--use', 'http'],
3a502712 302 silent: true,
1a76932b 303 })
1fd60f0d
JB
304 expect(cluster.settings).toMatchObject({
305 args: ['--use', 'http'],
306 silent: true,
3a502712 307 exec: workerFilePath,
1fd60f0d 308 })
0fe39c97 309 await pool.destroy()
1a76932b
JB
310 })
311
325f50bc 312 it('Should work even without opts in input', async () => {
d35e5717 313 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
7d27ec0d 314 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
0fe39c97 315 const res = await pool.execute()
30b963d4 316 expect(res).toStrictEqual({ ok: 1 })
8bc77620 317 // We need to clean up the resources after our test
0fe39c97 318 await pool.destroy()
325f50bc 319 })
8d3782fa 320
7d27ec0d 321 it('Verify destroyWorkerNode()', async () => {
d35e5717 322 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
7d27ec0d 323 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
52a23942 324 const workerNodeKey = 0
7d27ec0d 325 let disconnectEvent = 0
52a23942 326 pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
7d27ec0d
JB
327 ++disconnectEvent
328 })
329 let exitEvent = 0
52a23942 330 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
7d27ec0d
JB
331 ++exitEvent
332 })
52a23942 333 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
7d27ec0d
JB
334 expect(disconnectEvent).toBe(1)
335 expect(exitEvent).toBe(1)
60dc0a9c
JB
336 // Simulates an illegitimate worker node destroy and the minimum number of worker nodes is guaranteed
337 expect(pool.workerNodes.length).toBe(numberOfWorkers)
7d27ec0d
JB
338 await pool.destroy()
339 })
340
1cc6e9ef 341 it('Verify that a pool with zero worker fails', () => {
8d3782fa
JB
342 expect(
343 () =>
d35e5717 344 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.cjs')
948faff7 345 ).toThrow('Cannot instantiate a fixed pool with zero worker')
8d3782fa 346 })
325f50bc 347})