chore: v3.1.22
[poolifier.git] / tests / pools / cluster / fixed.test.mjs
CommitLineData
a074ffee 1import { expect } from 'expect'
ded253e2 2
d35e5717 3import { FixedClusterPool, PoolEvents } from '../../../lib/index.cjs'
ded253e2 4import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
d35e5717
JB
5import { TaskFunctions } from '../../test-types.cjs'
6import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
325f50bc 7
079de991 8describe('Fixed cluster pool test suite', () => {
// Sizing shared by all the fixture pools of this suite.
const numberOfWorkers = 8
const tasksConcurrency = 2

// Pool running the generic test worker; worker errors are logged with console.error.
// It is not part of the after() hook: the 'Shutdown test' case destroys it.
const pool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.cjs',
  { errorHandler: e => console.error(e) }
)

// Same worker file, with the tasks queue enabled and a per worker concurrency of tasksConcurrency.
const queuePool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.cjs',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: tasksConcurrency },
    errorHandler: e => console.error(e)
  }
)

// Pool built on the 'empty' worker file; worker exits are reported with console.info.
const emptyPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/emptyWorker.cjs',
  { exitHandler: () => console.info('empty pool worker exited') }
)

// Pool built on the 'echo' worker file, created without options.
const echoPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/echoWorker.cjs'
)

// Pools built on the synchronous and asynchronous 'error' worker files.
const errorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/errorWorker.cjs',
  { errorHandler: e => console.error(e) }
)
const asyncErrorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncErrorWorker.cjs',
  { errorHandler: e => console.error(e) }
)

