[poolifier.git] / tests/pools/cluster/fixed.test.mjs
import { expect } from '@std/expect'
import cluster from 'node:cluster'

import { FixedClusterPool, PoolEvents } from '../../../lib/index.cjs'
import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
import { TaskFunctions } from '../../test-types.cjs'
import { waitWorkerEvents } from '../../test-utils.cjs'

describe('Fixed cluster pool test suite', () => {
  const numberOfWorkers = 8
  const tasksConcurrency = 2
  let asyncErrorPool, asyncPool, echoPool, emptyPool, errorPool, pool, queuePool

  before('Create pools', () => {
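    // One pool per scenario: each fixture pool targets a dedicated worker file
    // (test, queue-enabled test, empty, echo, sync error, async error, async)
    // so a single behaviour can be exercised in isolation. The error and exit
    // handlers only log to the console.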
    pool = new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testWorker.cjs',
      {
        errorHandler: e => console.error(e),
      }
    )
    queuePool = new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testWorker.cjs',
      {
        enableTasksQueue: true,
        errorHandler: e => console.error(e),
        tasksQueueOptions: {
          concurrency: tasksConcurrency,
        },
      }
    )
    emptyPool = new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/emptyWorker.cjs',
      { exitHandler: () => console.info('empty pool worker exited') }
    )
    echoPool = new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/echoWorker.cjs'
    )
    errorPool = new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/errorWorker.cjs',
      {
        errorHandler: e => console.error(e),
      }
    )
    asyncErrorPool = new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/asyncErrorWorker.cjs',
      {
        errorHandler: e => console.error(e),
      }
    )
    asyncPool = new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/asyncWorker.cjs'
    )
  })

  after('Destroy pools', async () => {
    // We need to clean up the resources after our tests
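    // `pool` is not destroyed here: the 'Shutdown test' below destroys it and
    // asserts on the destroy behaviour.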
    await echoPool.destroy()
    await asyncPool.destroy()
    await errorPool.destroy()
    await asyncErrorPool.destroy()
    await emptyPool.destroy()
    await queuePool.destroy()
  })

  it('Verify that the function is executed in a worker cluster', async () => {
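    // The task function to run (fibonacci, factorial) is selected through the
    // input data; testWorker.cjs is expected to dispatch on the `function`
    // property. Each call is bounded by a 2000 ms abort timeout.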
    let result = await pool.execute(
      {
        function: TaskFunctions.fibonacci,
      },
      'default',
      AbortSignal.timeout(2000)
    )
    expect(result).toBe(354224848179262000000)
    result = await pool.execute(
      {
        function: TaskFunctions.factorial,
      },
      'default',
      AbortSignal.timeout(2000)
    )
    expect(result).toBe(9.33262154439441e157)
  })

  it('Verify that it is possible to invoke the execute() method without input', async () => {
    const result = await pool.execute()
    expect(result).toStrictEqual({ ok: 1 })
  })

  it('Verify that tasks queuing is working', async () => {
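    // With a tasks queue concurrency of 2 and 3 tasks submitted per worker,
    // each worker node should be executing at most 2 tasks and queuing the
    // remaining one until the in-flight tasks settle.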
    const promises = new Set()
    const maxMultiplier = 3 // Must be greater than tasksConcurrency
    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
      promises.add(queuePool.execute())
    }
    expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
    for (const workerNode of queuePool.workerNodes) {
      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
        queuePool.opts.tasksQueueOptions.concurrency
      )
      expect(workerNode.usage.tasks.executed).toBe(0)
      expect(workerNode.usage.tasks.queued).toBe(
        maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
      )
      expect(workerNode.usage.tasks.maxQueued).toBe(
        maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
      )
      expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
      expect(workerNode.usage.tasks.stolen).toBe(0)
    }
    expect(queuePool.info.executedTasks).toBe(0)
    expect(queuePool.info.executingTasks).toBe(
      numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(queuePool.info.queuedTasks).toBe(
      numberOfWorkers *
        (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
    )
    expect(queuePool.info.maxQueuedTasks).toBe(
      numberOfWorkers *
        (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
    )
    expect(queuePool.info.backPressure).toBe(false)
    expect(queuePool.info.stolenTasks).toBe(0)
    await Promise.all(promises)
    for (const workerNode of queuePool.workerNodes) {
      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
        numberOfWorkers * maxMultiplier
      )
      expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
      expect(workerNode.usage.tasks.queued).toBe(0)
      expect(workerNode.usage.tasks.maxQueued).toBe(
        maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
      )
      expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
        0
      )
      expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
        numberOfWorkers * maxMultiplier
      )
      expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
        numberOfWorkers * maxMultiplier
      )
    }
    expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
    expect(queuePool.info.backPressure).toBe(false)
    expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
    expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
  })

  it('Verify that it is possible to have a worker that returns undefined', async () => {
    const result = await emptyPool.execute()
    expect(result).toBeUndefined()
  })

  it('Verify that data are sent to the worker correctly', async () => {
    const data = { f: 10 }
    const result = await echoPool.execute(data)
    expect(result).toStrictEqual(data)
  })

  it('Verify that error handling is working properly:sync', async () => {
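    // Besides rejecting the execute() promise, the pool emitter is expected to
    // emit a 'taskError' event whose payload mirrors the thrown error (message,
    // stack) together with the task name and input data.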
    const data = { f: 10 }
    expect(errorPool.emitter.eventNames()).toStrictEqual([])
    let taskError
    errorPool.emitter.on(PoolEvents.taskError, e => {
      taskError = e
    })
    expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
    let inError
    try {
      await errorPool.execute(data)
    } catch (e) {
      inError = e
    }
    expect(inError).toBeInstanceOf(Error)
    expect(inError.message).toStrictEqual('Error Message from ClusterWorker')
    expect(typeof inError.stack === 'string').toBe(true)
    expect(taskError).toStrictEqual({
      aborted: false,
      data,
      message: inError.message,
      name: DEFAULT_TASK_NAME,
      stack: inError.stack,
    })
    expect(
      errorPool.workerNodes.some(
        workerNode => workerNode.usage.tasks.failed === 1
      )
    ).toBe(true)
  })

  it('Verify that error handling is working properly:async', async () => {
    const data = { f: 10 }
    expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
    let taskError
    asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
      taskError = e
    })
    expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
      PoolEvents.taskError,
    ])
    let inError
    try {
      await asyncErrorPool.execute(data)
    } catch (e) {
      inError = e
    }
    expect(inError).toBeInstanceOf(Error)
    expect(inError.message).toStrictEqual(
      'Error Message from ClusterWorker:async'
    )
    expect(typeof inError.stack === 'string').toBe(true)
    expect(taskError).toStrictEqual({
      aborted: false,
      data,
      message: inError.message,
      name: DEFAULT_TASK_NAME,
      stack: inError.stack,
    })
    expect(
      asyncErrorPool.workerNodes.some(
        workerNode => workerNode.usage.tasks.failed === 1
      )
    ).toBe(true)
  })

  it('Verify that async function is working properly', async () => {
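    // asyncWorker.cjs is expected to resolve after at least 2000 ms, so the
    // elapsed time is used to check that the asynchronous task function is
    // actually awaited.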
    const data = { f: 10 }
    const startTime = performance.now()
    const result = await asyncPool.execute(data)
    const usedTime = performance.now() - startTime
    expect(result).toStrictEqual(data)
    expect(usedTime).toBeGreaterThanOrEqual(2000)
  })

  it('Verify that task can be aborted', async () => {
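    // Abort is exercised twice: through AbortSignal.timeout(), which rejects
    // with a TimeoutError, and through a manually aborted AbortController
    // carrying a custom abort reason.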
    let error

    try {
      await asyncErrorPool.execute({}, 'default', AbortSignal.timeout(500))
    } catch (e) {
      error = e
    }
    expect(error).toBeInstanceOf(Error)
    expect(error.name).toBe('TimeoutError')
    expect(error.message).toBe('The operation was aborted due to timeout')
    expect(error.stack).toBeDefined()

    const abortController = new AbortController()
    setTimeout(() => {
      abortController.abort(new Error('Task aborted'))
    }, 500)
    try {
      await asyncErrorPool.execute({}, 'default', abortController.signal)
    } catch (e) {
      error = e
    }
    expect(error).toBeInstanceOf(Error)
    expect(error.message).toBe('Task aborted')
    expect(error.stack).toBeDefined()
  })

  it('Shutdown test', async () => {
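    // destroy() should emit a single 'destroy' event, terminate every worker
    // (one 'exit' event per worker) and leave the pool stopped with no worker
    // nodes.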
    const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
    expect(pool.emitter.eventNames()).toStrictEqual([])
    let poolDestroy = 0
    pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
    await pool.destroy()
    const numberOfExitEvents = await exitPromise
    expect(pool.info.started).toBe(false)
    expect(pool.info.ready).toBe(false)
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
    expect(pool.readyEventEmitted).toBe(false)
    expect(pool.busyEventEmitted).toBe(false)
    expect(pool.backPressureEventEmitted).toBe(false)
    expect(pool.workerNodes.length).toBe(0)
    expect(numberOfExitEvents).toBe(numberOfWorkers)
    expect(poolDestroy).toBe(1)
  })

  it('Verify that cluster pool options are checked', async () => {
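    // cluster.settings is module-global state in node:cluster, presumably
    // applied by the pool through cluster.setupPrimary(), so the pool options
    // are checked against it with toMatchObject() after each pool creation.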
    const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
    let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
    expect(pool.opts.env).toBeUndefined()
    expect(pool.opts.settings).toBeUndefined()
    expect(cluster.settings).toMatchObject({
      exec: workerFilePath,
      silent: false,
    })
    await pool.destroy()
    pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
      env: { TEST: 'test' },
      settings: { args: ['--use', 'http'], silent: true },
    })
    expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
    expect(pool.opts.settings).toStrictEqual({
      args: ['--use', 'http'],
      silent: true,
    })
    expect(cluster.settings).toMatchObject({
      args: ['--use', 'http'],
      exec: workerFilePath,
      silent: true,
    })
    await pool.destroy()
  })

  it('Verify destroyWorkerNode()', async () => {
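    // Destroying a single worker node should fire 'disconnect' and 'exit' once
    // on the underlying worker; the pool is then expected to respawn a worker
    // so the fixed pool size is preserved.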
    const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
    const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
    const workerNodeKey = 0
    let disconnectEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
      ++disconnectEvent
    })
    let exitEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('exit', () => {
      ++exitEvent
    })
    await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
    expect(disconnectEvent).toBe(1)
    expect(exitEvent).toBe(1)
    // Simulates an illegitimate worker node destroy; the pool guarantees the minimum number of worker nodes
    expect(pool.workerNodes.length).toBe(numberOfWorkers)
    await pool.destroy()
  })

  it('Verify that a pool with zero worker fails', () => {
    expect(
      () =>
        new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.cjs')
    ).toThrow('Cannot instantiate a fixed pool with zero worker')
  })
})