fix: fix pool readiness status at startup
[poolifier.git] / tests / pools / cluster / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedClusterPool, PoolEvents } = require('../../../lib')
2d2e32c2 3const { WorkerFunctions } = require('../../test-types')
cc3ab78b 4const { waitWorkerEvents } = require('../../test-utils')
325f50bc 5
079de991 6describe('Fixed cluster pool test suite', () => {
e1ffb94f
JB
// Number of workers spawned by every fixed cluster pool in this suite.
const numberOfWorkers = 6
// Directory containing the cluster worker scripts used as pool entry points.
const workerFilesDir = './tests/worker-files/cluster'
// Error handler handed to the pools that are given one: logs to stderr.
const logError = e => console.error(e)

// Default pool shared by most test cases; destroyed by the 'Shutdown test' case.
const pool = new FixedClusterPool(
  numberOfWorkers,
  `${workerFilesDir}/testWorker.js`,
  { errorHandler: logError }
)
// Pool with the tasks queue enabled, at most 2 concurrent tasks per worker.
const queuePool = new FixedClusterPool(
  numberOfWorkers,
  `${workerFilesDir}/testWorker.js`,
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    errorHandler: logError
  }
)
// Pool running emptyWorker.js; logs a message from its exit handler.
const emptyPool = new FixedClusterPool(
  numberOfWorkers,
  `${workerFilesDir}/emptyWorker.js`,
  { exitHandler: () => console.log('empty pool worker exited') }
)
// Pool running echoWorker.js, created without options.
const echoPool = new FixedClusterPool(
  numberOfWorkers,
  `${workerFilesDir}/echoWorker.js`
)
// Pool running errorWorker.js (used by the ':sync' error handling case).
const errorPool = new FixedClusterPool(
  numberOfWorkers,
  `${workerFilesDir}/errorWorker.js`,
  { errorHandler: logError }
)
// Pool running asyncErrorWorker.js (used by the ':async' error handling case).
const asyncErrorPool = new FixedClusterPool(
  numberOfWorkers,
  `${workerFilesDir}/asyncErrorWorker.js`,
  { errorHandler: logError }
)
// Pool running asyncWorker.js, created without options.
const asyncPool = new FixedClusterPool(
  numberOfWorkers,
  `${workerFilesDir}/asyncWorker.js`
)
53
8bc77620
APA
after('Destroy all pools', async () => {
  // Release the suite-level pools once every test has run. They are destroyed
  // one after the other, in this exact order.
  const poolsToDestroy = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const poolToDestroy of poolsToDestroy) {
    await poolToDestroy.destroy()
  }
})
63
it('Verify that the function is executed in a worker cluster', async () => {
  // Run the fibonacci worker function first, then the factorial one, and
  // compare each result with its expected numeric value.
  const fibonacciResult = await pool.execute({
    function: WorkerFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)
  const factorialResult = await pool.execute({
    function: WorkerFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
74
it('Verify that is possible to invoke the execute() method without input', async () => {
  // Calling execute() with no argument must still resolve with { ok: 1 }.
  const outcome = await pool.execute()
  expect(outcome).toStrictEqual({ ok: 1 })
})
79
// Still skipped, as in the original file. When enabled, the test now waits for
// the `ready` event before asserting (the previous version asserted
// synchronously, right after registering the listener) and always destroys the
// pool it created.
it.skip("Verify that 'ready' event is emitted", async () => {
  const pool1 = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    let poolInfo
    let poolReady = 0
    // Resolve on the first `ready` emission; the listener stays registered so
    // any additional emission would still be counted.
    await new Promise(resolve => {
      pool1.emitter.on(PoolEvents.ready, info => {
        ++poolReady
        poolInfo = info
        resolve()
      })
    })
    expect(poolReady).toBe(1)
    expect(poolInfo).toBeDefined()
  } finally {
    // Do not leak the worker processes spawned for this test.
    await pool1.destroy()
  }
})
97
it("Verify that 'busy' event is emitted", async () => {
  const promises = new Set()
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    promises.add(pool.execute())
  }
  try {
    // The assertion runs right after submission, exactly as before: the test
    // expects `busy` to have fired numberOfWorkers + 1 times while submitting
    // numberOfWorkers * 2 tasks to the fixed pool.
    expect(poolBusy).toBe(numberOfWorkers + 1)
  } finally {
    // Settle every submitted task so none is still in flight on the shared
    // pool when the following tests run.
    await Promise.all(promises)
  }
})
108
it('Verify that tasks queuing is working', async () => {
  const tasksPerWorker = 2
  const submittedTasks = numberOfWorkers * tasksPerWorker
  const expectedQueuedTasks = submittedTasks - numberOfWorkers
  const pendingTasks = new Set()
  for (let submitted = 0; submitted < submittedTasks; submitted++) {
    pendingTasks.add(queuePool.execute())
  }
  expect(pendingTasks.size).toBe(submittedTasks)

  // Right after submission: nothing has completed yet and every worker node
  // holds queued tasks.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeLessThanOrEqual(
      queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(usage.tasks.executed).toBe(0)
    expect(usage.tasks.queued).toBeGreaterThan(0)
    expect(usage.tasks.maxQueued).toBeGreaterThan(0)
  }
  expect(queuePool.info.executingTasks).toBe(numberOfWorkers)
  expect(queuePool.info.queuedTasks).toBe(expectedQueuedTasks)
  expect(queuePool.info.maxQueuedTasks).toBe(expectedQueuedTasks)

  await Promise.all(pendingTasks)

  // Once every task has settled: queues are drained and each worker node has
  // executed between 1 and tasksPerWorker tasks.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBe(0)
    expect(usage.tasks.executed).toBeGreaterThan(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(tasksPerWorker)
    expect(usage.tasks.queued).toBe(0)
    expect(usage.tasks.maxQueued).toBe(1)
  }
})
140
325f50bc
S
it('Verify that is possible to have a worker that return undefined', async () => {
  // A task executed on emptyPool is expected to resolve with undefined.
  const outcome = await emptyPool.execute()
  expect(outcome).toBeUndefined()
})
145
it('Verify that data are sent to the worker correctly', async () => {
  // A task executed on echoPool is expected to resolve with a value strictly
  // equal to the payload it was given.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
151
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }
  const expectedMessage = 'Error Message from ClusterWorker'

  // Capture the payload of the pool's taskError event.
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, e => {
    emittedTaskError = e
  })

  // Capture the rejection reason of the execute() promise.
  let rejection
  try {
    await errorPool.execute(data)
  } catch (e) {
    rejection = e
  }

  expect(rejection).toBeDefined()
  expect(typeof rejection === 'string').toBe(true)
  expect(rejection).toBe(expectedMessage)
  expect(emittedTaskError).toStrictEqual({
    name: 'default',
    message: expectedMessage,
    data
  })
  // At least one worker node must have recorded exactly one failed task.
  const someNodeFailedOnce = errorPool.workerNodes.some(
    workerNode => workerNode.usage.tasks.failed === 1
  )
  expect(someNodeFailedOnce).toBe(true)
})
178
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }
  const expectedMessage = 'Error Message from ClusterWorker:async'

  // Capture the payload of the pool's taskError event.
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
    emittedTaskError = e
  })

  // Capture the rejection reason of the execute() promise.
  let rejection
  try {
    await asyncErrorPool.execute(data)
  } catch (e) {
    rejection = e
  }

  expect(rejection).toBeDefined()
  expect(typeof rejection === 'string').toBe(true)
  expect(rejection).toBe(expectedMessage)
  expect(emittedTaskError).toStrictEqual({
    name: 'default',
    message: expectedMessage,
    data
  })
  // At least one worker node must have recorded exactly one failed task.
  const someNodeFailedOnce = asyncErrorPool.workerNodes.some(
    workerNode => workerNode.usage.tasks.failed === 1
  )
  expect(someNodeFailedOnce).toBe(true)
})
205
it('Verify that async function is working properly', async () => {
  // The task must resolve with its input, and the call must take at least
  // 2000 ms as measured with performance.now().
  const payload = { f: 10 }
  const startedAt = performance.now()
  const outcome = await asyncPool.execute(payload)
  const elapsedMs = performance.now() - startedAt
  expect(outcome).toStrictEqual(payload)
  expect(elapsedMs).toBeGreaterThanOrEqual(2000)
})
214
it('Shutdown test', async () => {
  // Start waiting for the workers' 'exit' events before destroying the shared
  // pool, then check that one was counted per worker.
  const exitEventsCount = waitWorkerEvents(pool, 'exit', numberOfWorkers)
  await pool.destroy()
  expect(await exitEventsCount).toBe(numberOfWorkers)
})
221
1a76932b
JB
it('Verify that cluster pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'

  // Without options, neither `env` nor `settings` is defined on the pool.
  const defaultsPool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  expect(defaultsPool.opts.env).toBeUndefined()
  expect(defaultsPool.opts.settings).toBeUndefined()
  await defaultsPool.destroy()

  // With options, `env` and `settings` are exposed as given.
  const customPool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
    env: { TEST: 'test' },
    settings: { args: ['--use', 'http'], silent: true }
  })
  expect(customPool.opts.env).toStrictEqual({ TEST: 'test' })
  expect(customPool.opts.settings).toStrictEqual({
    args: ['--use', 'http'],
    silent: true
  })
  // The settings merged with the worker file path as `exec` must match too.
  const settingsWithExec = { ...customPool.opts.settings, exec: workerFilePath }
  expect(settingsWithExec).toStrictEqual({
    args: ['--use', 'http'],
    silent: true,
    exec: workerFilePath
  })
  await customPool.destroy()
})
244
325f50bc
S
it('Should work even without opts in input', async () => {
  // Build a pool with no options argument at all and run one task on it.
  const optionlessPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const outcome = await optionlessPool.execute()
  expect(outcome).toStrictEqual({ ok: 1 })
  // Free the worker processes spawned for this test.
  await optionlessPool.destroy()
})
8d3782fa
JB
255
it('Verify that a pool with zero worker fails', async () => {
  // Constructing a fixed pool with 0 workers must throw.
  const createEmptyPool = () =>
    new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
  expect(createEmptyPool).toThrowError(
    'Cannot instantiate a fixed pool with zero worker'
  )
})
325f50bc 262})