// Pool built on the 'async' worker file, created without options.
const asyncPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncWorker.cjs'
)
56
after('Destroy all pools', async () => {
  // We need to clean up the resources after our test.
  // Pools are destroyed sequentially, in the listed order.
  const poolsToDestroy = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const poolToDestroy of poolsToDestroy) {
    await poolToDestroy.destroy()
  }
})
66
it('Verify that the function is executed in a worker cluster', async () => {
  // Each task selects the function to run through the `function` field of its data.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
77
it('Verify that is possible to invoke the execute() method without input', async () => {
  // No task data is given to execute()
  const outcome = await pool.execute()
  expect(outcome).toStrictEqual({ ok: 1 })
})
82
it("Verify that 'ready' event is emitted", async () => {
  // A dedicated pool is used so that its 'ready' event can be observed from creation.
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    expect(pool.emitter.eventNames()).toStrictEqual([])
    let poolReady = 0
    pool.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
    expect(poolReady).toBe(1)
  } finally {
    // Always release the workers, even when an assertion above fails,
    // so that a failing test does not leave worker processes behind.
    await pool.destroy()
  }
})
99
it("Verify that 'busy' event is emitted", async () => {
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyEventCount = 0
  pool.emitter.on(PoolEvents.busy, () => {
    ++busyEventCount
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  // Submit twice as many tasks as there are workers, all at once
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
})
114
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const { concurrency } = queuePool.opts.tasksQueueOptions
  // Expected figures, computed once and reused by the assertions below
  const totalTasks = numberOfWorkers * maxMultiplier
  const queuedPerWorker = maxMultiplier - concurrency

  const submittedTasks = Array.from({ length: totalTasks }, () =>
    queuePool.execute()
  )
  expect(submittedTasks.length).toBe(totalTasks)

  // State right after submission, before any task has completed
  for (const { usage } of queuePool.workerNodes) {
    const { tasks } = usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBe(queuedPerWorker)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.sequentiallyStolen).toBe(0)
    expect(tasks.stolen).toBe(0)
  }
  expect(queuePool.info.executedTasks).toBe(0)
  expect(queuePool.info.executingTasks).toBe(numberOfWorkers * concurrency)
  expect(queuePool.info.queuedTasks).toBe(numberOfWorkers * queuedPerWorker)
  expect(queuePool.info.maxQueuedTasks).toBe(numberOfWorkers * queuedPerWorker)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBe(0)

  await Promise.all(submittedTasks)

  // State once every submitted task has settled
  for (const { usage } of queuePool.workerNodes) {
    const { tasks } = usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(totalTasks)
    expect(tasks.executed).toBe(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.sequentiallyStolen).toBeGreaterThanOrEqual(0)
    expect(tasks.sequentiallyStolen).toBeLessThanOrEqual(totalTasks)
    expect(tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(tasks.stolen).toBeLessThanOrEqual(totalTasks)
  }
  expect(queuePool.info.executedTasks).toBe(totalTasks)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(totalTasks)
})
180
it('Verify that is possible to have a worker that return undefined', async () => {
  // emptyPool runs the 'emptyWorker.cjs' worker file
  const outcome = await emptyPool.execute()
  expect(outcome).toBeUndefined()
})
185
it('Verify that data are sent to the worker correctly', async () => {
  // The task result is expected to be structurally equal to the data sent
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
191
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }
  expect(errorPool.emitter.eventNames()).toStrictEqual([])
  // Capture the payload of the pool 'taskError' event
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, e => {
    emittedTaskError = e
  })
  expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
  // Capture the rejection of the task execution
  let rejection
  try {
    await errorPool.execute(data)
  } catch (e) {
    rejection = e
  }
  expect(rejection).toBeDefined()
  expect(typeof rejection === 'string').toBe(true)
  expect(rejection).toBe('Error Message from ClusterWorker')
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: 'Error Message from ClusterWorker',
    data
  })
  // At least one worker node must account for exactly one failed task
  const failedCounts = errorPool.workerNodes.map(
    workerNode => workerNode.usage.tasks.failed
  )
  expect(failedCounts.includes(1)).toBe(true)
})
220
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
  // Capture the payload of the pool 'taskError' event
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
    emittedTaskError = e
  })
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
    PoolEvents.taskError
  ])
  // Capture the rejection of the task execution
  let rejection
  try {
    await asyncErrorPool.execute(data)
  } catch (e) {
    rejection = e
  }
  expect(rejection).toBeDefined()
  expect(typeof rejection === 'string').toBe(true)
  expect(rejection).toBe('Error Message from ClusterWorker:async')
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: 'Error Message from ClusterWorker:async',
    data
  })
  // At least one worker node must account for exactly one failed task
  const failedCounts = asyncErrorPool.workerNodes.map(
    workerNode => workerNode.usage.tasks.failed
  )
  expect(failedCounts.includes(1)).toBe(true)
})
251
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  // Measure the wall clock time taken by the task round trip
  const startedAt = performance.now()
  const outcome = await asyncPool.execute(payload)
  const elapsedMs = performance.now() - startedAt
  expect(outcome).toStrictEqual(payload)
  expect(elapsedMs).toBeGreaterThanOrEqual(2000)
})
260
it('Shutdown test', async () => {
  // Start waiting for the workers 'exit' events before the pool gets destroyed
  const workersExited = waitWorkerEvents(pool, 'exit', numberOfWorkers)
  // The 'busy' listener was registered on this pool by an earlier test case
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  let destroyEventCount = 0
  pool.emitter.on(PoolEvents.destroy, () => {
    ++destroyEventCount
  })
  expect(pool.emitter.eventNames()).toStrictEqual([
    PoolEvents.busy,
    PoolEvents.destroy
  ])
  await pool.destroy()
  const exitEventCount = await workersExited
  expect(pool.started).toBe(false)
  expect(pool.emitter.eventNames()).toStrictEqual(['busy', 'destroy'])
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  expect(exitEventCount).toBe(numberOfWorkers)
  expect(destroyEventCount).toBe(1)
})
279
it('Verify that cluster pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'

  // Without options, neither `env` nor `settings` is set on the pool options
  const defaultPool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    expect(defaultPool.opts.env).toBeUndefined()
    expect(defaultPool.opts.settings).toBeUndefined()
  } finally {
    // Release the workers even when an assertion above fails
    await defaultPool.destroy()
  }

  // With options, `env` and `settings` are kept as given
  const customPool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
    env: { TEST: 'test' },
    settings: { args: ['--use', 'http'], silent: true }
  })
  try {
    expect(customPool.opts.env).toStrictEqual({ TEST: 'test' })
    expect(customPool.opts.settings).toStrictEqual({
      args: ['--use', 'http'],
      silent: true
    })
    expect({ ...customPool.opts.settings, exec: workerFilePath }).toStrictEqual({
      args: ['--use', 'http'],
      silent: true,
      exec: workerFilePath
    })
  } finally {
    // Release the workers even when an assertion above fails
    await customPool.destroy()
  }
})
302
it('Should work even without opts in input', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
  const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    const res = await pool.execute()
    expect(res).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test,
    // including when the execution or the assertion above fails
    await pool.destroy()
  }
})
8d3782fa 311
it('Verify destroyWorkerNode()', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
  const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    const workerNodeKey = 0
    // Count the 'disconnect' and 'exit' events of the worker about to be destroyed
    let disconnectEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
      ++disconnectEvent
    })
    let exitEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('exit', () => {
      ++exitEvent
    })
    await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
    expect(disconnectEvent).toBe(1)
    expect(exitEvent).toBe(1)
    expect(pool.workerNodes.length).toBe(numberOfWorkers - 1)
  } finally {
    // Release the remaining workers even when an assertion above fails
    await pool.destroy()
  }
})
330
it('Verify that a pool with zero worker fails', () => {
  // The constructor call is wrapped so that expect() can invoke it and catch the throw
  const createPoolWithoutWorker = () =>
    new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.cjs')
  expect(createPoolWithoutWorker).toThrow(
    'Cannot instantiate a fixed pool with zero worker'
  )
})
325f50bc 337})