perf: optimize tasks stealing in corner case
[poolifier.git] / tests / pools / cluster / fixed.test.mjs
CommitLineData
a074ffee
JB
1import { expect } from 'expect'
2import { FixedClusterPool, PoolEvents } from '../../../lib/index.js'
3import { TaskFunctions } from '../../test-types.js'
4import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.js'
5import { DEFAULT_TASK_NAME } from '../../../lib/utils.js'
325f50bc 6
079de991 7describe('Fixed cluster pool test suite', () => {
// Number of workers given to every fixed pool created in this suite.
const numberOfWorkers = 8
// Tasks queue concurrency configured on `queuePool`.
const tasksConcurrency = 2

// Pool on testWorker.js; worker errors are printed with console.error.
const pool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  { errorHandler: error => console.error(error) }
)

// Same worker file as `pool`, with the tasks queue enabled.
const queuePool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: tasksConcurrency },
    errorHandler: error => console.error(error)
  }
)

// Pool on emptyWorker.js; logs a message whenever one of its workers exits.
const emptyPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/emptyWorker.js',
  {
    exitHandler: () => console.info('empty pool worker exited')
  }
)

// Pool on echoWorker.js, created without options.
const echoPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/echoWorker.js'
)

// Pool on errorWorker.js; worker errors are printed with console.error.
const errorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/errorWorker.js',
  { errorHandler: error => console.error(error) }
)

// Pool on asyncErrorWorker.js; worker errors are printed with console.error.
const asyncErrorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncErrorWorker.js',
  { errorHandler: error => console.error(error) }
)

