// poolifier.git — tests/pools/cluster/fixed.test.mjs
// blob: 8d8cdf785fdfb883b0a28474e6d91ea75f64193c
1 import cluster from 'node:cluster'
2 import { afterAll, beforeAll, describe, expect, it } from 'vitest'
3
4 import { FixedClusterPool, PoolEvents } from '../../../lib/index.mjs'
5 import { DEFAULT_TASK_NAME } from '../../../lib/utils.mjs'
6 import { TaskFunctions } from '../../test-types.cjs'
7 import { sleep, waitWorkerEvents } from '../../test-utils.cjs'
8
9 describe('Fixed cluster pool test suite', () => {
10 const numberOfWorkers = 8
11 const tasksConcurrency = 2
12 let asyncErrorPool, asyncPool, echoPool, emptyPool, errorPool, pool, queuePool
13
14 beforeAll(() => {
15 pool = new FixedClusterPool(
16 numberOfWorkers,
17 './tests/worker-files/cluster/testWorker.cjs',
18 {
19 errorHandler: e => console.error(e),
20 }
21 )
22 queuePool = new FixedClusterPool(
23 numberOfWorkers,
24 './tests/worker-files/cluster/testWorker.cjs',
25 {
26 enableTasksQueue: true,
27 errorHandler: e => console.error(e),
28 tasksQueueOptions: {
29 concurrency: tasksConcurrency,
30 },
31 }
32 )
33 emptyPool = new FixedClusterPool(
34 numberOfWorkers,
35 './tests/worker-files/cluster/emptyWorker.cjs',
36 { exitHandler: () => console.info('empty pool worker exited') }
37 )
38 echoPool = new FixedClusterPool(
39 numberOfWorkers,
40 './tests/worker-files/cluster/echoWorker.cjs'
41 )
42 errorPool = new FixedClusterPool(
43 numberOfWorkers,
44 './tests/worker-files/cluster/errorWorker.cjs',
45 {
46 errorHandler: e => console.error(e),
47 }
48 )
49 asyncErrorPool = new FixedClusterPool(
50 numberOfWorkers,
51 './tests/worker-files/cluster/asyncErrorWorker.cjs',
52 {
53 errorHandler: e => console.error(e),
54 }
55 )
56 asyncPool = new FixedClusterPool(
57 numberOfWorkers,
58 './tests/worker-files/cluster/asyncWorker.cjs'
59 )
60 })
61
62 afterAll(async () => {
63 // Skip on CI to avoid afterAll hook timeout
64 if (process.env.CI != null) return
65 // We need to clean up the resources after our tests
66 await echoPool.destroy()
67 await asyncPool.destroy()
68 await errorPool.destroy()
69 await asyncErrorPool.destroy()
70 await emptyPool.destroy()
71 await queuePool.destroy()
72 })
73
74 it('Verify that the function is executed in a worker cluster', async () => {
75 let result = await pool.execute(
76 {
77 function: TaskFunctions.fibonacci,
78 },
79 'default',
80 AbortSignal.timeout(2000)
81 )
82 expect(result).toBe(354224848179262000000)
83 result = await pool.execute(
84 {
85 function: TaskFunctions.factorial,
86 },
87 'default',
88 AbortSignal.timeout(2000)
89 )
90 expect(result).toBe(9.33262154439441e157)
91 })
92
93 it('Verify that is possible to invoke the execute() method without input', async () => {
94 const result = await pool.execute()
95 expect(result).toStrictEqual({ ok: 1 })
96 })
97
  it('Verify that tasks queuing is working', async () => {
    const promises = new Set()
    const maxMultiplier = 3 // Must be greater than tasksConcurrency
    // Submit more tasks per worker than the per-worker concurrency so that
    // the excess gets queued instead of executed immediately.
    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
      promises.add(queuePool.execute())
    }
    expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
    // --- Before settlement: per-worker accounting while tasks are in flight ---
    for (const workerNode of queuePool.workerNodes) {
      // At most `concurrency` tasks execute concurrently on a worker node.
      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
        queuePool.opts.tasksQueueOptions.concurrency
      )
      expect(workerNode.usage.tasks.executed).toBe(0)
      // The remainder beyond the concurrency limit sits in the worker's queue.
      expect(workerNode.usage.tasks.queued).toBe(
        maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
      )
      expect(workerNode.usage.tasks.maxQueued).toBe(
        maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
      )
      // No task stealing has happened yet at this point.
      expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
      expect(workerNode.usage.tasks.stolen).toBe(0)
    }
    // --- Before settlement: pool-level aggregate counters ---
    expect(queuePool.info.executedTasks).toBe(0)
    expect(queuePool.info.executingTasks).toBe(
      numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(queuePool.info.queuedTasks).toBe(
      numberOfWorkers *
        (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
    )
    expect(queuePool.info.maxQueuedTasks).toBe(
      numberOfWorkers *
        (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
    )
    expect(queuePool.info.backPressure).toBe(false)
    expect(queuePool.info.stolenTasks).toBe(0)
    // Wait for every submitted task to complete.
    await Promise.all(promises)
    // --- After settlement: queues drained, all tasks executed ---
    for (const workerNode of queuePool.workerNodes) {
      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
        numberOfWorkers * maxMultiplier
      )
      expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
      expect(workerNode.usage.tasks.queued).toBe(0)
      // maxQueued keeps the high-water mark reached before draining.
      expect(workerNode.usage.tasks.maxQueued).toBe(
        maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
      )
      // Stealing counts are timing-dependent; only bound them.
      expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
        0
      )
      expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
        numberOfWorkers * maxMultiplier
      )
      expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
        numberOfWorkers * maxMultiplier
      )
    }
    expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
    expect(queuePool.info.backPressure).toBe(false)
    expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
    expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
  })
