fix: fix back pressure detection
[poolifier.git] / tests / pools / cluster / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedClusterPool, PoolEvents } = require('../../../lib')
dbca3be9 3const { TaskFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
325f50bc 5
079de991 6describe('Fixed cluster pool test suite', () => {
// Number of workers given to every fixed cluster pool created by this suite.
const numberOfWorkers = 6
// Per-worker concurrency configured on the queue-enabled pool below.
const tasksConcurrency = 2

// Logs whatever error a pool hands to its `errorHandler` option.
const logWorkerError = (e) => console.error(e)

// General purpose pool backed by testWorker.js.
const pool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  { errorHandler: logWorkerError }
)

// Same worker file as `pool`, with the tasks queue switched on.
const queuePool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: tasksConcurrency },
    errorHandler: logWorkerError
  }
)

// Pool backed by emptyWorker.js; only reports worker exits.
const emptyPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/emptyWorker.js',
  { exitHandler: () => console.info('empty pool worker exited') }
)

// Pool backed by echoWorker.js, created with default options.
const echoPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/echoWorker.js'
)

// Pool backed by errorWorker.js.
const errorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/errorWorker.js',
  { errorHandler: logWorkerError }
)

// Pool backed by asyncErrorWorker.js.
const asyncErrorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncErrorWorker.js',
  { errorHandler: logWorkerError }
)

// Pool backed by asyncWorker.js, created with default options.
const asyncPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncWorker.js'
)
54
8bc77620
APA
after('Destroy all pools', async () => {
  // We need to clean up the resources after our test.
  // Pools are destroyed one after the other, in the order listed here.
  const fixturePools = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const fixturePool of fixturePools) {
    await fixturePool.destroy()
  }
})
64
it('Verify that the function is executed in a worker cluster', async () => {
  // Run the fibonacci task function first and check its result.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  // Then run the factorial task function on the same pool.
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
75
it('Verify that is possible to invoke the execute() method without input', async () => {
  // No task data is passed: the pool must still resolve with the worker's answer.
  expect(await pool.execute()).toStrictEqual({ ok: 1 })
})
80
it("Verify that 'ready' event is emitted", async () => {
  // A dedicated pool is needed here: the listener must be attached before the
  // pool reports readiness, which the shared suite pools may already have done.
  const pool1 = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: (e) => console.error(e)
    }
  )
  try {
    let poolReady = 0
    pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool1, PoolEvents.ready, 1)
    expect(poolReady).toBe(1)
  } finally {
    // Always release the workers of this test-local pool, even when the
    // assertion above fails, so they do not outlive the test.
    await pool1.destroy()
  }
})
94
it("Verify that 'busy' event is emitted", async () => {
  const promises = new Set()
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    promises.add(pool.execute())
  }
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  // The count is checked right after submission, before any task is awaited.
  expect(poolBusy).toBe(numberOfWorkers + 1)
  // Let every submitted task settle so no promise is left floating when the
  // following tests reuse (and eventually destroy) the shared pool.
  await Promise.all(promises)
})
105
it('Verify that tasks queuing is working', async () => {
  // Must be greater than tasksConcurrency
  const maxMultiplier = 3
  const submittedTasks = numberOfWorkers * maxMultiplier

  // Submit every task up front, without awaiting any of them.
  const promises = new Set()
  for (let submitted = 0; submitted < submittedTasks; submitted++) {
    promises.add(queuePool.execute())
  }
  expect(promises.size).toBe(submittedTasks)

  const { concurrency } = queuePool.opts.tasksQueueOptions
  // Tasks a worker cannot run right away are expected to wait in its queue.
  const queuedPerWorker = maxMultiplier - concurrency

  // Per-worker picture while tasks are still in flight.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBe(queuedPerWorker)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
  }

  // Pool-wide picture while tasks are still in flight.
  expect(queuePool.info.executingTasks).toBe(numberOfWorkers * concurrency)
  expect(queuePool.info.queuedTasks).toBe(numberOfWorkers * queuedPerWorker)
  expect(queuePool.info.maxQueuedTasks).toBe(numberOfWorkers * queuedPerWorker)
  expect(queuePool.info.backPressure).toBe(false)

  await Promise.all(promises)

  // Once everything has settled, each worker has run its share and its queue is empty.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBe(0)
    expect(tasks.executed).toBe(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
  }
})
148
325f50bc
S
it('Verify that is possible to have a worker that return undefined', async () => {
  // The pool backed by emptyWorker.js is expected to resolve with no value.
  expect(await emptyPool.execute()).toBeUndefined()
})
153
it('Verify that data are sent to the worker correctly', async () => {
  // The pool backed by echoWorker.js must hand back a payload equal to the one sent.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
159
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }

  // Capture what the pool emits on its `taskError` event.
  let emittedError
  errorPool.emitter.on(PoolEvents.taskError, (e) => {
    emittedError = e
  })

  // Capture what execute() rejects with.
  let caughtError
  try {
    await errorPool.execute(data)
  } catch (e) {
    caughtError = e
  }

  // The rejection is expected to be the plain message string.
  expect(caughtError).toBeDefined()
  expect(typeof caughtError === 'string').toBe(true)
  expect(caughtError).toBe('Error Message from ClusterWorker')

  // The emitted event carries the task function name, the message and the task data.
  expect(emittedError).toStrictEqual({
    name: 'default',
    message: 'Error Message from ClusterWorker',
    data
  })

  // At least one worker node must have recorded exactly one failed task.
  const hasFailedTask = errorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
186
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }

  // Capture what the pool emits on its `taskError` event.
  let emittedError
  asyncErrorPool.emitter.on(PoolEvents.taskError, (e) => {
    emittedError = e
  })

  // Capture what execute() rejects with.
  let caughtError
  try {
    await asyncErrorPool.execute(data)
  } catch (e) {
    caughtError = e
  }

  // The rejection is expected to be the plain message string.
  expect(caughtError).toBeDefined()
  expect(typeof caughtError === 'string').toBe(true)
  expect(caughtError).toBe('Error Message from ClusterWorker:async')

  // The emitted event carries the task function name, the message and the task data.
  expect(emittedError).toStrictEqual({
    name: 'default',
    message: 'Error Message from ClusterWorker:async',
    data
  })

  // At least one worker node must have recorded exactly one failed task.
  const hasFailedTask = asyncErrorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