// Pool on asyncWorker.js, created without options.
const asyncPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncWorker.js'
)
55
8bc77620
APA
after('Destroy all pools', async () => {
  // Clean up the resources held by the suite-level pools once the tests ran.
  // `pool` is not listed here: the 'Shutdown test' destroys it.
  const poolsToDestroy = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  // Destroy one pool at a time, awaiting each before starting the next.
  for (const poolToDestroy of poolsToDestroy) {
    await poolToDestroy.destroy()
  }
})
65
it('Verify that the function is executed in a worker cluster', async () => {
  // Ask the test worker for its fibonacci task function and check the result.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  // Same round trip with the factorial task function.
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
76
it('Verify that is possible to invoke the execute() method without input', async () => {
  // execute() is called with no argument at all; the worker answers { ok: 1 }.
  const response = await pool.execute()
  const expectedResponse = { ok: 1 }
  expect(response).toStrictEqual(expectedResponse)
})
81
it("Verify that 'ready' event is emitted", async () => {
  // Dedicated pool (distinct from the suite-level `pool`) so the test starts
  // from an emitter with no registered listener.
  const readyPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    expect(readyPool.emitter.eventNames()).toStrictEqual([])
    let poolReady = 0
    readyPool.emitter.on(PoolEvents.ready, () => ++poolReady)
    // Wait until the pool has emitted `ready` once.
    await waitPoolEvents(readyPool, PoolEvents.ready, 1)
    expect(readyPool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
    expect(poolReady).toBe(1)
  } finally {
    // Destroy the pool even when an assertion above throws, so a failing
    // test does not leave its worker processes behind.
    await readyPool.destroy()
  }
})
98
94407def
JB
it("Verify that 'busy' event is emitted", async () => {
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyEventCount = 0
  pool.emitter.on(PoolEvents.busy, () => {
    busyEventCount += 1
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  // Submit twice as many tasks as there are workers, all at once.
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
})
113
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const totalTasks = numberOfWorkers * maxMultiplier
  // Submit every task synchronously so the queue state can be inspected
  // before any of them has completed.
  const submittedTasks = Array.from({ length: totalTasks }, () =>
    queuePool.execute()
  )
  expect(submittedTasks.length).toBe(totalTasks)

  const { concurrency } = queuePool.opts.tasksQueueOptions
  // Tasks that do not fit in a worker's concurrency budget are expected to be queued.
  const queuedPerWorker = maxMultiplier - concurrency

  // Per-worker state right after submission, before awaiting anything.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(usage.tasks.executed).toBe(0)
    expect(usage.tasks.queued).toBe(queuedPerWorker)
    expect(usage.tasks.maxQueued).toBe(queuedPerWorker)
    expect(usage.tasks.sequentiallyStolen).toBe(0)
    expect(usage.tasks.stolen).toBe(0)
  }

  // Pool-wide state right after submission.
  expect(queuePool.info.executedTasks).toBe(0)
  expect(queuePool.info.executingTasks).toBe(numberOfWorkers * concurrency)
  expect(queuePool.info.queuedTasks).toBe(numberOfWorkers * queuedPerWorker)
  expect(queuePool.info.maxQueuedTasks).toBe(numberOfWorkers * queuedPerWorker)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBe(0)

  await Promise.all(submittedTasks)

  // Per-worker state once every task has settled.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executing).toBeLessThanOrEqual(totalTasks)
    expect(usage.tasks.executed).toBe(maxMultiplier)
    expect(usage.tasks.queued).toBe(0)
    expect(usage.tasks.maxQueued).toBe(queuedPerWorker)
    // Stealing counters are only bounded, not pinned to an exact value.
    expect(usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(totalTasks)
    expect(usage.tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.stolen).toBeLessThanOrEqual(totalTasks)
  }

  // Pool-wide state once every task has settled.
  expect(queuePool.info.executedTasks).toBe(totalTasks)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(totalTasks)
})
179
325f50bc
S
it('Verify that is possible to have a worker that return undefined', async () => {
  // The pool on emptyWorker.js is expected to resolve with undefined.
  const emptyResult = await emptyPool.execute()
  expect(emptyResult).toBeUndefined()
})
184
it('Verify that data are sent to the worker correctly', async () => {
  // The pool on echoWorker.js is expected to hand back what it was given.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
190
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }
  expect(errorPool.emitter.eventNames()).toStrictEqual([])
  // Capture the payload of the `taskError` pool event.
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, event => {
    emittedTaskError = event
  })
  expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])

  // Capture whatever execute() rejects with; stays undefined if it resolves.
  let caughtError
  try {
    await errorPool.execute(data)
  } catch (rejection) {
    caughtError = rejection
  }
  expect(caughtError).toBeDefined()
  // The rejection value is asserted to be a plain string.
  expect(typeof caughtError === 'string').toBe(true)
  expect(caughtError).toBe('Error Message from ClusterWorker')
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: 'Error Message from ClusterWorker',
    data
  })

  // At least one worker node must have recorded exactly one failed task.
  const hasSingleFailure = errorPool.workerNodes.some(
    node => node.usage.tasks.failed === 1
  )
  expect(hasSingleFailure).toBe(true)
})
219
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
  // Capture the payload of the `taskError` pool event.
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, event => {
    emittedTaskError = event
  })
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
    PoolEvents.taskError
  ])

  // Capture whatever execute() rejects with; stays undefined if it resolves.
  let caughtError
  try {
    await asyncErrorPool.execute(data)
  } catch (rejection) {
    caughtError = rejection
  }
  expect(caughtError).toBeDefined()
  // The rejection value is asserted to be a plain string.
  expect(typeof caughtError === 'string').toBe(true)
  expect(caughtError).toBe('Error Message from ClusterWorker:async')
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: 'Error Message from ClusterWorker:async',
    data
  })

  // At least one worker node must have recorded exactly one failed task.
  const hasSingleFailure = asyncErrorPool.workerNodes.some(
    node => node.usage.tasks.failed === 1
  )
  expect(hasSingleFailure).toBe(true)
})
250
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  // Time the whole round trip through the pool on asyncWorker.js.
  const startedAt = performance.now()
  const result = await asyncPool.execute(payload)
  const elapsedMs = performance.now() - startedAt
  expect(result).toStrictEqual(payload)
  // The round trip is asserted to take at least 2000 ms.
  expect(elapsedMs).toBeGreaterThanOrEqual(2000)
})
259
it('Shutdown test', async () => {
  // Start listening for worker 'exit' events before the pool is destroyed.
  const workersExited = waitWorkerEvents(pool, 'exit', numberOfWorkers)
  // A `busy` listener is expected to be registered on the shared pool already
  // (presumably left over from the 'busy' event test).
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  let destroyEventCount = 0
  pool.emitter.on(PoolEvents.destroy, () => {
    destroyEventCount += 1
  })
  expect(pool.emitter.eventNames()).toStrictEqual([
    PoolEvents.busy,
    PoolEvents.destroy
  ])

  await pool.destroy()
  const exitEventCount = await workersExited

  // After destroy: pool stopped, listeners removed, no worker node left.
  expect(pool.started).toBe(false)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  expect(exitEventCount).toBe(numberOfWorkers)
  expect(destroyEventCount).toBe(1)
})
278
1a76932b
JB
it('Verify that cluster pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'

  // Without options, `env` and `settings` must stay undefined.
  const defaultPool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    expect(defaultPool.opts.env).toBeUndefined()
    expect(defaultPool.opts.settings).toBeUndefined()
  } finally {
    // Destroy the pool even when an assertion throws, so a failing test
    // does not leave its worker processes behind.
    await defaultPool.destroy()
  }

  // With explicit options, they must be exposed unchanged on `opts`.
  const customPool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
    env: { TEST: 'test' },
    settings: { args: ['--use', 'http'], silent: true }
  })
  try {
    expect(customPool.opts.env).toStrictEqual({ TEST: 'test' })
    expect(customPool.opts.settings).toStrictEqual({
      args: ['--use', 'http'],
      silent: true
    })
    // Adding `exec` to a copy of the settings must yield the three expected keys.
    expect({
      ...customPool.opts.settings,
      exec: workerFilePath
    }).toStrictEqual({
      args: ['--use', 'http'],
      silent: true,
      exec: workerFilePath
    })
  } finally {
    // Same cleanup guarantee as for the first pool.
    await customPool.destroy()
  }
})
301
it('Should work even without opts in input', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  // Pool built with only the two mandatory arguments.
  const optionlessPool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    const res = await optionlessPool.execute()
    expect(res).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, including when the
    // execution or the assertion above fails.
    await optionlessPool.destroy()
  }
})
8d3782fa 310
7d27ec0d
JB
it('Verify destroyWorkerNode()', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  const localPool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    const workerNodeKey = 0
    // Keep a reference to the worker: its node is expected to be gone from
    // `workerNodes` once destroyWorkerNode() has resolved.
    const { worker } = localPool.workerNodes[workerNodeKey]
    let disconnectEvent = 0
    worker.on('disconnect', () => {
      ++disconnectEvent
    })
    let exitEvent = 0
    worker.on('exit', () => {
      ++exitEvent
    })
    await expect(
      localPool.destroyWorkerNode(workerNodeKey)
    ).resolves.toBeUndefined()
    // Each lifecycle event must have fired exactly once for that worker.
    expect(disconnectEvent).toBe(1)
    expect(exitEvent).toBe(1)
    expect(localPool.workerNodes.length).toBe(numberOfWorkers - 1)
  } finally {
    // Destroy the remaining workers even when an assertion above throws, so
    // a failing test does not leave worker processes behind.
    await localPool.destroy()
  }
})
329
it('Verify that a pool with zero worker fails', () => {
  // The constructor itself must throw, so it is wrapped in a thunk for expect().
  const createZeroWorkerPool = () =>
    new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
  expect(createZeroWorkerPool).toThrow(
    'Cannot instantiate a fixed pool with zero worker'
  )
})
325f50bc 336})