163
164 it('Verify that is possible to have a worker that return undefined', async () => {
165 const result = await emptyPool.execute()
166 expect(result).toBeUndefined()
167 })
168
169 it('Verify that data are sent to the worker correctly', async () => {
170 const data = { f: 10 }
171 const result = await echoPool.execute(data)
172 expect(result).toStrictEqual(data)
173 })
174
175 it('Verify that error handling is working properly:sync', async () => {
176 const data = { f: 10 }
177 expect(errorPool.emitter.eventNames()).toStrictEqual([])
178 let taskError
179 errorPool.emitter.on(PoolEvents.taskError, e => {
180 taskError = e
181 })
182 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
183 let inError
184 try {
185 await errorPool.execute(data)
186 } catch (e) {
187 inError = e
188 }
189 expect(inError).toBeInstanceOf(Error)
190 expect(inError.message).toStrictEqual('Error Message from ClusterWorker')
191 expect(typeof inError.stack === 'string').toBe(true)
192 expect(taskError).toStrictEqual({
193 aborted: false,
194 data,
195 message: inError.message,
196 name: DEFAULT_TASK_NAME,
197 stack: inError.stack,
198 })
199 expect(
200 errorPool.workerNodes.some(
201 workerNode => workerNode.usage.tasks.failed === 1
202 )
203 ).toBe(true)
204 })
205
206 it('Verify that error handling is working properly:async', async () => {
207 const data = { f: 10 }
208 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
209 let taskError
210 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
211 taskError = e
212 })
213 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
214 PoolEvents.taskError,
215 ])
216 let inError
217 try {
218 await asyncErrorPool.execute(data)
219 } catch (e) {
220 inError = e
221 }
222 expect(inError).toBeInstanceOf(Error)
223 expect(inError.message).toStrictEqual(
224 'Error Message from ClusterWorker:async'
225 )
226 expect(typeof inError.stack === 'string').toBe(true)
227 expect(taskError).toStrictEqual({
228 aborted: false,
229 data,
230 message: inError.message,
231 name: DEFAULT_TASK_NAME,
232 stack: inError.stack,
233 })
234 expect(
235 asyncErrorPool.workerNodes.some(
236 workerNode => workerNode.usage.tasks.failed === 1
237 )
238 ).toBe(true)
239 })
240
241 it('Verify that async function is working properly', async () => {
242 const data = { f: 10 }
243 const startTime = performance.now()
244 const result = await asyncPool.execute(data)
245 const usedTime = performance.now() - startTime
246 expect(result).toStrictEqual(data)
247 expect(usedTime).toBeGreaterThanOrEqual(2000)
248 })
249
250 it('Verify that task can be aborted', async () => {
251 let error
252
253 try {
254 await asyncErrorPool.execute({}, 'default', AbortSignal.timeout(500))
255 } catch (e) {
256 error = e
257 }
258 expect(error).toBeInstanceOf(Error)
259 expect(error.name).toBe('TimeoutError')
260 expect(error.message).toBe('The operation was aborted due to timeout')
261 expect(error.stack).toBeDefined()
262
263 const abortController = new AbortController()
264 setTimeout(() => {
265 abortController.abort(new Error('Task aborted'))
266 }, 500)
267 try {
268 await asyncErrorPool.execute({}, 'default', abortController.signal)
269 } catch (e) {
270 error = e
271 }
272 expect(error).toBeInstanceOf(Error)
273 expect(error.message).toBe('Task aborted')
274 expect(error.stack).toBeDefined()
275 })
276
  it('Shutdown test', { retry: 0 }, async ({ skip }) => {
    // Destroys the shared `pool`; skipped on CI where teardown is flaky/slow.
    if (process.env.CI != null) {
      skip()
      return
    }
    // Arm the exit-event waiter before destroying, so no event is missed.
    const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
    expect(pool.emitter.eventNames()).toStrictEqual([])
    let poolDestroy = 0
    pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
    await pool.destroy()
    const exitEvents = await exitPromise
    // After destroy: pool reports stopped/not-ready and all state is reset.
    expect(pool.info.started).toBe(false)
    expect(pool.info.ready).toBe(false)
    // Listener registration survives destroy; only pool state is reset.
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
    expect(pool.readyEventEmitted).toBe(false)
    expect(pool.busyEventEmitted).toBe(false)
    expect(pool.backPressureEventEmitted).toBe(false)
    expect(pool.workerNodes.length).toBe(0)
    // Every worker exited exactly once and destroy was emitted exactly once.
    expect(exitEvents).toBe(numberOfWorkers)
    expect(poolDestroy).toBe(1)
  })
