fix: fix race condition at counting executing tasks on worker node
[poolifier.git] / tests / pools / cluster / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedClusterPool, PoolEvents } = require('../../../lib')
dbca3be9 3const { TaskFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
325f50bc 5
079de991 6describe('Fixed cluster pool test suite', () => {
// Sizing shared by every pool fixture of this suite.
const numberOfWorkers = 6
const tasksConcurrency = 2
// Directory holding the cluster worker scripts used as fixtures.
const workerFilesDir = './tests/worker-files/cluster'
// Worker errors are only reported on stderr.
const logError = (e) => console.error(e)

// General purpose pool.
const pool = new FixedClusterPool(
  numberOfWorkers,
  `${workerFilesDir}/testWorker.js`,
  { errorHandler: logError }
)
// Same worker script, with the tasks queue enabled.
const queuePool = new FixedClusterPool(
  numberOfWorkers,
  `${workerFilesDir}/testWorker.js`,
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: tasksConcurrency },
    errorHandler: logError
  }
)
const emptyPool = new FixedClusterPool(
  numberOfWorkers,
  `${workerFilesDir}/emptyWorker.js`,
  { exitHandler: () => console.info('empty pool worker exited') }
)
const echoPool = new FixedClusterPool(
  numberOfWorkers,
  `${workerFilesDir}/echoWorker.js`
)
const errorPool = new FixedClusterPool(
  numberOfWorkers,
  `${workerFilesDir}/errorWorker.js`,
  { errorHandler: logError }
)
const asyncErrorPool = new FixedClusterPool(
  numberOfWorkers,
  `${workerFilesDir}/asyncErrorWorker.js`,
  { errorHandler: logError }
)
const asyncPool = new FixedClusterPool(
  numberOfWorkers,
  `${workerFilesDir}/asyncWorker.js`
)
54
after('Destroy all pools', async () => {
  // Release the resources of the shared pools, one pool after the other
  const sharedPools = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const sharedPool of sharedPools) {
    await sharedPool.destroy()
  }
})
64
it('Verify that the function is executed in a worker cluster', async () => {
  // Each task selects the function to run through its `function` field
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
75
it('Verify that is possible to invoke the execute() method without input', async () => {
  // No task data is passed to execute()
  const expected = { ok: 1 }
  const actual = await pool.execute()
  expect(actual).toStrictEqual(expected)
})
80
it("Verify that 'ready' event is emitted", async () => {
  const pool1 = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: (e) => console.error(e)
    }
  )
  try {
    // Count every `ready` event emitted by the freshly created pool
    let poolReady = 0
    pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool1, PoolEvents.ready, 1)
    expect(poolReady).toBe(1)
  } finally {
    // We need to clean up the resources after our test, even when an
    // assertion above fails, so the pool workers do not outlive it
    await pool1.destroy()
  }
})
94
it("Verify that 'busy' event is emitted", async () => {
  const promises = new Set()
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    promises.add(pool.execute())
  }
  // Snapshot the counter right after submission, i.e. at the point where
  // the count has always been asserted, before any task settles.
  const poolBusyAtSubmission = poolBusy
  // Do not leave the submitted tasks floating: wait for them so that they
  // are not still in flight while the following tests use the same pool.
  await Promise.all(promises)
  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(poolBusyAtSubmission).toBe(numberOfWorkers + 1)
})
105
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const submittedTasks = numberOfWorkers * maxMultiplier
  const promises = new Set()
  for (let submitted = 0; submitted < submittedTasks; submitted++) {
    promises.add(queuePool.execute())
  }
  expect(promises.size).toBe(submittedTasks)

  // Expected figures, derived from the configured tasks queue concurrency
  const { concurrency } = queuePool.opts.tasksQueueOptions
  const queuedPerWorker = maxMultiplier - concurrency

  // Right after submission: nothing has completed yet
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(usage.tasks.executed).toBe(0)
    expect(usage.tasks.queued).toBe(queuedPerWorker)
    expect(usage.tasks.maxQueued).toBe(queuedPerWorker)
  }
  expect(queuePool.info.executingTasks).toBe(numberOfWorkers * concurrency)
  expect(queuePool.info.queuedTasks).toBe(numberOfWorkers * queuedPerWorker)
  expect(queuePool.info.maxQueuedTasks).toBe(
    numberOfWorkers * queuedPerWorker
  )

  await Promise.all(promises)

  // Once every task has settled: nothing is executing or queued any more
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBe(0)
    expect(usage.tasks.executed).toBe(maxMultiplier)
    expect(usage.tasks.queued).toBe(0)
    expect(usage.tasks.maxQueued).toBe(queuedPerWorker)
  }
})
147
it('Verify that is possible to have a worker that return undefined', async () => {
  // The task is run by the pool built on emptyWorker.js
  const outcome = await emptyPool.execute()
  expect(outcome).toBeUndefined()
})
152
it('Verify that data are sent to the worker correctly', async () => {
  // The echo pool is expected to answer with a payload equal to its input
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
158
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }
  const expectedMessage = 'Error Message from ClusterWorker'
  // Keep the payload of the last `taskError` event emitted by the pool
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, (e) => {
    emittedTaskError = e
  })
  // Keep whatever execute() rejects with
  let rejection
  try {
    await errorPool.execute(data)
  } catch (e) {
    rejection = e
  }
  expect(rejection).toBeDefined()
  expect(typeof rejection === 'string').toBe(true)
  expect(rejection).toBe(expectedMessage)
  expect(emittedTaskError).toStrictEqual({
    name: 'default',
    message: expectedMessage,
    data
  })
  const someWorkerFailedOnce = errorPool.workerNodes.some(
    (workerNode) => workerNode.usage.tasks.failed === 1
  )
  expect(someWorkerFailedOnce).toBe(true)
})
185
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }
  const expectedMessage = 'Error Message from ClusterWorker:async'
  // Keep the payload of the last `taskError` event emitted by the pool
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, (e) => {
    emittedTaskError = e
  })
  // Keep whatever execute() rejects with
  let rejection
  try {
    await asyncErrorPool.execute(data)
  } catch (e) {
    rejection = e
  }
  expect(rejection).toBeDefined()
  expect(typeof rejection === 'string').toBe(true)
  expect(rejection).toBe(expectedMessage)
  expect(emittedTaskError).toStrictEqual({
    name: 'default',
    message: expectedMessage,
    data
  })
  const someWorkerFailedOnce = asyncErrorPool.workerNodes.some(
    (workerNode) => workerNode.usage.tasks.failed === 1
  )
  expect(someWorkerFailedOnce).toBe(true)
})
212
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  // Measure the wall-clock duration of the whole round trip
  const begin = performance.now()
  const answer = await asyncPool.execute(payload)
  const elapsed = performance.now() - begin
  expect(answer).toStrictEqual(payload)
  // The round trip is expected to last at least 2000 ms
  expect(elapsed).toBeGreaterThanOrEqual(2000)
})
221
it('Shutdown test', async () => {
  // Start waiting for the workers 'exit' events before destroying the pool
  const workerExits = waitWorkerEvents(pool, 'exit', numberOfWorkers)
  let destroyEvents = 0
  pool.emitter.on(PoolEvents.destroy, () => {
    destroyEvents += 1
  })
  await pool.destroy()
  const exitEventsCount = await workerExits
  expect(exitEventsCount).toBe(numberOfWorkers)
  expect(destroyEvents).toBe(1)
})
231
it('Verify that cluster pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'

  // Pool built without options: `env` and `settings` stay undefined
  const barePool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  expect(barePool.opts.env).toBeUndefined()
  expect(barePool.opts.settings).toBeUndefined()
  await barePool.destroy()

  // Pool built with explicit `env` and `settings`
  const configuredPool = new FixedClusterPool(
    numberOfWorkers,
    workerFilePath,
    {
      env: { TEST: 'test' },
      settings: { args: ['--use', 'http'], silent: true }
    }
  )
  const { opts } = configuredPool
  expect(opts.env).toStrictEqual({ TEST: 'test' })
  expect(opts.settings).toStrictEqual({
    args: ['--use', 'http'],
    silent: true
  })
  expect({ ...opts.settings, exec: workerFilePath }).toStrictEqual({
    args: ['--use', 'http'],
    silent: true,
    exec: workerFilePath
  })
  await configuredPool.destroy()
})
254
it('Should work even without opts in input', async () => {
  // Only the mandatory constructor arguments are given
  const optionlessPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const outcome = await optionlessPool.execute()
  expect(outcome).toStrictEqual({ ok: 1 })
  // We need to clean up the resources after our test
  await optionlessPool.destroy()
})
8d3782fa
JB
265
it('Verify that a pool with zero worker fails', async () => {
  // Building the pool is deferred so the matcher can observe the throw
  const buildZeroWorkerPool = () =>
    new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
  expect(buildZeroWorkerPool).toThrowError(
    'Cannot instantiate a fixed pool with zero worker'
  )
})
325f50bc 272})