build: cleanup TS configuration
[poolifier.git] / tests / pools / cluster / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedClusterPool, PoolEvents } = require('../../../lib')
dbca3be9 3const { TaskFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
6cd5248f 5const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
325f50bc 6
079de991 7describe('Fixed cluster pool test suite', () => {
56d11670 8 const numberOfWorkers = 8
4e377863 9 const tasksConcurrency = 2
e1ffb94f
JB
10 const pool = new FixedClusterPool(
11 numberOfWorkers,
12 './tests/worker-files/cluster/testWorker.js',
13 {
8ebe6c30 14 errorHandler: (e) => console.error(e)
e1ffb94f
JB
15 }
16 )
594bfb84
JB
17 const queuePool = new FixedClusterPool(
18 numberOfWorkers,
19 './tests/worker-files/cluster/testWorker.js',
20 {
21 enableTasksQueue: true,
22 tasksQueueOptions: {
4e377863 23 concurrency: tasksConcurrency
594bfb84 24 },
8ebe6c30 25 errorHandler: (e) => console.error(e)
594bfb84
JB
26 }
27 )
e1ffb94f
JB
28 const emptyPool = new FixedClusterPool(
29 numberOfWorkers,
30 './tests/worker-files/cluster/emptyWorker.js',
73bfd59d 31 { exitHandler: () => console.info('empty pool worker exited') }
e1ffb94f
JB
32 )
33 const echoPool = new FixedClusterPool(
34 numberOfWorkers,
35 './tests/worker-files/cluster/echoWorker.js'
36 )
37 const errorPool = new FixedClusterPool(
38 numberOfWorkers,
39 './tests/worker-files/cluster/errorWorker.js',
40 {
8ebe6c30 41 errorHandler: (e) => console.error(e)
e1ffb94f
JB
42 }
43 )
44 const asyncErrorPool = new FixedClusterPool(
45 numberOfWorkers,
46 './tests/worker-files/cluster/asyncErrorWorker.js',
47 {
8ebe6c30 48 errorHandler: (e) => console.error(e)
e1ffb94f
JB
49 }
50 )
51 const asyncPool = new FixedClusterPool(
52 numberOfWorkers,
53 './tests/worker-files/cluster/asyncWorker.js'
54 )
55
8bc77620
APA
56 after('Destroy all pools', async () => {
57 // We need to clean up the resources after our test
58 await echoPool.destroy()
59 await asyncPool.destroy()
60 await errorPool.destroy()
61 await asyncErrorPool.destroy()
62 await emptyPool.destroy()
594bfb84 63 await queuePool.destroy()
8bc77620
APA
64 })
65
325f50bc 66 it('Verify that the function is executed in a worker cluster', async () => {
6db75ad9 67 let result = await pool.execute({
dbca3be9 68 function: TaskFunctions.fibonacci
6db75ad9 69 })
024daf59 70 expect(result).toBe(75025)
6db75ad9 71 result = await pool.execute({
dbca3be9 72 function: TaskFunctions.factorial
6db75ad9 73 })
70a4f5ea 74 expect(result).toBe(9.33262154439441e157)
325f50bc
S
75 })
76
318d4156 77 it('Verify that is possible to invoke the execute() method without input', async () => {
325f50bc 78 const result = await pool.execute()
30b963d4 79 expect(result).toStrictEqual({ ok: 1 })
325f50bc
S
80 })
81
1dcbfefc 82 it("Verify that 'ready' event is emitted", async () => {
079de991
JB
83 const pool1 = new FixedClusterPool(
84 numberOfWorkers,
85 './tests/worker-files/cluster/testWorker.js',
86 {
8ebe6c30 87 errorHandler: (e) => console.error(e)
079de991
JB
88 }
89 )
90 let poolReady = 0
66293e7d 91 pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
1dcbfefc 92 await waitPoolEvents(pool1, PoolEvents.ready, 1)
216541b6
JB
93 expect(poolReady).toBe(1)
94 })
95
94407def
JB
96 it("Verify that 'busy' event is emitted", async () => {
97 const promises = new Set()
7c0ba920 98 let poolBusy = 0
aee46736 99 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 100 for (let i = 0; i < numberOfWorkers * 2; i++) {
94407def 101 promises.add(pool.execute())
7c0ba920 102 }
94407def 103 await Promise.all(promises)
14916bf9
JB
104 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
105 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
106 expect(poolBusy).toBe(numberOfWorkers + 1)
7c0ba920
JB
107 })
108
594bfb84 109 it('Verify that tasks queuing is working', async () => {
d3d4b67d 110 const promises = new Set()
4e377863 111 const maxMultiplier = 3 // Must be greater than tasksConcurrency
594bfb84 112 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
d3d4b67d 113 promises.add(queuePool.execute())
594bfb84 114 }
d3d4b67d 115 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
594bfb84 116 for (const workerNode of queuePool.workerNodes) {
5bb5be17 117 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
f59e1027 118 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
594bfb84
JB
119 queuePool.opts.tasksQueueOptions.concurrency
120 )
f59e1027 121 expect(workerNode.usage.tasks.executed).toBe(0)
4e377863
JB
122 expect(workerNode.usage.tasks.queued).toBe(
123 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
124 )
125 expect(workerNode.usage.tasks.maxQueued).toBe(
126 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
127 )
56d11670 128 expect(workerNode.usage.tasks.stolen).toBe(0)
594bfb84 129 }
56d11670 130 expect(queuePool.info.executedTasks).toBe(0)
4e377863
JB
131 expect(queuePool.info.executingTasks).toBe(
132 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
133 )
6b27d407 134 expect(queuePool.info.queuedTasks).toBe(
4e377863
JB
135 numberOfWorkers *
136 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
6b27d407
JB
137 )
138 expect(queuePool.info.maxQueuedTasks).toBe(
4e377863
JB
139 numberOfWorkers *
140 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
d3d4b67d 141 )
a1763c54 142 expect(queuePool.info.backPressure).toBe(false)
56d11670 143 expect(queuePool.info.stolenTasks).toBe(0)
594bfb84
JB
144 await Promise.all(promises)
145 for (const workerNode of queuePool.workerNodes) {
a6b3272b
JB
146 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
147 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
148 numberOfWorkers * maxMultiplier
149 )
4e377863 150 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
f59e1027 151 expect(workerNode.usage.tasks.queued).toBe(0)
4e377863
JB
152 expect(workerNode.usage.tasks.maxQueued).toBe(
153 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
154 )
56d11670
JB
155 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
156 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
157 numberOfWorkers * maxMultiplier
158 )
594bfb84 159 }
56d11670
JB
160 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
161 expect(queuePool.info.backPressure).toBe(false)
162 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
163 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
164 numberOfWorkers * maxMultiplier
165 )
594bfb84
JB
166 })
167
325f50bc
S
168 it('Verify that is possible to have a worker that return undefined', async () => {
169 const result = await emptyPool.execute()
6db75ad9 170 expect(result).toBeUndefined()
325f50bc
S
171 })
172
173 it('Verify that data are sent to the worker correctly', async () => {
174 const data = { f: 10 }
175 const result = await echoPool.execute(data)
e1ffb94f 176 expect(result).toStrictEqual(data)
325f50bc
S
177 })
178
179 it('Verify that error handling is working properly:sync', async () => {
180 const data = { f: 10 }
d46660cd 181 let taskError
8ebe6c30 182 errorPool.emitter.on(PoolEvents.taskError, (e) => {
d46660cd
JB
183 taskError = e
184 })
325f50bc
S
185 let inError
186 try {
187 await errorPool.execute(data)
188 } catch (e) {
189 inError = e
190 }
191 expect(inError).toBeDefined()
8620fb25 192 expect(typeof inError === 'string').toBe(true)
985d0e79
JB
193 expect(inError).toBe('Error Message from ClusterWorker')
194 expect(taskError).toStrictEqual({
6cd5248f 195 name: DEFAULT_TASK_NAME,
985d0e79
JB
196 message: 'Error Message from ClusterWorker',
197 data
198 })
18482cec
JB
199 expect(
200 errorPool.workerNodes.some(
8ebe6c30 201 (workerNode) => workerNode.usage.tasks.failed === 1
18482cec
JB
202 )
203 ).toBe(true)
325f50bc
S
204 })
205
206 it('Verify that error handling is working properly:async', async () => {
207 const data = { f: 10 }
4f0b85b3 208 let taskError
8ebe6c30 209 asyncErrorPool.emitter.on(PoolEvents.taskError, (e) => {
4f0b85b3
JB
210 taskError = e
211 })
325f50bc
S
212 let inError
213 try {
214 await asyncErrorPool.execute(data)
215 } catch (e) {
216 inError = e
217 }
218 expect(inError).toBeDefined()
8620fb25 219 expect(typeof inError === 'string').toBe(true)
985d0e79
JB
220 expect(inError).toBe('Error Message from ClusterWorker:async')
221 expect(taskError).toStrictEqual({
6cd5248f 222 name: DEFAULT_TASK_NAME,
985d0e79
JB
223 message: 'Error Message from ClusterWorker:async',
224 data
225 })
18482cec
JB
226 expect(
227 asyncErrorPool.workerNodes.some(
8ebe6c30 228 (workerNode) => workerNode.usage.tasks.failed === 1
18482cec
JB
229 )
230 ).toBe(true)
325f50bc
S
231 })
232
233 it('Verify that async function is working properly', async () => {
234 const data = { f: 10 }
15e5141f 235 const startTime = performance.now()
325f50bc 236 const result = await asyncPool.execute(data)
15e5141f 237 const usedTime = performance.now() - startTime
e1ffb94f 238 expect(result).toStrictEqual(data)
325f50bc
S
239 expect(usedTime).toBeGreaterThanOrEqual(2000)
240 })
241
242 it('Shutdown test', async () => {
bac873bd 243 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
ef3891a3
JB
244 let poolDestroy = 0
245 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
45dbbb14 246 await pool.destroy()
bdacc2d2
JB
247 const numberOfExitEvents = await exitPromise
248 expect(numberOfExitEvents).toBe(numberOfWorkers)
ef3891a3 249 expect(poolDestroy).toBe(1)
325f50bc
S
250 })
251
1a76932b
JB
252 it('Verify that cluster pool options are checked', async () => {
253 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
254 let pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath)
255 expect(pool1.opts.env).toBeUndefined()
256 expect(pool1.opts.settings).toBeUndefined()
257 await pool1.destroy()
258 pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath, {
259 env: { TEST: 'test' },
260 settings: { args: ['--use', 'http'], silent: true }
261 })
262 expect(pool1.opts.env).toStrictEqual({ TEST: 'test' })
263 expect(pool1.opts.settings).toStrictEqual({
264 args: ['--use', 'http'],
265 silent: true
266 })
267 expect({ ...pool1.opts.settings, exec: workerFilePath }).toStrictEqual({
268 args: ['--use', 'http'],
269 silent: true,
270 exec: workerFilePath
271 })
272 await pool1.destroy()
273 })
274
325f50bc
S
275 it('Should work even without opts in input', async () => {
276 const pool1 = new FixedClusterPool(
e1ffb94f 277 numberOfWorkers,
76b1e974 278 './tests/worker-files/cluster/testWorker.js'
325f50bc 279 )
6db75ad9 280 const res = await pool1.execute()
30b963d4 281 expect(res).toStrictEqual({ ok: 1 })
8bc77620
APA
282 // We need to clean up the resources after our test
283 await pool1.destroy()
325f50bc 284 })
8d3782fa
JB
285
286 it('Verify that a pool with zero worker fails', async () => {
287 expect(
288 () =>
289 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
2431bdb4 290 ).toThrowError('Cannot instantiate a fixed pool with zero worker')
8d3782fa 291 })
325f50bc 292})