build(deps-dev): apply updates
[poolifier.git] / tests / pools / cluster / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedClusterPool, PoolEvents } = require('../../../lib')
dbca3be9 3const { TaskFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
325f50bc 5
079de991 6describe('Fixed cluster pool test suite', () => {
e1ffb94f 7 const numberOfWorkers = 6
4e377863 8 const tasksConcurrency = 2
e1ffb94f
JB
9 const pool = new FixedClusterPool(
10 numberOfWorkers,
11 './tests/worker-files/cluster/testWorker.js',
12 {
8ebe6c30 13 errorHandler: (e) => console.error(e)
e1ffb94f
JB
14 }
15 )
594bfb84
JB
16 const queuePool = new FixedClusterPool(
17 numberOfWorkers,
18 './tests/worker-files/cluster/testWorker.js',
19 {
20 enableTasksQueue: true,
21 tasksQueueOptions: {
4e377863 22 concurrency: tasksConcurrency
594bfb84 23 },
8ebe6c30 24 errorHandler: (e) => console.error(e)
594bfb84
JB
25 }
26 )
e1ffb94f
JB
27 const emptyPool = new FixedClusterPool(
28 numberOfWorkers,
29 './tests/worker-files/cluster/emptyWorker.js',
73bfd59d 30 { exitHandler: () => console.info('empty pool worker exited') }
e1ffb94f
JB
31 )
32 const echoPool = new FixedClusterPool(
33 numberOfWorkers,
34 './tests/worker-files/cluster/echoWorker.js'
35 )
36 const errorPool = new FixedClusterPool(
37 numberOfWorkers,
38 './tests/worker-files/cluster/errorWorker.js',
39 {
8ebe6c30 40 errorHandler: (e) => console.error(e)
e1ffb94f
JB
41 }
42 )
43 const asyncErrorPool = new FixedClusterPool(
44 numberOfWorkers,
45 './tests/worker-files/cluster/asyncErrorWorker.js',
46 {
8ebe6c30 47 errorHandler: (e) => console.error(e)
e1ffb94f
JB
48 }
49 )
50 const asyncPool = new FixedClusterPool(
51 numberOfWorkers,
52 './tests/worker-files/cluster/asyncWorker.js'
53 )
54
8bc77620
APA
55 after('Destroy all pools', async () => {
56 // We need to clean up the resources after our test
57 await echoPool.destroy()
58 await asyncPool.destroy()
59 await errorPool.destroy()
60 await asyncErrorPool.destroy()
61 await emptyPool.destroy()
594bfb84 62 await queuePool.destroy()
8bc77620
APA
63 })
64
325f50bc 65 it('Verify that the function is executed in a worker cluster', async () => {
6db75ad9 66 let result = await pool.execute({
dbca3be9 67 function: TaskFunctions.fibonacci
6db75ad9 68 })
024daf59 69 expect(result).toBe(75025)
6db75ad9 70 result = await pool.execute({
dbca3be9 71 function: TaskFunctions.factorial
6db75ad9 72 })
70a4f5ea 73 expect(result).toBe(9.33262154439441e157)
325f50bc
S
74 })
75
318d4156 76 it('Verify that is possible to invoke the execute() method without input', async () => {
325f50bc 77 const result = await pool.execute()
30b963d4 78 expect(result).toStrictEqual({ ok: 1 })
325f50bc
S
79 })
80
1dcbfefc 81 it("Verify that 'ready' event is emitted", async () => {
079de991
JB
82 const pool1 = new FixedClusterPool(
83 numberOfWorkers,
84 './tests/worker-files/cluster/testWorker.js',
85 {
8ebe6c30 86 errorHandler: (e) => console.error(e)
079de991
JB
87 }
88 )
89 let poolReady = 0
66293e7d 90 pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
1dcbfefc 91 await waitPoolEvents(pool1, PoolEvents.ready, 1)
216541b6
JB
92 expect(poolReady).toBe(1)
93 })
94
1dcbfefc 95 it("Verify that 'busy' event is emitted", () => {
7c0ba920 96 let poolBusy = 0
aee46736 97 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 98 for (let i = 0; i < numberOfWorkers * 2; i++) {
8cbb82eb 99 pool.execute()
7c0ba920 100 }
14916bf9
JB
101 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
102 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
103 expect(poolBusy).toBe(numberOfWorkers + 1)
7c0ba920
JB
104 })
105
594bfb84 106 it('Verify that tasks queuing is working', async () => {
d3d4b67d 107 const promises = new Set()
4e377863 108 const maxMultiplier = 3 // Must be greater than tasksConcurrency
594bfb84 109 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
d3d4b67d 110 promises.add(queuePool.execute())
594bfb84 111 }
d3d4b67d 112 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
594bfb84 113 for (const workerNode of queuePool.workerNodes) {
f59e1027 114 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
594bfb84
JB
115 queuePool.opts.tasksQueueOptions.concurrency
116 )
f59e1027 117 expect(workerNode.usage.tasks.executed).toBe(0)
4e377863
JB
118 expect(workerNode.usage.tasks.queued).toBe(
119 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
120 )
121 expect(workerNode.usage.tasks.maxQueued).toBe(
122 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
123 )
594bfb84 124 }
4e377863
JB
125 expect(queuePool.info.executingTasks).toBe(
126 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
127 )
6b27d407 128 expect(queuePool.info.queuedTasks).toBe(
4e377863
JB
129 numberOfWorkers *
130 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
6b27d407
JB
131 )
132 expect(queuePool.info.maxQueuedTasks).toBe(
4e377863
JB
133 numberOfWorkers *
134 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
d3d4b67d 135 )
594bfb84
JB
136 await Promise.all(promises)
137 for (const workerNode of queuePool.workerNodes) {
f59e1027 138 expect(workerNode.usage.tasks.executing).toBe(0)
4e377863 139 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
f59e1027 140 expect(workerNode.usage.tasks.queued).toBe(0)
4e377863
JB
141 expect(workerNode.usage.tasks.maxQueued).toBe(
142 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
143 )
594bfb84
JB
144 }
145 })
146
325f50bc
S
147 it('Verify that is possible to have a worker that return undefined', async () => {
148 const result = await emptyPool.execute()
6db75ad9 149 expect(result).toBeUndefined()
325f50bc
S
150 })
151
152 it('Verify that data are sent to the worker correctly', async () => {
153 const data = { f: 10 }
154 const result = await echoPool.execute(data)
e1ffb94f 155 expect(result).toStrictEqual(data)
325f50bc
S
156 })
157
158 it('Verify that error handling is working properly:sync', async () => {
159 const data = { f: 10 }
d46660cd 160 let taskError
8ebe6c30 161 errorPool.emitter.on(PoolEvents.taskError, (e) => {
d46660cd
JB
162 taskError = e
163 })
325f50bc
S
164 let inError
165 try {
166 await errorPool.execute(data)
167 } catch (e) {
168 inError = e
169 }
170 expect(inError).toBeDefined()
8620fb25 171 expect(typeof inError === 'string').toBe(true)
985d0e79
JB
172 expect(inError).toBe('Error Message from ClusterWorker')
173 expect(taskError).toStrictEqual({
ff128cc9 174 name: 'default',
985d0e79
JB
175 message: 'Error Message from ClusterWorker',
176 data
177 })
18482cec
JB
178 expect(
179 errorPool.workerNodes.some(
8ebe6c30 180 (workerNode) => workerNode.usage.tasks.failed === 1
18482cec
JB
181 )
182 ).toBe(true)
325f50bc
S
183 })
184
185 it('Verify that error handling is working properly:async', async () => {
186 const data = { f: 10 }
4f0b85b3 187 let taskError
8ebe6c30 188 asyncErrorPool.emitter.on(PoolEvents.taskError, (e) => {
4f0b85b3
JB
189 taskError = e
190 })
325f50bc
S
191 let inError
192 try {
193 await asyncErrorPool.execute(data)
194 } catch (e) {
195 inError = e
196 }
197 expect(inError).toBeDefined()
8620fb25 198 expect(typeof inError === 'string').toBe(true)
985d0e79
JB
199 expect(inError).toBe('Error Message from ClusterWorker:async')
200 expect(taskError).toStrictEqual({
ff128cc9 201 name: 'default',
985d0e79
JB
202 message: 'Error Message from ClusterWorker:async',
203 data
204 })
18482cec
JB
205 expect(
206 asyncErrorPool.workerNodes.some(
8ebe6c30 207 (workerNode) => workerNode.usage.tasks.failed === 1
18482cec
JB
208 )
209 ).toBe(true)
325f50bc
S
210 })
211
212 it('Verify that async function is working properly', async () => {
213 const data = { f: 10 }
15e5141f 214 const startTime = performance.now()
325f50bc 215 const result = await asyncPool.execute(data)
15e5141f 216 const usedTime = performance.now() - startTime
e1ffb94f 217 expect(result).toStrictEqual(data)
325f50bc
S
218 expect(usedTime).toBeGreaterThanOrEqual(2000)
219 })
220
221 it('Shutdown test', async () => {
bac873bd 222 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
45dbbb14 223 await pool.destroy()
bdacc2d2
JB
224 const numberOfExitEvents = await exitPromise
225 expect(numberOfExitEvents).toBe(numberOfWorkers)
325f50bc
S
226 })
227
1a76932b
JB
228 it('Verify that cluster pool options are checked', async () => {
229 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
230 let pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath)
231 expect(pool1.opts.env).toBeUndefined()
232 expect(pool1.opts.settings).toBeUndefined()
233 await pool1.destroy()
234 pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath, {
235 env: { TEST: 'test' },
236 settings: { args: ['--use', 'http'], silent: true }
237 })
238 expect(pool1.opts.env).toStrictEqual({ TEST: 'test' })
239 expect(pool1.opts.settings).toStrictEqual({
240 args: ['--use', 'http'],
241 silent: true
242 })
243 expect({ ...pool1.opts.settings, exec: workerFilePath }).toStrictEqual({
244 args: ['--use', 'http'],
245 silent: true,
246 exec: workerFilePath
247 })
248 await pool1.destroy()
249 })
250
325f50bc
S
251 it('Should work even without opts in input', async () => {
252 const pool1 = new FixedClusterPool(
e1ffb94f 253 numberOfWorkers,
76b1e974 254 './tests/worker-files/cluster/testWorker.js'
325f50bc 255 )
6db75ad9 256 const res = await pool1.execute()
30b963d4 257 expect(res).toStrictEqual({ ok: 1 })
8bc77620
APA
258 // We need to clean up the resources after our test
259 await pool1.destroy()
325f50bc 260 })
8d3782fa
JB
261
262 it('Verify that a pool with zero worker fails', async () => {
263 expect(
264 () =>
265 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
2431bdb4 266 ).toThrowError('Cannot instantiate a fixed pool with zero worker')
8d3782fa 267 })
325f50bc 268})