213
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }

  // Time the round trip through the pool backed by asyncWorker.js.
  const startedAt = performance.now()
  const result = await asyncPool.execute(payload)
  const elapsed = performance.now() - startedAt

  expect(result).toStrictEqual(payload)
  // The task is expected to take no less than 2000 ms to resolve.
  expect(elapsed).toBeGreaterThanOrEqual(2000)
})
222
it('Shutdown test', async () => {
  // Start waiting for the worker 'exit' events before the pool is destroyed.
  const workersExited = waitWorkerEvents(pool, 'exit', numberOfWorkers)

  // Count how many times the pool announces its own destruction.
  let destroyEvents = 0
  pool.emitter.on(PoolEvents.destroy, () => {
    destroyEvents += 1
  })

  await pool.destroy()

  expect(await workersExited).toBe(numberOfWorkers)
  expect(destroyEvents).toBe(1)
})
232
1a76932b
JB
it('Verify that cluster pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'

  // Without options, neither `env` nor `settings` is set on the pool.
  const defaultPool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  expect(defaultPool.opts.env).toBeUndefined()
  expect(defaultPool.opts.settings).toBeUndefined()
  await defaultPool.destroy()

  // With options, both are kept exactly as given.
  const customPool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
    env: { TEST: 'test' },
    settings: { args: ['--use', 'http'], silent: true }
  })
  expect(customPool.opts.env).toStrictEqual({ TEST: 'test' })
  expect(customPool.opts.settings).toStrictEqual({
    args: ['--use', 'http'],
    silent: true
  })
  // Same settings once the worker file is merged in as `exec`.
  const settingsWithExec = { ...customPool.opts.settings, exec: workerFilePath }
  expect(settingsWithExec).toStrictEqual({
    args: ['--use', 'http'],
    silent: true,
    exec: workerFilePath
  })
  await customPool.destroy()
})
255
325f50bc
S
it('Should work even without opts in input', async () => {
  // Build a pool from the worker count and worker file only.
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  const optionlessPool = new FixedClusterPool(numberOfWorkers, workerFilePath)

  expect(await optionlessPool.execute()).toStrictEqual({ ok: 1 })

  // We need to clean up the resources after our test
  await optionlessPool.destroy()
})
8d3782fa
JB
266
// The constructor is expected to throw synchronously, so the test itself is
// synchronous and the construction is wrapped in a thunk for `toThrow` to call.
it('Verify that a pool with zero worker fails', () => {
  expect(
    () =>
      new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
    // `toThrow` is used rather than its deprecated `toThrowError` alias.
  ).toThrow('Cannot instantiate a fixed pool with zero worker')
})
325f50bc 273})