fix: fix queued tasks rescheduling
[poolifier.git] / tests / pools / cluster / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedClusterPool, PoolEvents } = require('../../../lib')
dbca3be9 3const { TaskFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
325f50bc 5
079de991 6describe('Fixed cluster pool test suite', () => {
e1ffb94f 7 const numberOfWorkers = 6
4e377863 8 const tasksConcurrency = 2
e1ffb94f
JB
9 const pool = new FixedClusterPool(
10 numberOfWorkers,
11 './tests/worker-files/cluster/testWorker.js',
12 {
8ebe6c30 13 errorHandler: (e) => console.error(e)
e1ffb94f
JB
14 }
15 )
594bfb84
JB
16 const queuePool = new FixedClusterPool(
17 numberOfWorkers,
18 './tests/worker-files/cluster/testWorker.js',
19 {
20 enableTasksQueue: true,
21 tasksQueueOptions: {
4e377863 22 concurrency: tasksConcurrency
594bfb84 23 },
8ebe6c30 24 errorHandler: (e) => console.error(e)
594bfb84
JB
25 }
26 )
e1ffb94f
JB
27 const emptyPool = new FixedClusterPool(
28 numberOfWorkers,
29 './tests/worker-files/cluster/emptyWorker.js',
73bfd59d 30 { exitHandler: () => console.info('empty pool worker exited') }
e1ffb94f
JB
31 )
32 const echoPool = new FixedClusterPool(
33 numberOfWorkers,
34 './tests/worker-files/cluster/echoWorker.js'
35 )
36 const errorPool = new FixedClusterPool(
37 numberOfWorkers,
38 './tests/worker-files/cluster/errorWorker.js',
39 {
8ebe6c30 40 errorHandler: (e) => console.error(e)
e1ffb94f
JB
41 }
42 )
43 const asyncErrorPool = new FixedClusterPool(
44 numberOfWorkers,
45 './tests/worker-files/cluster/asyncErrorWorker.js',
46 {
8ebe6c30 47 errorHandler: (e) => console.error(e)
e1ffb94f
JB
48 }
49 )
50 const asyncPool = new FixedClusterPool(
51 numberOfWorkers,
52 './tests/worker-files/cluster/asyncWorker.js'
53 )
54
8bc77620
APA
55 after('Destroy all pools', async () => {
56 // We need to clean up the resources after our test
57 await echoPool.destroy()
58 await asyncPool.destroy()
59 await errorPool.destroy()
60 await asyncErrorPool.destroy()
61 await emptyPool.destroy()
594bfb84 62 await queuePool.destroy()
8bc77620
APA
63 })
64
325f50bc 65 it('Verify that the function is executed in a worker cluster', async () => {
6db75ad9 66 let result = await pool.execute({
dbca3be9 67 function: TaskFunctions.fibonacci
6db75ad9 68 })
024daf59 69 expect(result).toBe(75025)
6db75ad9 70 result = await pool.execute({
dbca3be9 71 function: TaskFunctions.factorial
6db75ad9 72 })
70a4f5ea 73 expect(result).toBe(9.33262154439441e157)
325f50bc
S
74 })
75
318d4156 76 it('Verify that is possible to invoke the execute() method without input', async () => {
325f50bc 77 const result = await pool.execute()
30b963d4 78 expect(result).toStrictEqual({ ok: 1 })
325f50bc
S
79 })
80
1dcbfefc 81 it("Verify that 'ready' event is emitted", async () => {
079de991
JB
82 const pool1 = new FixedClusterPool(
83 numberOfWorkers,
84 './tests/worker-files/cluster/testWorker.js',
85 {
8ebe6c30 86 errorHandler: (e) => console.error(e)
079de991
JB
87 }
88 )
89 let poolReady = 0
66293e7d 90 pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
1dcbfefc 91 await waitPoolEvents(pool1, PoolEvents.ready, 1)
216541b6
JB
92 expect(poolReady).toBe(1)
93 })
94
94407def
JB
95 it("Verify that 'busy' event is emitted", async () => {
96 const promises = new Set()
7c0ba920 97 let poolBusy = 0
aee46736 98 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 99 for (let i = 0; i < numberOfWorkers * 2; i++) {
94407def 100 promises.add(pool.execute())
7c0ba920 101 }
94407def 102 await Promise.all(promises)
14916bf9
JB
103 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
104 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
105 expect(poolBusy).toBe(numberOfWorkers + 1)
7c0ba920
JB
106 })
107
594bfb84 108 it('Verify that tasks queuing is working', async () => {
d3d4b67d 109 const promises = new Set()
4e377863 110 const maxMultiplier = 3 // Must be greater than tasksConcurrency
594bfb84 111 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
d3d4b67d 112 promises.add(queuePool.execute())
594bfb84 113 }
d3d4b67d 114 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
594bfb84 115 for (const workerNode of queuePool.workerNodes) {
5bb5be17 116 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
f59e1027 117 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
594bfb84
JB
118 queuePool.opts.tasksQueueOptions.concurrency
119 )
f59e1027 120 expect(workerNode.usage.tasks.executed).toBe(0)
4e377863
JB
121 expect(workerNode.usage.tasks.queued).toBe(
122 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
123 )
124 expect(workerNode.usage.tasks.maxQueued).toBe(
125 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
126 )
594bfb84 127 }
4e377863
JB
128 expect(queuePool.info.executingTasks).toBe(
129 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
130 )
6b27d407 131 expect(queuePool.info.queuedTasks).toBe(
4e377863
JB
132 numberOfWorkers *
133 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
6b27d407
JB
134 )
135 expect(queuePool.info.maxQueuedTasks).toBe(
4e377863
JB
136 numberOfWorkers *
137 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
d3d4b67d 138 )
a1763c54 139 expect(queuePool.info.backPressure).toBe(false)
594bfb84
JB
140 await Promise.all(promises)
141 for (const workerNode of queuePool.workerNodes) {
f59e1027 142 expect(workerNode.usage.tasks.executing).toBe(0)
4e377863 143 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
f59e1027 144 expect(workerNode.usage.tasks.queued).toBe(0)
4e377863
JB
145 expect(workerNode.usage.tasks.maxQueued).toBe(
146 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
147 )
594bfb84
JB
148 }
149 })
150
325f50bc
S
151 it('Verify that is possible to have a worker that return undefined', async () => {
152 const result = await emptyPool.execute()
6db75ad9 153 expect(result).toBeUndefined()
325f50bc
S
154 })
155
156 it('Verify that data are sent to the worker correctly', async () => {
157 const data = { f: 10 }
158 const result = await echoPool.execute(data)
e1ffb94f 159 expect(result).toStrictEqual(data)
325f50bc
S
160 })
161
162 it('Verify that error handling is working properly:sync', async () => {
163 const data = { f: 10 }
d46660cd 164 let taskError
8ebe6c30 165 errorPool.emitter.on(PoolEvents.taskError, (e) => {
d46660cd
JB
166 taskError = e
167 })
325f50bc
S
168 let inError
169 try {
170 await errorPool.execute(data)
171 } catch (e) {
172 inError = e
173 }
174 expect(inError).toBeDefined()
8620fb25 175 expect(typeof inError === 'string').toBe(true)
985d0e79
JB
176 expect(inError).toBe('Error Message from ClusterWorker')
177 expect(taskError).toStrictEqual({
ff128cc9 178 name: 'default',
985d0e79
JB
179 message: 'Error Message from ClusterWorker',
180 data
181 })
18482cec
JB
182 expect(
183 errorPool.workerNodes.some(
8ebe6c30 184 (workerNode) => workerNode.usage.tasks.failed === 1
18482cec
JB
185 )
186 ).toBe(true)
325f50bc
S
187 })
188
189 it('Verify that error handling is working properly:async', async () => {
190 const data = { f: 10 }
4f0b85b3 191 let taskError
8ebe6c30 192 asyncErrorPool.emitter.on(PoolEvents.taskError, (e) => {
4f0b85b3
JB
193 taskError = e
194 })
325f50bc
S
195 let inError
196 try {
197 await asyncErrorPool.execute(data)
198 } catch (e) {
199 inError = e
200 }
201 expect(inError).toBeDefined()
8620fb25 202 expect(typeof inError === 'string').toBe(true)
985d0e79
JB
203 expect(inError).toBe('Error Message from ClusterWorker:async')
204 expect(taskError).toStrictEqual({
ff128cc9 205 name: 'default',
985d0e79
JB
206 message: 'Error Message from ClusterWorker:async',
207 data
208 })
18482cec
JB
209 expect(
210 asyncErrorPool.workerNodes.some(
8ebe6c30 211 (workerNode) => workerNode.usage.tasks.failed === 1
18482cec
JB
212 )
213 ).toBe(true)
325f50bc
S
214 })
215
216 it('Verify that async function is working properly', async () => {
217 const data = { f: 10 }
15e5141f 218 const startTime = performance.now()
325f50bc 219 const result = await asyncPool.execute(data)
15e5141f 220 const usedTime = performance.now() - startTime
e1ffb94f 221 expect(result).toStrictEqual(data)
325f50bc
S
222 expect(usedTime).toBeGreaterThanOrEqual(2000)
223 })
224
225 it('Shutdown test', async () => {
bac873bd 226 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
ef3891a3
JB
227 let poolDestroy = 0
228 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
45dbbb14 229 await pool.destroy()
bdacc2d2
JB
230 const numberOfExitEvents = await exitPromise
231 expect(numberOfExitEvents).toBe(numberOfWorkers)
ef3891a3 232 expect(poolDestroy).toBe(1)
325f50bc
S
233 })
234
1a76932b
JB
235 it('Verify that cluster pool options are checked', async () => {
236 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
237 let pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath)
238 expect(pool1.opts.env).toBeUndefined()
239 expect(pool1.opts.settings).toBeUndefined()
240 await pool1.destroy()
241 pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath, {
242 env: { TEST: 'test' },
243 settings: { args: ['--use', 'http'], silent: true }
244 })
245 expect(pool1.opts.env).toStrictEqual({ TEST: 'test' })
246 expect(pool1.opts.settings).toStrictEqual({
247 args: ['--use', 'http'],
248 silent: true
249 })
250 expect({ ...pool1.opts.settings, exec: workerFilePath }).toStrictEqual({
251 args: ['--use', 'http'],
252 silent: true,
253 exec: workerFilePath
254 })
255 await pool1.destroy()
256 })
257
325f50bc
S
258 it('Should work even without opts in input', async () => {
259 const pool1 = new FixedClusterPool(
e1ffb94f 260 numberOfWorkers,
76b1e974 261 './tests/worker-files/cluster/testWorker.js'
325f50bc 262 )
6db75ad9 263 const res = await pool1.execute()
30b963d4 264 expect(res).toStrictEqual({ ok: 1 })
8bc77620
APA
265 // We need to clean up the resources after our test
266 await pool1.destroy()
325f50bc 267 })
8d3782fa
JB
268
269 it('Verify that a pool with zero worker fails', async () => {
270 expect(
271 () =>
272 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
2431bdb4 273 ).toThrowError('Cannot instantiate a fixed pool with zero worker')
8d3782fa 274 })
325f50bc 275})