chore: generate documentation
[poolifier.git] / tests / pools / cluster / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedClusterPool, PoolEvents } = require('../../../lib')
dbca3be9 3const { TaskFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
325f50bc 5
079de991 6describe('Fixed cluster pool test suite', () => {
56d11670 7 const numberOfWorkers = 8
4e377863 8 const tasksConcurrency = 2
e1ffb94f
JB
9 const pool = new FixedClusterPool(
10 numberOfWorkers,
11 './tests/worker-files/cluster/testWorker.js',
12 {
8ebe6c30 13 errorHandler: (e) => console.error(e)
e1ffb94f
JB
14 }
15 )
594bfb84
JB
16 const queuePool = new FixedClusterPool(
17 numberOfWorkers,
18 './tests/worker-files/cluster/testWorker.js',
19 {
20 enableTasksQueue: true,
21 tasksQueueOptions: {
4e377863 22 concurrency: tasksConcurrency
594bfb84 23 },
8ebe6c30 24 errorHandler: (e) => console.error(e)
594bfb84
JB
25 }
26 )
e1ffb94f
JB
27 const emptyPool = new FixedClusterPool(
28 numberOfWorkers,
29 './tests/worker-files/cluster/emptyWorker.js',
73bfd59d 30 { exitHandler: () => console.info('empty pool worker exited') }
e1ffb94f
JB
31 )
32 const echoPool = new FixedClusterPool(
33 numberOfWorkers,
34 './tests/worker-files/cluster/echoWorker.js'
35 )
36 const errorPool = new FixedClusterPool(
37 numberOfWorkers,
38 './tests/worker-files/cluster/errorWorker.js',
39 {
8ebe6c30 40 errorHandler: (e) => console.error(e)
e1ffb94f
JB
41 }
42 )
43 const asyncErrorPool = new FixedClusterPool(
44 numberOfWorkers,
45 './tests/worker-files/cluster/asyncErrorWorker.js',
46 {
8ebe6c30 47 errorHandler: (e) => console.error(e)
e1ffb94f
JB
48 }
49 )
50 const asyncPool = new FixedClusterPool(
51 numberOfWorkers,
52 './tests/worker-files/cluster/asyncWorker.js'
53 )
54
8bc77620
APA
55 after('Destroy all pools', async () => {
56 // We need to clean up the resources after our test
57 await echoPool.destroy()
58 await asyncPool.destroy()
59 await errorPool.destroy()
60 await asyncErrorPool.destroy()
61 await emptyPool.destroy()
594bfb84 62 await queuePool.destroy()
8bc77620
APA
63 })
64
325f50bc 65 it('Verify that the function is executed in a worker cluster', async () => {
6db75ad9 66 let result = await pool.execute({
dbca3be9 67 function: TaskFunctions.fibonacci
6db75ad9 68 })
024daf59 69 expect(result).toBe(75025)
6db75ad9 70 result = await pool.execute({
dbca3be9 71 function: TaskFunctions.factorial
6db75ad9 72 })
70a4f5ea 73 expect(result).toBe(9.33262154439441e157)
325f50bc
S
74 })
75
318d4156 76 it('Verify that is possible to invoke the execute() method without input', async () => {
325f50bc 77 const result = await pool.execute()
30b963d4 78 expect(result).toStrictEqual({ ok: 1 })
325f50bc
S
79 })
80
1dcbfefc 81 it("Verify that 'ready' event is emitted", async () => {
079de991
JB
82 const pool1 = new FixedClusterPool(
83 numberOfWorkers,
84 './tests/worker-files/cluster/testWorker.js',
85 {
8ebe6c30 86 errorHandler: (e) => console.error(e)
079de991
JB
87 }
88 )
89 let poolReady = 0
66293e7d 90 pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
1dcbfefc 91 await waitPoolEvents(pool1, PoolEvents.ready, 1)
216541b6
JB
92 expect(poolReady).toBe(1)
93 })
94
94407def
JB
95 it("Verify that 'busy' event is emitted", async () => {
96 const promises = new Set()
7c0ba920 97 let poolBusy = 0
aee46736 98 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 99 for (let i = 0; i < numberOfWorkers * 2; i++) {
94407def 100 promises.add(pool.execute())
7c0ba920 101 }
94407def 102 await Promise.all(promises)
14916bf9
JB
103 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
104 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
105 expect(poolBusy).toBe(numberOfWorkers + 1)
7c0ba920
JB
106 })
107
594bfb84 108 it('Verify that tasks queuing is working', async () => {
d3d4b67d 109 const promises = new Set()
4e377863 110 const maxMultiplier = 3 // Must be greater than tasksConcurrency
594bfb84 111 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
d3d4b67d 112 promises.add(queuePool.execute())
594bfb84 113 }
d3d4b67d 114 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
594bfb84 115 for (const workerNode of queuePool.workerNodes) {
5bb5be17 116 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
f59e1027 117 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
594bfb84
JB
118 queuePool.opts.tasksQueueOptions.concurrency
119 )
f59e1027 120 expect(workerNode.usage.tasks.executed).toBe(0)
4e377863
JB
121 expect(workerNode.usage.tasks.queued).toBe(
122 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
123 )
124 expect(workerNode.usage.tasks.maxQueued).toBe(
125 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
126 )
56d11670 127 expect(workerNode.usage.tasks.stolen).toBe(0)
594bfb84 128 }
56d11670 129 expect(queuePool.info.executedTasks).toBe(0)
4e377863
JB
130 expect(queuePool.info.executingTasks).toBe(
131 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
132 )
6b27d407 133 expect(queuePool.info.queuedTasks).toBe(
4e377863
JB
134 numberOfWorkers *
135 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
6b27d407
JB
136 )
137 expect(queuePool.info.maxQueuedTasks).toBe(
4e377863
JB
138 numberOfWorkers *
139 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
d3d4b67d 140 )
a1763c54 141 expect(queuePool.info.backPressure).toBe(false)
56d11670 142 expect(queuePool.info.stolenTasks).toBe(0)
594bfb84
JB
143 await Promise.all(promises)
144 for (const workerNode of queuePool.workerNodes) {
a6b3272b
JB
145 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
146 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
147 numberOfWorkers * maxMultiplier
148 )
4e377863 149 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
f59e1027 150 expect(workerNode.usage.tasks.queued).toBe(0)
4e377863
JB
151 expect(workerNode.usage.tasks.maxQueued).toBe(
152 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
153 )
56d11670
JB
154 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
155 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
156 numberOfWorkers * maxMultiplier
157 )
594bfb84 158 }
56d11670
JB
159 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
160 expect(queuePool.info.backPressure).toBe(false)
161 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
162 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
163 numberOfWorkers * maxMultiplier
164 )
594bfb84
JB
165 })
166
325f50bc
S
167 it('Verify that is possible to have a worker that return undefined', async () => {
168 const result = await emptyPool.execute()
6db75ad9 169 expect(result).toBeUndefined()
325f50bc
S
170 })
171
172 it('Verify that data are sent to the worker correctly', async () => {
173 const data = { f: 10 }
174 const result = await echoPool.execute(data)
e1ffb94f 175 expect(result).toStrictEqual(data)
325f50bc
S
176 })
177
178 it('Verify that error handling is working properly:sync', async () => {
179 const data = { f: 10 }
d46660cd 180 let taskError
8ebe6c30 181 errorPool.emitter.on(PoolEvents.taskError, (e) => {
d46660cd
JB
182 taskError = e
183 })
325f50bc
S
184 let inError
185 try {
186 await errorPool.execute(data)
187 } catch (e) {
188 inError = e
189 }
190 expect(inError).toBeDefined()
8620fb25 191 expect(typeof inError === 'string').toBe(true)
985d0e79
JB
192 expect(inError).toBe('Error Message from ClusterWorker')
193 expect(taskError).toStrictEqual({
ff128cc9 194 name: 'default',
985d0e79
JB
195 message: 'Error Message from ClusterWorker',
196 data
197 })
18482cec
JB
198 expect(
199 errorPool.workerNodes.some(
8ebe6c30 200 (workerNode) => workerNode.usage.tasks.failed === 1
18482cec
JB
201 )
202 ).toBe(true)
325f50bc
S
203 })
204
205 it('Verify that error handling is working properly:async', async () => {
206 const data = { f: 10 }
4f0b85b3 207 let taskError
8ebe6c30 208 asyncErrorPool.emitter.on(PoolEvents.taskError, (e) => {
4f0b85b3
JB
209 taskError = e
210 })
325f50bc
S
211 let inError
212 try {
213 await asyncErrorPool.execute(data)
214 } catch (e) {
215 inError = e
216 }
217 expect(inError).toBeDefined()
8620fb25 218 expect(typeof inError === 'string').toBe(true)
985d0e79
JB
219 expect(inError).toBe('Error Message from ClusterWorker:async')
220 expect(taskError).toStrictEqual({
ff128cc9 221 name: 'default',
985d0e79
JB
222 message: 'Error Message from ClusterWorker:async',
223 data
224 })
18482cec
JB
225 expect(
226 asyncErrorPool.workerNodes.some(
8ebe6c30 227 (workerNode) => workerNode.usage.tasks.failed === 1
18482cec
JB
228 )
229 ).toBe(true)
325f50bc
S
230 })
231
232 it('Verify that async function is working properly', async () => {
233 const data = { f: 10 }
15e5141f 234 const startTime = performance.now()
325f50bc 235 const result = await asyncPool.execute(data)
15e5141f 236 const usedTime = performance.now() - startTime
e1ffb94f 237 expect(result).toStrictEqual(data)
325f50bc
S
238 expect(usedTime).toBeGreaterThanOrEqual(2000)
239 })
240
241 it('Shutdown test', async () => {
bac873bd 242 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
ef3891a3
JB
243 let poolDestroy = 0
244 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
45dbbb14 245 await pool.destroy()
bdacc2d2
JB
246 const numberOfExitEvents = await exitPromise
247 expect(numberOfExitEvents).toBe(numberOfWorkers)
ef3891a3 248 expect(poolDestroy).toBe(1)
325f50bc
S
249 })
250
1a76932b
JB
251 it('Verify that cluster pool options are checked', async () => {
252 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
253 let pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath)
254 expect(pool1.opts.env).toBeUndefined()
255 expect(pool1.opts.settings).toBeUndefined()
256 await pool1.destroy()
257 pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath, {
258 env: { TEST: 'test' },
259 settings: { args: ['--use', 'http'], silent: true }
260 })
261 expect(pool1.opts.env).toStrictEqual({ TEST: 'test' })
262 expect(pool1.opts.settings).toStrictEqual({
263 args: ['--use', 'http'],
264 silent: true
265 })
266 expect({ ...pool1.opts.settings, exec: workerFilePath }).toStrictEqual({
267 args: ['--use', 'http'],
268 silent: true,
269 exec: workerFilePath
270 })
271 await pool1.destroy()
272 })
273
325f50bc
S
274 it('Should work even without opts in input', async () => {
275 const pool1 = new FixedClusterPool(
e1ffb94f 276 numberOfWorkers,
76b1e974 277 './tests/worker-files/cluster/testWorker.js'
325f50bc 278 )
6db75ad9 279 const res = await pool1.execute()
30b963d4 280 expect(res).toStrictEqual({ ok: 1 })
8bc77620
APA
281 // We need to clean up the resources after our test
282 await pool1.destroy()
325f50bc 283 })
8d3782fa
JB
284
285 it('Verify that a pool with zero worker fails', async () => {
286 expect(
287 () =>
288 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
2431bdb4 289 ).toThrowError('Cannot instantiate a fixed pool with zero worker')
8d3782fa 290 })
325f50bc 291})