chore: v3.0.7
[poolifier.git] / tests / pools / cluster / fixed.test.mjs
CommitLineData
a074ffee
JB
1import { expect } from 'expect'
2import { FixedClusterPool, PoolEvents } from '../../../lib/index.js'
3import { TaskFunctions } from '../../test-types.js'
4import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.js'
5import { DEFAULT_TASK_NAME } from '../../../lib/utils.js'
325f50bc 6
079de991 7describe('Fixed cluster pool test suite', () => {
// Number of workers every pool of this suite is created with.
const numberOfWorkers = 8
// Per-worker concurrency configured on the tasks-queue enabled pool.
const tasksConcurrency = 2
// Error handler shared by the pools that report worker errors on the console.
const logWorkerError = e => console.error(e)

// General purpose pool; it is destroyed by the 'Shutdown test' case.
const pool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  { errorHandler: logWorkerError }
)
// Same worker file, with tasks queuing enabled.
const queuePool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: tasksConcurrency },
    errorHandler: logWorkerError
  }
)
// Pool used by the test case expecting an `undefined` result.
const emptyPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/emptyWorker.js',
  { exitHandler: () => console.info('empty pool worker exited') }
)
// Pool used by the test case expecting the input data back.
const echoPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/echoWorker.js'
)
// Pools used by the synchronous and asynchronous error handling test cases.
const errorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/errorWorker.js',
  { errorHandler: logWorkerError }
)
const asyncErrorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncErrorWorker.js',
  { errorHandler: logWorkerError }
)
// Pool used by the asynchronous task function test case.
const asyncPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncWorker.js'
)
55
after('Destroy all pools', async () => {
  // Shared pools still alive once the test cases are done. `pool` is not
  // listed: the 'Shutdown test' case destroys it.
  const remainingPools = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  // Tear the pools down one at a time, in the listed order.
  for (const remainingPool of remainingPools) {
    await remainingPool.destroy()
  }
})
65
it('Verify that the function is executed in a worker cluster', async () => {
  // The `function` field presumably selects the computation run by
  // testWorker.js -- confirm against the worker file.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
76
it('Verify that is possible to invoke the execute() method without input', async () => {
  // No task data is given; the resolved value must be exactly `{ ok: 1 }`.
  expect(await pool.execute()).toStrictEqual({ ok: 1 })
})
81
it("Verify that 'ready' event is emitted", async () => {
  // A dedicated pool is created so the listener bookkeeping asserted below
  // starts from an empty emitter.
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    expect(pool.emitter.eventNames()).toStrictEqual([])
    let poolReady = 0
    pool.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    // Only the listener registered by this test is expected to remain.
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
    expect(poolReady).toBe(1)
  } finally {
    // Release the workers even when an assertion above throws, so a failing
    // test does not leave the pool running.
    await pool.destroy()
  }
})
98
it("Verify that 'busy' event is emitted", async () => {
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyEventCount = 0
  pool.emitter.on(PoolEvents.busy, () => ++busyEventCount)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  // Submit twice as many tasks as there are workers, without awaiting in between.
  const submittedTasks = new Set()
  for (let taskIndex = 0; taskIndex < numberOfWorkers * 2; taskIndex++) {
    submittedTasks.add(pool.execute())
  }
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
})
113
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const totalTasks = numberOfWorkers * maxMultiplier
  const pendingTasks = new Set()
  for (let submitted = 0; submitted < totalTasks; submitted++) {
    pendingTasks.add(queuePool.execute())
  }
  expect(pendingTasks.size).toBe(totalTasks)

  const { concurrency } = queuePool.opts.tasksQueueOptions
  // Tasks each worker cannot start right away and therefore has to queue.
  const queuedPerWorker = maxMultiplier - concurrency

  // First round of checks: nothing has been awaited yet, so no task is
  // expected to be reported as executed.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBe(queuedPerWorker)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.stolen).toBe(0)
  }
  const infoBefore = queuePool.info
  expect(infoBefore.executedTasks).toBe(0)
  expect(infoBefore.executingTasks).toBe(numberOfWorkers * concurrency)
  expect(infoBefore.queuedTasks).toBe(numberOfWorkers * queuedPerWorker)
  expect(infoBefore.maxQueuedTasks).toBe(numberOfWorkers * queuedPerWorker)
  expect(infoBefore.backPressure).toBe(false)
  expect(infoBefore.stolenTasks).toBe(0)

  await Promise.all(pendingTasks)

  // Second round of checks: every submitted task has settled.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(totalTasks)
    expect(tasks.executed).toBe(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(tasks.stolen).toBeLessThanOrEqual(totalTasks)
  }
  const infoAfter = queuePool.info
  expect(infoAfter.executedTasks).toBe(totalTasks)
  expect(infoAfter.backPressure).toBe(false)
  expect(infoAfter.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(infoAfter.stolenTasks).toBeLessThanOrEqual(totalTasks)
})
172
it('Verify that is possible to have a worker that return undefined', async () => {
  // The task is expected to resolve without any value.
  expect(await emptyPool.execute()).toBeUndefined()
})
177
it('Verify that data are sent to the worker correctly', async () => {
  // The resolved value must be structurally equal to the submitted payload.
  const payload = { f: 10 }
  expect(await echoPool.execute(payload)).toStrictEqual(payload)
})
183
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }
  expect(errorPool.emitter.eventNames()).toStrictEqual([])
  // Capture the payload of the `taskError` pool event.
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, error => {
    emittedTaskError = error
  })
  expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
  // Capture the rejection reason; it stays undefined if execute() resolves,
  // which makes the next assertion fail.
  let rejection
  try {
    await errorPool.execute(data)
  } catch (error) {
    rejection = error
  }
  expect(rejection).toBeDefined()
  expect(typeof rejection === 'string').toBe(true)
  expect(rejection).toBe('Error Message from ClusterWorker')
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: 'Error Message from ClusterWorker',
    data
  })
  // Exactly one failed task must be accounted on at least one worker node.
  const hasFailedTask = errorPool.workerNodes.some(
    workerNode => workerNode.usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
212
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
  // Capture the payload of the `taskError` pool event.
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, error => {
    emittedTaskError = error
  })
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
    PoolEvents.taskError
  ])
  // Capture the rejection reason; it stays undefined if execute() resolves,
  // which makes the next assertion fail.
  let rejection
  try {
    await asyncErrorPool.execute(data)
  } catch (error) {
    rejection = error
  }
  expect(rejection).toBeDefined()
  expect(typeof rejection === 'string').toBe(true)
  expect(rejection).toBe('Error Message from ClusterWorker:async')
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: 'Error Message from ClusterWorker:async',
    data
  })
  // Exactly one failed task must be accounted on at least one worker node.
  const hasFailedTask = asyncErrorPool.workerNodes.some(
    workerNode => workerNode.usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
243
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  const startedAt = performance.now()
  const echoed = await asyncPool.execute(payload)
  const elapsedMs = performance.now() - startedAt
  expect(echoed).toStrictEqual(payload)
  // NOTE(review): the 2000 ms floor presumably mirrors a delay implemented in
  // asyncWorker.js -- confirm against the worker file.
  expect(elapsedMs).toBeGreaterThanOrEqual(2000)
})
252
it('Shutdown test', async () => {
  // Start counting worker 'exit' events before the pool is destroyed.
  const exitEventsCount = waitWorkerEvents(pool, 'exit', numberOfWorkers)
  // The `busy` listener found here is the one registered on the shared pool
  // by the 'busy' event test case, so this case relies on it having run.
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  let destroyEventCount = 0
  pool.emitter.on(PoolEvents.destroy, () => ++destroyEventCount)
  expect(pool.emitter.eventNames()).toStrictEqual([
    PoolEvents.busy,
    PoolEvents.destroy
  ])
  await pool.destroy()
  const numberOfExitEvents = await exitEventsCount
  // Pool state once destroyed.
  expect(pool.started).toBe(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  // One 'exit' event per worker and a single `destroy` pool event.
  expect(numberOfExitEvents).toBe(numberOfWorkers)
  expect(destroyEventCount).toBe(1)
})
270
it('Verify that cluster pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  // Without options, neither `env` nor `settings` is set on the pool.
  let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    expect(pool.opts.env).toBeUndefined()
    expect(pool.opts.settings).toBeUndefined()
  } finally {
    // Release the workers even when an assertion above throws.
    await pool.destroy()
  }
  // With options, `env` and `settings` are kept as given.
  pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
    env: { TEST: 'test' },
    settings: { args: ['--use', 'http'], silent: true }
  })
  try {
    expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
    expect(pool.opts.settings).toStrictEqual({
      args: ['--use', 'http'],
      silent: true
    })
    expect({ ...pool.opts.settings, exec: workerFilePath }).toStrictEqual({
      args: ['--use', 'http'],
      silent: true,
      exec: workerFilePath
    })
  } finally {
    // Release the workers even when an assertion above throws.
    await pool.destroy()
  }
})
293
it('Should work even without opts in input', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  // The options argument is deliberately omitted.
  const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    const res = await pool.execute()
    expect(res).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, including when the
    // task or the assertion above fails.
    await pool.destroy()
  }
})
8d3782fa 302
it('Verify destroyWorkerNode()', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    const workerNodeKey = 0
    // Count the lifecycle events emitted by the worker about to be destroyed.
    let disconnectEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
      ++disconnectEvent
    })
    let exitEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('exit', () => {
      ++exitEvent
    })
    await expect(
      pool.destroyWorkerNode(workerNodeKey)
    ).resolves.toBeUndefined()
    expect(disconnectEvent).toBe(1)
    expect(exitEvent).toBe(1)
    // The destroyed worker node is no longer part of the pool.
    expect(pool.workerNodes.length).toBe(numberOfWorkers - 1)
  } finally {
    // Release the remaining workers even when an assertion above throws.
    await pool.destroy()
  }
})
321
it('Verify that a pool with zero worker fails', () => {
  // The constructor call is wrapped so that `expect` can observe the throw.
  const createPoolWithoutWorker = () =>
    new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
  expect(createPoolWithoutWorker).toThrow(
    'Cannot instantiate a fixed pool with zero worker'
  )
})
325f50bc 328})