chore: v2.6.10
[poolifier.git] / tests / pools / cluster / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedClusterPool, PoolEvents } = require('../../../lib')
2d2e32c2 3const { WorkerFunctions } = require('../../test-types')
216541b6 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
325f50bc 5
/**
 * Test suite for `FixedClusterPool`.
 *
 * The suite callback is intentionally synchronous: Mocha invokes `describe()`
 * callbacks synchronously and ignores their return value, so hooks and tests
 * registered after an `await` would not be reliably attached to this suite.
 * All asynchronous setup is therefore done in the `before` hook.
 */
describe('Fixed cluster pool test suite', () => {
  const numberOfWorkers = 6
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  // Count the `ready` events emitted by the main pool; checked in a test below.
  let poolReady = 0
  pool.emitter.on(PoolEvents.ready, () => ++poolReady)
  // The wait is started here, right after the pool is created (same point as
  // before), and only awaited in the `before` hook.
  // NOTE(review): assumes waitPoolEvents() attaches its listener when called,
  // so the event cannot be missed — confirm against test-utils.
  const poolReadyPromise = waitPoolEvents(pool, PoolEvents.ready, 1)
  // Pool with the tasks queue enabled, at most 2 concurrent tasks per worker.
  const queuePool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      enableTasksQueue: true,
      tasksQueueOptions: {
        concurrency: 2
      },
      errorHandler: e => console.error(e)
    }
  )
  const emptyPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/emptyWorker.js',
    { exitHandler: () => console.log('empty pool worker exited') }
  )
  const echoPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/echoWorker.js'
  )
  const errorPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/errorWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  const asyncErrorPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/asyncErrorWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  const asyncPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/asyncWorker.js'
  )

  before('Wait for the pool to be ready', async () => {
    await poolReadyPromise
  })

  after('Destroy all pools', async () => {
    // We need to clean up the resources after our test.
    // `pool` itself is destroyed by the 'Shutdown test' below.
    await echoPool.destroy()
    await asyncPool.destroy()
    await errorPool.destroy()
    await asyncErrorPool.destroy()
    await emptyPool.destroy()
    await queuePool.destroy()
  })

  it('Verify that the function is executed in a worker cluster', async () => {
    let result = await pool.execute({
      function: WorkerFunctions.fibonacci
    })
    expect(result).toBe(75025)
    result = await pool.execute({
      function: WorkerFunctions.factorial
    })
    expect(result).toBe(9.33262154439441e157)
  })

  it('Verify that is possible to invoke the execute() method without input', async () => {
    const result = await pool.execute()
    expect(result).toStrictEqual({ ok: 1 })
  })

  it("Verify that 'ready' event is emitted", async () => {
    expect(poolReady).toBe(1)
  })

  it("Verify that 'busy' event is emitted", async () => {
    const promises = new Set()
    let poolBusy = 0
    pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.add(pool.execute())
    }
    // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
    expect(poolBusy).toBe(numberOfWorkers + 1)
    // Let the submitted tasks settle so they do not leak into the following tests.
    await Promise.all(promises)
  })

  it('Verify that tasks queuing is working', async () => {
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
      promises.add(queuePool.execute())
    }
    expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
    // State right after submission: nothing has completed yet and every
    // worker node has queued work.
    for (const workerNode of queuePool.workerNodes) {
      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
        queuePool.opts.tasksQueueOptions.concurrency
      )
      expect(workerNode.usage.tasks.executed).toBe(0)
      expect(workerNode.usage.tasks.queued).toBeGreaterThan(0)
      expect(workerNode.usage.tasks.maxQueued).toBeGreaterThan(0)
    }
    expect(queuePool.info.executingTasks).toBe(numberOfWorkers)
    expect(queuePool.info.queuedTasks).toBe(
      numberOfWorkers * maxMultiplier - numberOfWorkers
    )
    expect(queuePool.info.maxQueuedTasks).toBe(
      numberOfWorkers * maxMultiplier - numberOfWorkers
    )
    await Promise.all(promises)
    // State once every task has settled: queues are drained.
    for (const workerNode of queuePool.workerNodes) {
      expect(workerNode.usage.tasks.executing).toBe(0)
      expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
      expect(workerNode.usage.tasks.queued).toBe(0)
      expect(workerNode.usage.tasks.maxQueued).toBe(1)
    }
  })

  it('Verify that is possible to have a worker that return undefined', async () => {
    const result = await emptyPool.execute()
    expect(result).toBeUndefined()
  })

  it('Verify that data are sent to the worker correctly', async () => {
    const data = { f: 10 }
    const result = await echoPool.execute(data)
    expect(result).toStrictEqual(data)
  })

  it('Verify that error handling is working properly:sync', async () => {
    const data = { f: 10 }
    let taskError
    errorPool.emitter.on(PoolEvents.taskError, e => {
      taskError = e
    })
    let inError
    try {
      await errorPool.execute(data)
    } catch (e) {
      inError = e
    }
    expect(inError).toBeDefined()
    expect(typeof inError === 'string').toBe(true)
    expect(inError).toBe('Error Message from ClusterWorker')
    expect(taskError).toStrictEqual({
      message: 'Error Message from ClusterWorker',
      data
    })
    expect(
      errorPool.workerNodes.some(
        workerNode => workerNode.usage.tasks.failed === 1
      )
    ).toBe(true)
  })

  it('Verify that error handling is working properly:async', async () => {
    const data = { f: 10 }
    let taskError
    asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
      taskError = e
    })
    let inError
    try {
      await asyncErrorPool.execute(data)
    } catch (e) {
      inError = e
    }
    expect(inError).toBeDefined()
    expect(typeof inError === 'string').toBe(true)
    expect(inError).toBe('Error Message from ClusterWorker:async')
    expect(taskError).toStrictEqual({
      message: 'Error Message from ClusterWorker:async',
      data
    })
    expect(
      asyncErrorPool.workerNodes.some(
        workerNode => workerNode.usage.tasks.failed === 1
      )
    ).toBe(true)
  })

  it('Verify that async function is working properly', async () => {
    const data = { f: 10 }
    const startTime = performance.now()
    const result = await asyncPool.execute(data)
    const usedTime = performance.now() - startTime
    expect(result).toStrictEqual(data)
    expect(usedTime).toBeGreaterThanOrEqual(2000)
  })

  it('Shutdown test', async () => {
    // Start listening for worker exits before destroying the pool.
    const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
    await pool.destroy()
    const numberOfExitEvents = await exitPromise
    expect(numberOfExitEvents).toBe(numberOfWorkers)
  })

  it('Verify that cluster pool options are checked', async () => {
    const workerFilePath = './tests/worker-files/cluster/testWorker.js'
    let pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath)
    expect(pool1.opts.env).toBeUndefined()
    expect(pool1.opts.settings).toBeUndefined()
    await pool1.destroy()
    pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath, {
      env: { TEST: 'test' },
      settings: { args: ['--use', 'http'], silent: true }
    })
    expect(pool1.opts.env).toStrictEqual({ TEST: 'test' })
    expect(pool1.opts.settings).toStrictEqual({
      args: ['--use', 'http'],
      silent: true
    })
    // NOTE(review): `exec` is added locally on both sides here, so this only
    // re-checks `opts.settings`, not the settings actually applied to the cluster.
    expect({ ...pool1.opts.settings, exec: workerFilePath }).toStrictEqual({
      args: ['--use', 'http'],
      silent: true,
      exec: workerFilePath
    })
    await pool1.destroy()
  })

  it('Should work even without opts in input', async () => {
    const pool1 = new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testWorker.js'
    )
    const res = await pool1.execute()
    expect(res).toStrictEqual({ ok: 1 })
    // We need to clean up the resources after our test
    await pool1.destroy()
  })

  it('Verify that a pool with zero worker fails', async () => {
    expect(
      () =>
        new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
    ).toThrowError('Cannot instantiate a fixed pool with zero worker')
  })
})