094e506d19d73b1e44ac596eadd910444e933d67
[poolifier.git] / tests / pools / cluster / fixed.test.mjs
1 import { expect } from 'expect'
2 import cluster from 'node:cluster'
3
4 import { FixedClusterPool, PoolEvents } from '../../../lib/index.cjs'
5 import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
6 import { TaskFunctions } from '../../test-types.cjs'
7 import { waitWorkerEvents } from '../../test-utils.cjs'
8
9 describe('Fixed cluster pool test suite', () => {
10 const numberOfWorkers = 8
11 const tasksConcurrency = 2
12 let asyncErrorPool, asyncPool, echoPool, emptyPool, errorPool, pool, queuePool
13
14 before('Create pools', () => {
15 pool = new FixedClusterPool(
16 numberOfWorkers,
17 './tests/worker-files/cluster/testWorker.cjs',
18 {
19 errorHandler: e => console.error(e),
20 }
21 )
22 queuePool = new FixedClusterPool(
23 numberOfWorkers,
24 './tests/worker-files/cluster/testWorker.cjs',
25 {
26 enableTasksQueue: true,
27 errorHandler: e => console.error(e),
28 tasksQueueOptions: {
29 concurrency: tasksConcurrency,
30 },
31 }
32 )
33 emptyPool = new FixedClusterPool(
34 numberOfWorkers,
35 './tests/worker-files/cluster/emptyWorker.cjs',
36 { exitHandler: () => console.info('empty pool worker exited') }
37 )
38 echoPool = new FixedClusterPool(
39 numberOfWorkers,
40 './tests/worker-files/cluster/echoWorker.cjs'
41 )
42 errorPool = new FixedClusterPool(
43 numberOfWorkers,
44 './tests/worker-files/cluster/errorWorker.cjs',
45 {
46 errorHandler: e => console.error(e),
47 }
48 )
49 asyncErrorPool = new FixedClusterPool(
50 numberOfWorkers,
51 './tests/worker-files/cluster/asyncErrorWorker.cjs',
52 {
53 errorHandler: e => console.error(e),
54 }
55 )
56 asyncPool = new FixedClusterPool(
57 numberOfWorkers,
58 './tests/worker-files/cluster/asyncWorker.cjs'
59 )
60 })
61
62 after('Destroy pools', async () => {
63 // We need to clean up the resources after our tests
64 await echoPool.destroy()
65 await asyncPool.destroy()
66 await errorPool.destroy()
67 await asyncErrorPool.destroy()
68 await emptyPool.destroy()
69 await queuePool.destroy()
70 })
71
72 it('Verify that the function is executed in a worker cluster', async () => {
73 let result = await pool.execute({
74 function: TaskFunctions.fibonacci,
75 })
76 expect(result).toBe(354224848179262000000)
77 result = await pool.execute({
78 function: TaskFunctions.factorial,
79 })
80 expect(result).toBe(9.33262154439441e157)
81 })
82
83 it('Verify that is possible to invoke the execute() method without input', async () => {
84 const result = await pool.execute()
85 expect(result).toStrictEqual({ ok: 1 })
86 })
87
88 it('Verify that tasks queuing is working', async () => {
89 const promises = new Set()
90 const maxMultiplier = 3 // Must be greater than tasksConcurrency
91 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
92 promises.add(queuePool.execute())
93 }
94 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
95 for (const workerNode of queuePool.workerNodes) {
96 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
97 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
98 queuePool.opts.tasksQueueOptions.concurrency
99 )
100 expect(workerNode.usage.tasks.executed).toBe(0)
101 expect(workerNode.usage.tasks.queued).toBe(
102 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
103 )
104 expect(workerNode.usage.tasks.maxQueued).toBe(
105 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
106 )
107 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
108 expect(workerNode.usage.tasks.stolen).toBe(0)
109 }
110 expect(queuePool.info.executedTasks).toBe(0)
111 expect(queuePool.info.executingTasks).toBe(
112 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
113 )
114 expect(queuePool.info.queuedTasks).toBe(
115 numberOfWorkers *
116 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
117 )
118 expect(queuePool.info.maxQueuedTasks).toBe(
119 numberOfWorkers *
120 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
121 )
122 expect(queuePool.info.backPressure).toBe(false)
123 expect(queuePool.info.stolenTasks).toBe(0)
124 await Promise.all(promises)
125 for (const workerNode of queuePool.workerNodes) {
126 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
127 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
128 numberOfWorkers * maxMultiplier
129 )
130 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
131 expect(workerNode.usage.tasks.queued).toBe(0)
132 expect(workerNode.usage.tasks.maxQueued).toBe(
133 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
134 )
135 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
136 0
137 )
138 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
139 numberOfWorkers * maxMultiplier
140 )
141 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
142 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
143 numberOfWorkers * maxMultiplier
144 )
145 }
146 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
147 expect(queuePool.info.backPressure).toBe(false)
148 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
149 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
150 numberOfWorkers * maxMultiplier
151 )
152 })
153
154 it('Verify that is possible to have a worker that return undefined', async () => {
155 const result = await emptyPool.execute()
156 expect(result).toBeUndefined()
157 })
158
159 it('Verify that data are sent to the worker correctly', async () => {
160 const data = { f: 10 }
161 const result = await echoPool.execute(data)
162 expect(result).toStrictEqual(data)
163 })
164
165 it('Verify that error handling is working properly:sync', async () => {
166 const data = { f: 10 }
167 expect(errorPool.emitter.eventNames()).toStrictEqual([])
168 let taskError
169 errorPool.emitter.on(PoolEvents.taskError, e => {
170 taskError = e
171 })
172 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
173 let inError
174 try {
175 await errorPool.execute(data)
176 } catch (e) {
177 inError = e
178 }
179 expect(inError).toStrictEqual('Error Message from ClusterWorker')
180 expect(taskError).toStrictEqual({
181 data,
182 message: 'Error Message from ClusterWorker',
183 name: DEFAULT_TASK_NAME,
184 })
185 expect(
186 errorPool.workerNodes.some(
187 workerNode => workerNode.usage.tasks.failed === 1
188 )
189 ).toBe(true)
190 })
191
192 it('Verify that error handling is working properly:async', async () => {
193 const data = { f: 10 }
194 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
195 let taskError
196 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
197 taskError = e
198 })
199 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
200 PoolEvents.taskError,
201 ])
202 let inError
203 try {
204 await asyncErrorPool.execute(data)
205 } catch (e) {
206 inError = e
207 }
208 expect(inError).toStrictEqual('Error Message from ClusterWorker:async')
209 expect(taskError).toStrictEqual({
210 data,
211 message: 'Error Message from ClusterWorker:async',
212 name: DEFAULT_TASK_NAME,
213 })
214 expect(
215 asyncErrorPool.workerNodes.some(
216 workerNode => workerNode.usage.tasks.failed === 1
217 )
218 ).toBe(true)
219 })
220
221 it('Verify that async function is working properly', async () => {
222 const data = { f: 10 }
223 const startTime = performance.now()
224 const result = await asyncPool.execute(data)
225 const usedTime = performance.now() - startTime
226 expect(result).toStrictEqual(data)
227 expect(usedTime).toBeGreaterThanOrEqual(2000)
228 })
229
230 it('Shutdown test', async () => {
231 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
232 expect(pool.emitter.eventNames()).toStrictEqual([])
233 let poolDestroy = 0
234 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
235 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
236 await pool.destroy()
237 const numberOfExitEvents = await exitPromise
238 expect(pool.info.started).toBe(false)
239 expect(pool.info.ready).toBe(false)
240 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
241 expect(pool.readyEventEmitted).toBe(false)
242 expect(pool.busyEventEmitted).toBe(false)
243 expect(pool.backPressureEventEmitted).toBe(false)
244 expect(pool.workerNodes.length).toBe(0)
245 expect(numberOfExitEvents).toBe(numberOfWorkers)
246 expect(poolDestroy).toBe(1)
247 })
248
249 it('Verify that cluster pool options are checked', async () => {
250 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
251 let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
252 expect(pool.opts.env).toBeUndefined()
253 expect(pool.opts.settings).toBeUndefined()
254 expect(cluster.settings).toMatchObject({
255 exec: workerFilePath,
256 silent: false,
257 })
258 await pool.destroy()
259 pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
260 env: { TEST: 'test' },
261 settings: { args: ['--use', 'http'], silent: true },
262 })
263 expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
264 expect(pool.opts.settings).toStrictEqual({
265 args: ['--use', 'http'],
266 silent: true,
267 })
268 expect(cluster.settings).toMatchObject({
269 args: ['--use', 'http'],
270 exec: workerFilePath,
271 silent: true,
272 })
273 await pool.destroy()
274 })
275
276 it('Verify destroyWorkerNode()', async () => {
277 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
278 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
279 const workerNodeKey = 0
280 let disconnectEvent = 0
281 pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
282 ++disconnectEvent
283 })
284 let exitEvent = 0
285 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
286 ++exitEvent
287 })
288 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
289 expect(disconnectEvent).toBe(1)
290 expect(exitEvent).toBe(1)
291 // Simulates an illegitimate worker node destroy and the minimum number of worker nodes is guaranteed
292 expect(pool.workerNodes.length).toBe(numberOfWorkers)
293 await pool.destroy()
294 })
295
296 it('Verify that a pool with zero worker fails', () => {
297 expect(
298 () =>
299 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.cjs')
300 ).toThrow('Cannot instantiate a fixed pool with zero worker')
301 })
302 })