Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / tests / pools / cluster / fixed.test.mjs
1 import { expect } from 'expect'
2
3 import { FixedClusterPool, PoolEvents } from '../../../lib/index.cjs'
4 import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
5 import { TaskFunctions } from '../../test-types.cjs'
6 import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
7
8 describe('Fixed cluster pool test suite', () => {
9 const numberOfWorkers = 8
10 const tasksConcurrency = 2
11 const pool = new FixedClusterPool(
12 numberOfWorkers,
13 './tests/worker-files/cluster/testWorker.cjs',
14 {
15 errorHandler: e => console.error(e)
16 }
17 )
18 const queuePool = new FixedClusterPool(
19 numberOfWorkers,
20 './tests/worker-files/cluster/testWorker.cjs',
21 {
22 enableTasksQueue: true,
23 tasksQueueOptions: {
24 concurrency: tasksConcurrency
25 },
26 errorHandler: e => console.error(e)
27 }
28 )
29 const emptyPool = new FixedClusterPool(
30 numberOfWorkers,
31 './tests/worker-files/cluster/emptyWorker.cjs',
32 { exitHandler: () => console.info('empty pool worker exited') }
33 )
34 const echoPool = new FixedClusterPool(
35 numberOfWorkers,
36 './tests/worker-files/cluster/echoWorker.cjs'
37 )
38 const errorPool = new FixedClusterPool(
39 numberOfWorkers,
40 './tests/worker-files/cluster/errorWorker.cjs',
41 {
42 errorHandler: e => console.error(e)
43 }
44 )
45 const asyncErrorPool = new FixedClusterPool(
46 numberOfWorkers,
47 './tests/worker-files/cluster/asyncErrorWorker.cjs',
48 {
49 errorHandler: e => console.error(e)
50 }
51 )
52 const asyncPool = new FixedClusterPool(
53 numberOfWorkers,
54 './tests/worker-files/cluster/asyncWorker.cjs'
55 )
56
57 after('Destroy all pools', async () => {
58 // We need to clean up the resources after our test
59 await echoPool.destroy()
60 await asyncPool.destroy()
61 await errorPool.destroy()
62 await asyncErrorPool.destroy()
63 await emptyPool.destroy()
64 await queuePool.destroy()
65 })
66
67 it('Verify that the function is executed in a worker cluster', async () => {
68 let result = await pool.execute({
69 function: TaskFunctions.fibonacci
70 })
71 expect(result).toBe(75025)
72 result = await pool.execute({
73 function: TaskFunctions.factorial
74 })
75 expect(result).toBe(9.33262154439441e157)
76 })
77
78 it('Verify that is possible to invoke the execute() method without input', async () => {
79 const result = await pool.execute()
80 expect(result).toStrictEqual({ ok: 1 })
81 })
82
83 it("Verify that 'ready' event is emitted", async () => {
84 const pool = new FixedClusterPool(
85 numberOfWorkers,
86 './tests/worker-files/cluster/testWorker.cjs',
87 {
88 errorHandler: e => console.error(e)
89 }
90 )
91 expect(pool.emitter.eventNames()).toStrictEqual([])
92 let poolReady = 0
93 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
94 await waitPoolEvents(pool, PoolEvents.ready, 1)
95 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
96 expect(poolReady).toBe(1)
97 await pool.destroy()
98 })
99
100 it("Verify that 'busy' event is emitted", async () => {
101 const promises = new Set()
102 expect(pool.emitter.eventNames()).toStrictEqual([])
103 let poolBusy = 0
104 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
105 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
106 for (let i = 0; i < numberOfWorkers * 2; i++) {
107 promises.add(pool.execute())
108 }
109 await Promise.all(promises)
110 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
111 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
112 expect(poolBusy).toBe(numberOfWorkers + 1)
113 })
114
115 it('Verify that tasks queuing is working', async () => {
116 const promises = new Set()
117 const maxMultiplier = 3 // Must be greater than tasksConcurrency
118 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
119 promises.add(queuePool.execute())
120 }
121 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
122 for (const workerNode of queuePool.workerNodes) {
123 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
124 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
125 queuePool.opts.tasksQueueOptions.concurrency
126 )
127 expect(workerNode.usage.tasks.executed).toBe(0)
128 expect(workerNode.usage.tasks.queued).toBe(
129 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
130 )
131 expect(workerNode.usage.tasks.maxQueued).toBe(
132 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
133 )
134 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
135 expect(workerNode.usage.tasks.stolen).toBe(0)
136 }
137 expect(queuePool.info.executedTasks).toBe(0)
138 expect(queuePool.info.executingTasks).toBe(
139 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
140 )
141 expect(queuePool.info.queuedTasks).toBe(
142 numberOfWorkers *
143 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
144 )
145 expect(queuePool.info.maxQueuedTasks).toBe(
146 numberOfWorkers *
147 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
148 )
149 expect(queuePool.info.backPressure).toBe(false)
150 expect(queuePool.info.stolenTasks).toBe(0)
151 await Promise.all(promises)
152 for (const workerNode of queuePool.workerNodes) {
153 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
154 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
155 numberOfWorkers * maxMultiplier
156 )
157 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
158 expect(workerNode.usage.tasks.queued).toBe(0)
159 expect(workerNode.usage.tasks.maxQueued).toBe(
160 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
161 )
162 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
163 0
164 )
165 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
166 numberOfWorkers * maxMultiplier
167 )
168 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
169 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
170 numberOfWorkers * maxMultiplier
171 )
172 }
173 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
174 expect(queuePool.info.backPressure).toBe(false)
175 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
176 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
177 numberOfWorkers * maxMultiplier
178 )
179 })
180
181 it('Verify that is possible to have a worker that return undefined', async () => {
182 const result = await emptyPool.execute()
183 expect(result).toBeUndefined()
184 })
185
186 it('Verify that data are sent to the worker correctly', async () => {
187 const data = { f: 10 }
188 const result = await echoPool.execute(data)
189 expect(result).toStrictEqual(data)
190 })
191
192 it('Verify that error handling is working properly:sync', async () => {
193 const data = { f: 10 }
194 expect(errorPool.emitter.eventNames()).toStrictEqual([])
195 let taskError
196 errorPool.emitter.on(PoolEvents.taskError, e => {
197 taskError = e
198 })
199 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
200 let inError
201 try {
202 await errorPool.execute(data)
203 } catch (e) {
204 inError = e
205 }
206 expect(inError).toBeDefined()
207 expect(typeof inError === 'string').toBe(true)
208 expect(inError).toBe('Error Message from ClusterWorker')
209 expect(taskError).toStrictEqual({
210 name: DEFAULT_TASK_NAME,
211 message: 'Error Message from ClusterWorker',
212 data
213 })
214 expect(
215 errorPool.workerNodes.some(
216 workerNode => workerNode.usage.tasks.failed === 1
217 )
218 ).toBe(true)
219 })
220
221 it('Verify that error handling is working properly:async', async () => {
222 const data = { f: 10 }
223 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
224 let taskError
225 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
226 taskError = e
227 })
228 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
229 PoolEvents.taskError
230 ])
231 let inError
232 try {
233 await asyncErrorPool.execute(data)
234 } catch (e) {
235 inError = e
236 }
237 expect(inError).toBeDefined()
238 expect(typeof inError === 'string').toBe(true)
239 expect(inError).toBe('Error Message from ClusterWorker:async')
240 expect(taskError).toStrictEqual({
241 name: DEFAULT_TASK_NAME,
242 message: 'Error Message from ClusterWorker:async',
243 data
244 })
245 expect(
246 asyncErrorPool.workerNodes.some(
247 workerNode => workerNode.usage.tasks.failed === 1
248 )
249 ).toBe(true)
250 })
251
252 it('Verify that async function is working properly', async () => {
253 const data = { f: 10 }
254 const startTime = performance.now()
255 const result = await asyncPool.execute(data)
256 const usedTime = performance.now() - startTime
257 expect(result).toStrictEqual(data)
258 expect(usedTime).toBeGreaterThanOrEqual(2000)
259 })
260
261 it('Shutdown test', async () => {
262 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
263 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
264 let poolDestroy = 0
265 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
266 expect(pool.emitter.eventNames()).toStrictEqual([
267 PoolEvents.busy,
268 PoolEvents.destroy
269 ])
270 await pool.destroy()
271 const numberOfExitEvents = await exitPromise
272 expect(pool.started).toBe(false)
273 expect(pool.emitter.eventNames()).toStrictEqual([
274 PoolEvents.busy,
275 PoolEvents.destroy
276 ])
277 expect(pool.readyEventEmitted).toBe(false)
278 expect(pool.workerNodes.length).toBe(0)
279 expect(numberOfExitEvents).toBe(numberOfWorkers)
280 expect(poolDestroy).toBe(1)
281 })
282
283 it('Verify that cluster pool options are checked', async () => {
284 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
285 let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
286 expect(pool.opts.env).toBeUndefined()
287 expect(pool.opts.settings).toBeUndefined()
288 await pool.destroy()
289 pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
290 env: { TEST: 'test' },
291 settings: { args: ['--use', 'http'], silent: true }
292 })
293 expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
294 expect(pool.opts.settings).toStrictEqual({
295 args: ['--use', 'http'],
296 silent: true
297 })
298 expect({ ...pool.opts.settings, exec: workerFilePath }).toStrictEqual({
299 args: ['--use', 'http'],
300 silent: true,
301 exec: workerFilePath
302 })
303 await pool.destroy()
304 })
305
306 it('Should work even without opts in input', async () => {
307 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
308 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
309 const res = await pool.execute()
310 expect(res).toStrictEqual({ ok: 1 })
311 // We need to clean up the resources after our test
312 await pool.destroy()
313 })
314
315 it('Verify destroyWorkerNode()', async () => {
316 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
317 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
318 const workerNodeKey = 0
319 let disconnectEvent = 0
320 pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
321 ++disconnectEvent
322 })
323 let exitEvent = 0
324 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
325 ++exitEvent
326 })
327 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
328 expect(disconnectEvent).toBe(1)
329 expect(exitEvent).toBe(1)
330 expect(pool.workerNodes.length).toBe(numberOfWorkers - 1)
331 await pool.destroy()
332 })
333
334 it('Verify that a pool with zero worker fails', () => {
335 expect(
336 () =>
337 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.cjs')
338 ).toThrow('Cannot instantiate a fixed pool with zero worker')
339 })
340 })