feat: make IWRR strategy worker readiness aware
[poolifier.git] / tests / pools / cluster / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedClusterPool, PoolEvents } = require('../../../lib')
2d2e32c2 3const { WorkerFunctions } = require('../../test-types')
216541b6 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
325f50bc 5
079de991 6describe('Fixed cluster pool test suite', () => {
// Number of workers given to every fixed cluster pool created in this suite.
const numberOfWorkers = 6
// Error handler reused by the fixtures that log worker errors to stderr.
const logError = e => console.error(e)

// General purpose pool; it is destroyed by the 'Shutdown test' case.
const pool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  { errorHandler: logError }
)
// Same worker file, with the tasks queue enabled and a per-worker concurrency of 2.
const queuePool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    errorHandler: logError
  }
)
// Pool running the "empty" worker file; logs a message whenever a worker exits.
const emptyPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/emptyWorker.js',
  { exitHandler: () => console.log('empty pool worker exited') }
)
// Pool running the "echo" worker file, created without options.
const echoPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/echoWorker.js'
)
// Pool running the "error" worker file.
const errorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/errorWorker.js',
  { errorHandler: logError }
)
// Pool running the "async error" worker file.
const asyncErrorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncErrorWorker.js',
  { errorHandler: logError }
)
// Pool running the "async" worker file, created without options.
const asyncPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncWorker.js'
)
53
after('Destroy all pools', async () => {
  // Release the resources held by the shared fixtures once the suite is done.
  // They are destroyed one after the other, in this exact order.
  const poolsToDestroy = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const fixturePool of poolsToDestroy) {
    await fixturePool.destroy()
  }
})
63
it('Verify that the function is executed in a worker cluster', async () => {
  // Run each named worker function once and compare with the known result.
  const fibonacciResult = await pool.execute({
    function: WorkerFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  const factorialResult = await pool.execute({
    function: WorkerFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
74
it('Verify that is possible to invoke the execute() method without input', async () => {
  // No task data is passed to execute().
  const response = await pool.execute()
  expect(response).toStrictEqual({ ok: 1 })
})
79
it("Verify that 'ready' event is emitted", async () => {
  // A dedicated pool is used so that the listener is attached right after
  // construction.
  const pool1 = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    let poolReady = 0
    pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
    // Only wait when the pool does not already report itself as ready.
    if (!pool1.info.ready) {
      await waitPoolEvents(pool1, PoolEvents.ready, 1)
    }
    expect(poolReady).toBe(1)
  } finally {
    // The pool is local to this test: destroy it so its workers do not
    // outlive the test, whether the assertion passed or not.
    await pool1.destroy()
  }
})
95
it("Verify that 'busy' event is emitted", async () => {
  // Keep every submitted task so none of them is left floating when the
  // test returns.
  const promises = new Set()
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    promises.add(pool.execute())
  }
  try {
    // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
    expect(poolBusy).toBe(numberOfWorkers + 1)
  } finally {
    // Drain the submitted tasks before the next test runs, even when the
    // assertion above fails.
    await Promise.all(promises)
  }
})
106
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  // Tasks submitted beyond one per worker.
  const surplusTasks = submittedTasks - numberOfWorkers
  const pendingTasks = new Set()
  for (let submitted = 0; submitted < submittedTasks; submitted++) {
    pendingTasks.add(queuePool.execute())
  }
  expect(pendingTasks.size).toBe(submittedTasks)

  // State right after submission, before any task has been awaited.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeLessThanOrEqual(
      queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(usage.tasks.executed).toBe(0)
    expect(usage.tasks.queued).toBeGreaterThan(0)
    expect(usage.tasks.maxQueued).toBeGreaterThan(0)
  }
  expect(queuePool.info.executingTasks).toBe(numberOfWorkers)
  expect(queuePool.info.queuedTasks).toBe(surplusTasks)
  expect(queuePool.info.maxQueuedTasks).toBe(surplusTasks)

  await Promise.all(pendingTasks)

  // State once every submitted task has settled.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBe(0)
    expect(usage.tasks.executed).toBeGreaterThan(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
    expect(usage.tasks.queued).toBe(0)
    expect(usage.tasks.maxQueued).toBe(1)
  }
})
138
it('Verify that is possible to have a worker that return undefined', async () => {
  // The task is run on the pool backed by the "empty" worker file.
  const response = await emptyPool.execute()
  expect(response).toBeUndefined()
})
143
it('Verify that data are sent to the worker correctly', async () => {
  // The value returned by the task must be structurally equal to the payload.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
149
it('Verify that error handling is working properly:sync', async () => {
  const payload = { f: 10 }
  // Capture what the pool emits on its `taskError` event.
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, error => {
    emittedTaskError = error
  })
  // Capture what the execute() promise rejects with.
  let rejection
  try {
    await errorPool.execute(payload)
  } catch (error) {
    rejection = error
  }
  expect(rejection).toBeDefined()
  expect(typeof rejection === 'string').toBe(true)
  expect(rejection).toBe('Error Message from ClusterWorker')
  expect(emittedTaskError).toStrictEqual({
    name: 'default',
    message: 'Error Message from ClusterWorker',
    data: payload
  })
  // At least one worker node must have recorded exactly one failed task.
  const hasFailedTask = errorPool.workerNodes.some(
    workerNode => workerNode.usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
176
it('Verify that error handling is working properly:async', async () => {
  const payload = { f: 10 }
  // Capture what the pool emits on its `taskError` event.
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, error => {
    emittedTaskError = error
  })
  // Capture what the execute() promise rejects with.
  let rejection
  try {
    await asyncErrorPool.execute(payload)
  } catch (error) {
    rejection = error
  }
  expect(rejection).toBeDefined()
  expect(typeof rejection === 'string').toBe(true)
  expect(rejection).toBe('Error Message from ClusterWorker:async')
  expect(emittedTaskError).toStrictEqual({
    name: 'default',
    message: 'Error Message from ClusterWorker:async',
    data: payload
  })
  // At least one worker node must have recorded exactly one failed task.
  const hasFailedTask = asyncErrorPool.workerNodes.some(
    workerNode => workerNode.usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
203
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  // Measure the wall-clock time of a single task, in milliseconds.
  const startedAt = performance.now()
  const response = await asyncPool.execute(payload)
  const elapsed = performance.now() - startedAt
  expect(response).toStrictEqual(payload)
  expect(elapsed).toBeGreaterThanOrEqual(2000)
})
212
it('Shutdown test', async () => {
  // Start waiting for the workers' 'exit' events before destroying the pool.
  const exitEventsCount = waitWorkerEvents(pool, 'exit', numberOfWorkers)
  await pool.destroy()
  expect(await exitEventsCount).toBe(numberOfWorkers)
})
219
it('Verify that cluster pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'

  // Without options, `env` and `settings` are left undefined.
  const defaultPool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  expect(defaultPool.opts.env).toBeUndefined()
  expect(defaultPool.opts.settings).toBeUndefined()
  await defaultPool.destroy()

  // With options, `env` and `settings` are exposed unchanged on `opts`.
  const customPool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
    env: { TEST: 'test' },
    settings: { args: ['--use', 'http'], silent: true }
  })
  expect(customPool.opts.env).toStrictEqual({ TEST: 'test' })
  expect(customPool.opts.settings).toStrictEqual({
    args: ['--use', 'http'],
    silent: true
  })
  const settingsWithExec = { ...customPool.opts.settings, exec: workerFilePath }
  expect(settingsWithExec).toStrictEqual({
    args: ['--use', 'http'],
    silent: true,
    exec: workerFilePath
  })
  await customPool.destroy()
})
242
it('Should work even without opts in input', async () => {
  // The pool is built with the two mandatory arguments only.
  const optionlessPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const response = await optionlessPool.execute()
  expect(response).toStrictEqual({ ok: 1 })
  // We need to clean up the resources after our test
  await optionlessPool.destroy()
})
8d3782fa
JB
253
it('Verify that a pool with zero worker fails', async () => {
  // Construction is wrapped in a function so expect() can observe the throw.
  const createPoolWithoutWorkers = () =>
    new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
  expect(createPoolWithoutWorkers).toThrowError(
    'Cannot instantiate a fixed pool with zero worker'
  )
})
325f50bc 260})