4471f51309c39cdb056203aa97d0c03205bb9043
[poolifier.git] / tests / pools / cluster / fixed.test.mjs
1 import { expect } from 'expect'
2 import { FixedClusterPool, PoolEvents } from '../../../lib/index.js'
3 import { TaskFunctions } from '../../test-types.js'
4 import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.js'
5 import { DEFAULT_TASK_NAME } from '../../../lib/utils.js'
6
7 describe('Fixed cluster pool test suite', () => {
8 const numberOfWorkers = 8
9 const tasksConcurrency = 2
10 const pool = new FixedClusterPool(
11 numberOfWorkers,
12 './tests/worker-files/cluster/testWorker.js',
13 {
14 errorHandler: e => console.error(e)
15 }
16 )
17 const queuePool = new FixedClusterPool(
18 numberOfWorkers,
19 './tests/worker-files/cluster/testWorker.js',
20 {
21 enableTasksQueue: true,
22 tasksQueueOptions: {
23 concurrency: tasksConcurrency
24 },
25 errorHandler: e => console.error(e)
26 }
27 )
28 const emptyPool = new FixedClusterPool(
29 numberOfWorkers,
30 './tests/worker-files/cluster/emptyWorker.js',
31 { exitHandler: () => console.info('empty pool worker exited') }
32 )
33 const echoPool = new FixedClusterPool(
34 numberOfWorkers,
35 './tests/worker-files/cluster/echoWorker.js'
36 )
37 const errorPool = new FixedClusterPool(
38 numberOfWorkers,
39 './tests/worker-files/cluster/errorWorker.js',
40 {
41 errorHandler: e => console.error(e)
42 }
43 )
44 const asyncErrorPool = new FixedClusterPool(
45 numberOfWorkers,
46 './tests/worker-files/cluster/asyncErrorWorker.js',
47 {
48 errorHandler: e => console.error(e)
49 }
50 )
51 const asyncPool = new FixedClusterPool(
52 numberOfWorkers,
53 './tests/worker-files/cluster/asyncWorker.js'
54 )
55
56 after('Destroy all pools', async () => {
57 // We need to clean up the resources after our test
58 await echoPool.destroy()
59 await asyncPool.destroy()
60 await errorPool.destroy()
61 await asyncErrorPool.destroy()
62 await emptyPool.destroy()
63 await queuePool.destroy()
64 })
65
66 it('Verify that the function is executed in a worker cluster', async () => {
67 let result = await pool.execute({
68 function: TaskFunctions.fibonacci
69 })
70 expect(result).toBe(75025)
71 result = await pool.execute({
72 function: TaskFunctions.factorial
73 })
74 expect(result).toBe(9.33262154439441e157)
75 })
76
77 it('Verify that is possible to invoke the execute() method without input', async () => {
78 const result = await pool.execute()
79 expect(result).toStrictEqual({ ok: 1 })
80 })
81
82 it("Verify that 'ready' event is emitted", async () => {
83 const pool = new FixedClusterPool(
84 numberOfWorkers,
85 './tests/worker-files/cluster/testWorker.js',
86 {
87 errorHandler: e => console.error(e)
88 }
89 )
90 expect(pool.emitter.eventNames()).toStrictEqual([])
91 let poolReady = 0
92 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
93 await waitPoolEvents(pool, PoolEvents.ready, 1)
94 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
95 expect(poolReady).toBe(1)
96 await pool.destroy()
97 })
98
99 it("Verify that 'busy' event is emitted", async () => {
100 const promises = new Set()
101 expect(pool.emitter.eventNames()).toStrictEqual([])
102 let poolBusy = 0
103 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
104 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
105 for (let i = 0; i < numberOfWorkers * 2; i++) {
106 promises.add(pool.execute())
107 }
108 await Promise.all(promises)
109 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
110 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
111 expect(poolBusy).toBe(numberOfWorkers + 1)
112 })
113
114 it('Verify that tasks queuing is working', async () => {
115 const promises = new Set()
116 const maxMultiplier = 3 // Must be greater than tasksConcurrency
117 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
118 promises.add(queuePool.execute())
119 }
120 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
121 for (const workerNode of queuePool.workerNodes) {
122 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
123 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
124 queuePool.opts.tasksQueueOptions.concurrency
125 )
126 expect(workerNode.usage.tasks.executed).toBe(0)
127 expect(workerNode.usage.tasks.queued).toBe(
128 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
129 )
130 expect(workerNode.usage.tasks.maxQueued).toBe(
131 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
132 )
133 expect(workerNode.usage.tasks.stolen).toBe(0)
134 }
135 expect(queuePool.info.executedTasks).toBe(0)
136 expect(queuePool.info.executingTasks).toBe(
137 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
138 )
139 expect(queuePool.info.queuedTasks).toBe(
140 numberOfWorkers *
141 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
142 )
143 expect(queuePool.info.maxQueuedTasks).toBe(
144 numberOfWorkers *
145 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
146 )
147 expect(queuePool.info.backPressure).toBe(false)
148 expect(queuePool.info.stolenTasks).toBe(0)
149 await Promise.all(promises)
150 for (const workerNode of queuePool.workerNodes) {
151 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
152 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
153 numberOfWorkers * maxMultiplier
154 )
155 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
156 expect(workerNode.usage.tasks.queued).toBe(0)
157 expect(workerNode.usage.tasks.maxQueued).toBe(
158 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
159 )
160 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
161 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
162 numberOfWorkers * maxMultiplier
163 )
164 }
165 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
166 expect(queuePool.info.backPressure).toBe(false)
167 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
168 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
169 numberOfWorkers * maxMultiplier
170 )
171 })
172
173 it('Verify that is possible to have a worker that return undefined', async () => {
174 const result = await emptyPool.execute()
175 expect(result).toBeUndefined()
176 })
177
178 it('Verify that data are sent to the worker correctly', async () => {
179 const data = { f: 10 }
180 const result = await echoPool.execute(data)
181 expect(result).toStrictEqual(data)
182 })
183
184 it('Verify that error handling is working properly:sync', async () => {
185 const data = { f: 10 }
186 expect(errorPool.emitter.eventNames()).toStrictEqual([])
187 let taskError
188 errorPool.emitter.on(PoolEvents.taskError, e => {
189 taskError = e
190 })
191 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
192 let inError
193 try {
194 await errorPool.execute(data)
195 } catch (e) {
196 inError = e
197 }
198 expect(inError).toBeDefined()
199 expect(typeof inError === 'string').toBe(true)
200 expect(inError).toBe('Error Message from ClusterWorker')
201 expect(taskError).toStrictEqual({
202 name: DEFAULT_TASK_NAME,
203 message: 'Error Message from ClusterWorker',
204 data
205 })
206 expect(
207 errorPool.workerNodes.some(
208 workerNode => workerNode.usage.tasks.failed === 1
209 )
210 ).toBe(true)
211 })
212
213 it('Verify that error handling is working properly:async', async () => {
214 const data = { f: 10 }
215 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
216 let taskError
217 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
218 taskError = e
219 })
220 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
221 PoolEvents.taskError
222 ])
223 let inError
224 try {
225 await asyncErrorPool.execute(data)
226 } catch (e) {
227 inError = e
228 }
229 expect(inError).toBeDefined()
230 expect(typeof inError === 'string').toBe(true)
231 expect(inError).toBe('Error Message from ClusterWorker:async')
232 expect(taskError).toStrictEqual({
233 name: DEFAULT_TASK_NAME,
234 message: 'Error Message from ClusterWorker:async',
235 data
236 })
237 expect(
238 asyncErrorPool.workerNodes.some(
239 workerNode => workerNode.usage.tasks.failed === 1
240 )
241 ).toBe(true)
242 })
243
244 it('Verify that async function is working properly', async () => {
245 const data = { f: 10 }
246 const startTime = performance.now()
247 const result = await asyncPool.execute(data)
248 const usedTime = performance.now() - startTime
249 expect(result).toStrictEqual(data)
250 expect(usedTime).toBeGreaterThanOrEqual(2000)
251 })
252
253 it('Shutdown test', async () => {
254 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
255 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
256 let poolDestroy = 0
257 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
258 expect(pool.emitter.eventNames()).toStrictEqual([
259 PoolEvents.busy,
260 PoolEvents.destroy
261 ])
262 await pool.destroy()
263 const numberOfExitEvents = await exitPromise
264 expect(pool.started).toBe(false)
265 expect(pool.workerNodes.length).toBe(0)
266 expect(numberOfExitEvents).toBe(numberOfWorkers)
267 expect(poolDestroy).toBe(1)
268 })
269
270 it('Verify that cluster pool options are checked', async () => {
271 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
272 let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
273 expect(pool.opts.env).toBeUndefined()
274 expect(pool.opts.settings).toBeUndefined()
275 await pool.destroy()
276 pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
277 env: { TEST: 'test' },
278 settings: { args: ['--use', 'http'], silent: true }
279 })
280 expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
281 expect(pool.opts.settings).toStrictEqual({
282 args: ['--use', 'http'],
283 silent: true
284 })
285 expect({ ...pool.opts.settings, exec: workerFilePath }).toStrictEqual({
286 args: ['--use', 'http'],
287 silent: true,
288 exec: workerFilePath
289 })
290 await pool.destroy()
291 })
292
293 it('Should work even without opts in input', async () => {
294 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
295 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
296 const res = await pool.execute()
297 expect(res).toStrictEqual({ ok: 1 })
298 // We need to clean up the resources after our test
299 await pool.destroy()
300 })
301
302 it('Verify destroyWorkerNode()', async () => {
303 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
304 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
305 const workerNodeKey = 0
306 let disconnectEvent = 0
307 pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
308 ++disconnectEvent
309 })
310 let exitEvent = 0
311 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
312 ++exitEvent
313 })
314 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
315 expect(disconnectEvent).toBe(1)
316 expect(exitEvent).toBe(1)
317 expect(pool.workerNodes.length).toBe(numberOfWorkers - 1)
318 await pool.destroy()
319 })
320
321 it('Verify that a pool with zero worker fails', () => {
322 expect(
323 () =>
324 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
325 ).toThrowError('Cannot instantiate a fixed pool with zero worker')
326 })
327 })