test: add fixed priority queue get() test
[poolifier.git] / tests / pools / cluster / fixed.test.mjs
1 import cluster from 'node:cluster'
2
3 import { expect } from 'expect'
4
5 import { FixedClusterPool, PoolEvents } from '../../../lib/index.cjs'
6 import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
7 import { TaskFunctions } from '../../test-types.cjs'
8 import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
9
10 describe('Fixed cluster pool test suite', () => {
11 const numberOfWorkers = 8
12 const tasksConcurrency = 2
13 const pool = new FixedClusterPool(
14 numberOfWorkers,
15 './tests/worker-files/cluster/testWorker.cjs',
16 {
17 errorHandler: e => console.error(e)
18 }
19 )
20 const queuePool = new FixedClusterPool(
21 numberOfWorkers,
22 './tests/worker-files/cluster/testWorker.cjs',
23 {
24 enableTasksQueue: true,
25 tasksQueueOptions: {
26 concurrency: tasksConcurrency
27 },
28 errorHandler: e => console.error(e)
29 }
30 )
31 const emptyPool = new FixedClusterPool(
32 numberOfWorkers,
33 './tests/worker-files/cluster/emptyWorker.cjs',
34 { exitHandler: () => console.info('empty pool worker exited') }
35 )
36 const echoPool = new FixedClusterPool(
37 numberOfWorkers,
38 './tests/worker-files/cluster/echoWorker.cjs'
39 )
40 const errorPool = new FixedClusterPool(
41 numberOfWorkers,
42 './tests/worker-files/cluster/errorWorker.cjs',
43 {
44 errorHandler: e => console.error(e)
45 }
46 )
47 const asyncErrorPool = new FixedClusterPool(
48 numberOfWorkers,
49 './tests/worker-files/cluster/asyncErrorWorker.cjs',
50 {
51 errorHandler: e => console.error(e)
52 }
53 )
54 const asyncPool = new FixedClusterPool(
55 numberOfWorkers,
56 './tests/worker-files/cluster/asyncWorker.cjs'
57 )
58
59 after('Destroy all pools', async () => {
60 // We need to clean up the resources after our test
61 await echoPool.destroy()
62 await asyncPool.destroy()
63 await errorPool.destroy()
64 await asyncErrorPool.destroy()
65 await emptyPool.destroy()
66 await queuePool.destroy()
67 })
68
69 it('Verify that the function is executed in a worker cluster', async () => {
70 let result = await pool.execute({
71 function: TaskFunctions.fibonacci
72 })
73 expect(result).toBe(354224848179262000000)
74 result = await pool.execute({
75 function: TaskFunctions.factorial
76 })
77 expect(result).toBe(9.33262154439441e157)
78 })
79
80 it('Verify that is possible to invoke the execute() method without input', async () => {
81 const result = await pool.execute()
82 expect(result).toStrictEqual({ ok: 1 })
83 })
84
85 it("Verify that 'ready' event is emitted", async () => {
86 const pool = new FixedClusterPool(
87 numberOfWorkers,
88 './tests/worker-files/cluster/testWorker.cjs',
89 {
90 errorHandler: e => console.error(e)
91 }
92 )
93 expect(pool.emitter.eventNames()).toStrictEqual([])
94 let poolReady = 0
95 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
96 await waitPoolEvents(pool, PoolEvents.ready, 1)
97 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
98 expect(poolReady).toBe(1)
99 await pool.destroy()
100 })
101
102 it("Verify that 'busy' event is emitted", async () => {
103 const promises = new Set()
104 expect(pool.emitter.eventNames()).toStrictEqual([])
105 let poolBusy = 0
106 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
107 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
108 for (let i = 0; i < numberOfWorkers * 2; i++) {
109 promises.add(pool.execute())
110 }
111 await Promise.all(promises)
112 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
113 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
114 expect(poolBusy).toBe(numberOfWorkers + 1)
115 })
116
117 it('Verify that tasks queuing is working', async () => {
118 const promises = new Set()
119 const maxMultiplier = 3 // Must be greater than tasksConcurrency
120 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
121 promises.add(queuePool.execute())
122 }
123 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
124 for (const workerNode of queuePool.workerNodes) {
125 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
126 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
127 queuePool.opts.tasksQueueOptions.concurrency
128 )
129 expect(workerNode.usage.tasks.executed).toBe(0)
130 expect(workerNode.usage.tasks.queued).toBe(
131 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
132 )
133 expect(workerNode.usage.tasks.maxQueued).toBe(
134 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
135 )
136 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
137 expect(workerNode.usage.tasks.stolen).toBe(0)
138 }
139 expect(queuePool.info.executedTasks).toBe(0)
140 expect(queuePool.info.executingTasks).toBe(
141 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
142 )
143 expect(queuePool.info.queuedTasks).toBe(
144 numberOfWorkers *
145 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
146 )
147 expect(queuePool.info.maxQueuedTasks).toBe(
148 numberOfWorkers *
149 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
150 )
151 expect(queuePool.info.backPressure).toBe(false)
152 expect(queuePool.info.stolenTasks).toBe(0)
153 await Promise.all(promises)
154 for (const workerNode of queuePool.workerNodes) {
155 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
156 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
157 numberOfWorkers * maxMultiplier
158 )
159 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
160 expect(workerNode.usage.tasks.queued).toBe(0)
161 expect(workerNode.usage.tasks.maxQueued).toBe(
162 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
163 )
164 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
165 0
166 )
167 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
168 numberOfWorkers * maxMultiplier
169 )
170 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
171 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
172 numberOfWorkers * maxMultiplier
173 )
174 }
175 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
176 expect(queuePool.info.backPressure).toBe(false)
177 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
178 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
179 numberOfWorkers * maxMultiplier
180 )
181 })
182
183 it('Verify that is possible to have a worker that return undefined', async () => {
184 const result = await emptyPool.execute()
185 expect(result).toBeUndefined()
186 })
187
188 it('Verify that data are sent to the worker correctly', async () => {
189 const data = { f: 10 }
190 const result = await echoPool.execute(data)
191 expect(result).toStrictEqual(data)
192 })
193
194 it('Verify that error handling is working properly:sync', async () => {
195 const data = { f: 10 }
196 expect(errorPool.emitter.eventNames()).toStrictEqual([])
197 let taskError
198 errorPool.emitter.on(PoolEvents.taskError, e => {
199 taskError = e
200 })
201 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
202 let inError
203 try {
204 await errorPool.execute(data)
205 } catch (e) {
206 inError = e
207 }
208 expect(inError).toBeDefined()
209 expect(typeof inError === 'string').toBe(true)
210 expect(inError).toBe('Error Message from ClusterWorker')
211 expect(taskError).toStrictEqual({
212 name: DEFAULT_TASK_NAME,
213 message: 'Error Message from ClusterWorker',
214 data
215 })
216 expect(
217 errorPool.workerNodes.some(
218 workerNode => workerNode.usage.tasks.failed === 1
219 )
220 ).toBe(true)
221 })
222
223 it('Verify that error handling is working properly:async', async () => {
224 const data = { f: 10 }
225 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
226 let taskError
227 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
228 taskError = e
229 })
230 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
231 PoolEvents.taskError
232 ])
233 let inError
234 try {
235 await asyncErrorPool.execute(data)
236 } catch (e) {
237 inError = e
238 }
239 expect(inError).toBeDefined()
240 expect(typeof inError === 'string').toBe(true)
241 expect(inError).toBe('Error Message from ClusterWorker:async')
242 expect(taskError).toStrictEqual({
243 name: DEFAULT_TASK_NAME,
244 message: 'Error Message from ClusterWorker:async',
245 data
246 })
247 expect(
248 asyncErrorPool.workerNodes.some(
249 workerNode => workerNode.usage.tasks.failed === 1
250 )
251 ).toBe(true)
252 })
253
254 it('Verify that async function is working properly', async () => {
255 const data = { f: 10 }
256 const startTime = performance.now()
257 const result = await asyncPool.execute(data)
258 const usedTime = performance.now() - startTime
259 expect(result).toStrictEqual(data)
260 expect(usedTime).toBeGreaterThanOrEqual(2000)
261 })
262
263 it('Shutdown test', async () => {
264 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
265 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
266 let poolDestroy = 0
267 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
268 expect(pool.emitter.eventNames()).toStrictEqual([
269 PoolEvents.busy,
270 PoolEvents.destroy
271 ])
272 await pool.destroy()
273 const numberOfExitEvents = await exitPromise
274 expect(pool.started).toBe(false)
275 expect(pool.emitter.eventNames()).toStrictEqual([
276 PoolEvents.busy,
277 PoolEvents.destroy
278 ])
279 expect(pool.readyEventEmitted).toBe(false)
280 expect(pool.workerNodes.length).toBe(0)
281 expect(numberOfExitEvents).toBe(numberOfWorkers)
282 expect(poolDestroy).toBe(1)
283 })
284
285 it('Verify that cluster pool options are checked', async () => {
286 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
287 let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
288 expect(pool.opts.env).toBeUndefined()
289 expect(pool.opts.settings).toBeUndefined()
290 expect(cluster.settings).toMatchObject({
291 exec: workerFilePath,
292 silent: false
293 })
294 await pool.destroy()
295 pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
296 env: { TEST: 'test' },
297 settings: { args: ['--use', 'http'], silent: true }
298 })
299 expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
300 expect(pool.opts.settings).toStrictEqual({
301 args: ['--use', 'http'],
302 silent: true
303 })
304 expect(cluster.settings).toMatchObject({
305 args: ['--use', 'http'],
306 silent: true,
307 exec: workerFilePath
308 })
309 await pool.destroy()
310 })
311
312 it('Should work even without opts in input', async () => {
313 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
314 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
315 const res = await pool.execute()
316 expect(res).toStrictEqual({ ok: 1 })
317 // We need to clean up the resources after our test
318 await pool.destroy()
319 })
320
321 it('Verify destroyWorkerNode()', async () => {
322 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
323 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
324 const workerNodeKey = 0
325 let disconnectEvent = 0
326 pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
327 ++disconnectEvent
328 })
329 let exitEvent = 0
330 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
331 ++exitEvent
332 })
333 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
334 expect(disconnectEvent).toBe(1)
335 expect(exitEvent).toBe(1)
336 // Simulates an illegitimate worker node destroy and the minimum number of worker nodes is guaranteed
337 expect(pool.workerNodes.length).toBe(numberOfWorkers)
338 await pool.destroy()
339 })
340
341 it('Verify that a pool with zero worker fails', () => {
342 expect(
343 () =>
344 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.cjs')
345 ).toThrow('Cannot instantiate a fixed pool with zero worker')
346 })
347 })