299
300 it('Verify that cluster pool options are checked', async () => {
301 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
302 let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
303 expect(pool.opts.env).toBeUndefined()
304 expect(pool.opts.settings).toBeUndefined()
305 expect(cluster.settings).toMatchObject({
306 exec: workerFilePath,
307 silent: false,
308 })
309 await pool.destroy()
310 pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
311 env: { TEST: 'test' },
312 settings: { args: ['--use', 'http'], silent: true },
313 })
314 expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
315 expect(pool.opts.settings).toStrictEqual({
316 args: ['--use', 'http'],
317 silent: true,
318 })
319 expect(cluster.settings).toMatchObject({
320 args: ['--use', 'http'],
321 exec: workerFilePath,
322 silent: true,
323 })
324 await pool.destroy()
325 })
326
327 it('Verify destroyWorkerNode()', async () => {
328 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
329 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
330 const workerNodeKey = 0
331 let disconnectEvent = 0
332 pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
333 ++disconnectEvent
334 })
335 let exitEvent = 0
336 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
337 ++exitEvent
338 })
339 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
340 expect(disconnectEvent).toBe(1)
341 expect(exitEvent).toBe(1)
342 // Simulates an illegitimate worker node destroy and the minimum number of worker nodes is guaranteed
343 expect(pool.workerNodes.length).toBe(numberOfWorkers)
344 await sleep(500)
345 await pool.destroy()
346 })
347
348 it('Verify that a pool with zero worker fails', () => {
349 expect(
350 () =>
351 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.cjs')
352 ).toThrow('Cannot instantiate a fixed pool with zero worker')
353 })
354 })