fix: ensure worker node destroy semantic is always the same
[poolifier.git] / tests / pools / cluster / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedClusterPool, PoolEvents } = require('../../../lib')
2d2e32c2 3const { WorkerFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
325f50bc 5
079de991 6describe('Fixed cluster pool test suite', () => {
e1ffb94f
JB
7 const numberOfWorkers = 6
8 const pool = new FixedClusterPool(
9 numberOfWorkers,
10 './tests/worker-files/cluster/testWorker.js',
11 {
12 errorHandler: e => console.error(e)
13 }
14 )
594bfb84
JB
15 const queuePool = new FixedClusterPool(
16 numberOfWorkers,
17 './tests/worker-files/cluster/testWorker.js',
18 {
19 enableTasksQueue: true,
20 tasksQueueOptions: {
21 concurrency: 2
22 },
23 errorHandler: e => console.error(e)
24 }
25 )
e1ffb94f
JB
26 const emptyPool = new FixedClusterPool(
27 numberOfWorkers,
28 './tests/worker-files/cluster/emptyWorker.js',
29 { exitHandler: () => console.log('empty pool worker exited') }
30 )
31 const echoPool = new FixedClusterPool(
32 numberOfWorkers,
33 './tests/worker-files/cluster/echoWorker.js'
34 )
35 const errorPool = new FixedClusterPool(
36 numberOfWorkers,
37 './tests/worker-files/cluster/errorWorker.js',
38 {
39 errorHandler: e => console.error(e)
40 }
41 )
42 const asyncErrorPool = new FixedClusterPool(
43 numberOfWorkers,
44 './tests/worker-files/cluster/asyncErrorWorker.js',
45 {
46 errorHandler: e => console.error(e)
47 }
48 )
49 const asyncPool = new FixedClusterPool(
50 numberOfWorkers,
51 './tests/worker-files/cluster/asyncWorker.js'
52 )
53
8bc77620
APA
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our test
56 await echoPool.destroy()
57 await asyncPool.destroy()
58 await errorPool.destroy()
59 await asyncErrorPool.destroy()
60 await emptyPool.destroy()
594bfb84 61 await queuePool.destroy()
8bc77620
APA
62 })
63
325f50bc 64 it('Verify that the function is executed in a worker cluster', async () => {
6db75ad9
JB
65 let result = await pool.execute({
66 function: WorkerFunctions.fibonacci
67 })
024daf59 68 expect(result).toBe(75025)
6db75ad9
JB
69 result = await pool.execute({
70 function: WorkerFunctions.factorial
71 })
70a4f5ea 72 expect(result).toBe(9.33262154439441e157)
325f50bc
S
73 })
74
318d4156 75 it('Verify that is possible to invoke the execute() method without input', async () => {
325f50bc 76 const result = await pool.execute()
30b963d4 77 expect(result).toStrictEqual({ ok: 1 })
325f50bc
S
78 })
79
1dcbfefc 80 it("Verify that 'ready' event is emitted", async () => {
079de991
JB
81 const pool1 = new FixedClusterPool(
82 numberOfWorkers,
83 './tests/worker-files/cluster/testWorker.js',
84 {
85 errorHandler: e => console.error(e)
86 }
87 )
88 let poolReady = 0
66293e7d 89 pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
1dcbfefc 90 await waitPoolEvents(pool1, PoolEvents.ready, 1)
216541b6
JB
91 expect(poolReady).toBe(1)
92 })
93
1dcbfefc 94 it("Verify that 'busy' event is emitted", () => {
7c0ba920 95 let poolBusy = 0
aee46736 96 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 97 for (let i = 0; i < numberOfWorkers * 2; i++) {
8cbb82eb 98 pool.execute()
7c0ba920 99 }
14916bf9
JB
100 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
101 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
102 expect(poolBusy).toBe(numberOfWorkers + 1)
7c0ba920
JB
103 })
104
594bfb84 105 it('Verify that tasks queuing is working', async () => {
d3d4b67d 106 const promises = new Set()
ee9f5295 107 const maxMultiplier = 2
594bfb84 108 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
d3d4b67d 109 promises.add(queuePool.execute())
594bfb84 110 }
d3d4b67d 111 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
594bfb84 112 for (const workerNode of queuePool.workerNodes) {
f59e1027 113 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
594bfb84
JB
114 queuePool.opts.tasksQueueOptions.concurrency
115 )
f59e1027
JB
116 expect(workerNode.usage.tasks.executed).toBe(0)
117 expect(workerNode.usage.tasks.queued).toBeGreaterThan(0)
118 expect(workerNode.usage.tasks.maxQueued).toBeGreaterThan(0)
594bfb84 119 }
a4e07f72 120 expect(queuePool.info.executingTasks).toBe(numberOfWorkers)
6b27d407
JB
121 expect(queuePool.info.queuedTasks).toBe(
122 numberOfWorkers * maxMultiplier - numberOfWorkers
123 )
124 expect(queuePool.info.maxQueuedTasks).toBe(
d3d4b67d
JB
125 numberOfWorkers * maxMultiplier - numberOfWorkers
126 )
594bfb84
JB
127 await Promise.all(promises)
128 for (const workerNode of queuePool.workerNodes) {
f59e1027
JB
129 expect(workerNode.usage.tasks.executing).toBe(0)
130 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
131 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
132 expect(workerNode.usage.tasks.queued).toBe(0)
133 expect(workerNode.usage.tasks.maxQueued).toBe(1)
594bfb84
JB
134 }
135 })
136
325f50bc
S
137 it('Verify that is possible to have a worker that return undefined', async () => {
138 const result = await emptyPool.execute()
6db75ad9 139 expect(result).toBeUndefined()
325f50bc
S
140 })
141
142 it('Verify that data are sent to the worker correctly', async () => {
143 const data = { f: 10 }
144 const result = await echoPool.execute(data)
e1ffb94f 145 expect(result).toStrictEqual(data)
325f50bc
S
146 })
147
148 it('Verify that error handling is working properly:sync', async () => {
149 const data = { f: 10 }
d46660cd
JB
150 let taskError
151 errorPool.emitter.on(PoolEvents.taskError, e => {
152 taskError = e
153 })
325f50bc
S
154 let inError
155 try {
156 await errorPool.execute(data)
157 } catch (e) {
158 inError = e
159 }
160 expect(inError).toBeDefined()
8620fb25 161 expect(typeof inError === 'string').toBe(true)
985d0e79
JB
162 expect(inError).toBe('Error Message from ClusterWorker')
163 expect(taskError).toStrictEqual({
ff128cc9 164 name: 'default',
985d0e79
JB
165 message: 'Error Message from ClusterWorker',
166 data
167 })
18482cec
JB
168 expect(
169 errorPool.workerNodes.some(
f59e1027 170 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
171 )
172 ).toBe(true)
325f50bc
S
173 })
174
175 it('Verify that error handling is working properly:async', async () => {
176 const data = { f: 10 }
4f0b85b3
JB
177 let taskError
178 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
179 taskError = e
180 })
325f50bc
S
181 let inError
182 try {
183 await asyncErrorPool.execute(data)
184 } catch (e) {
185 inError = e
186 }
187 expect(inError).toBeDefined()
8620fb25 188 expect(typeof inError === 'string').toBe(true)
985d0e79
JB
189 expect(inError).toBe('Error Message from ClusterWorker:async')
190 expect(taskError).toStrictEqual({
ff128cc9 191 name: 'default',
985d0e79
JB
192 message: 'Error Message from ClusterWorker:async',
193 data
194 })
18482cec
JB
195 expect(
196 asyncErrorPool.workerNodes.some(
f59e1027 197 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
198 )
199 ).toBe(true)
325f50bc
S
200 })
201
202 it('Verify that async function is working properly', async () => {
203 const data = { f: 10 }
15e5141f 204 const startTime = performance.now()
325f50bc 205 const result = await asyncPool.execute(data)
15e5141f 206 const usedTime = performance.now() - startTime
e1ffb94f 207 expect(result).toStrictEqual(data)
325f50bc
S
208 expect(usedTime).toBeGreaterThanOrEqual(2000)
209 })
210
211 it('Shutdown test', async () => {
bac873bd 212 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
45dbbb14 213 await pool.destroy()
bdacc2d2
JB
214 const numberOfExitEvents = await exitPromise
215 expect(numberOfExitEvents).toBe(numberOfWorkers)
325f50bc
S
216 })
217
1a76932b
JB
218 it('Verify that cluster pool options are checked', async () => {
219 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
220 let pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath)
221 expect(pool1.opts.env).toBeUndefined()
222 expect(pool1.opts.settings).toBeUndefined()
223 await pool1.destroy()
224 pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath, {
225 env: { TEST: 'test' },
226 settings: { args: ['--use', 'http'], silent: true }
227 })
228 expect(pool1.opts.env).toStrictEqual({ TEST: 'test' })
229 expect(pool1.opts.settings).toStrictEqual({
230 args: ['--use', 'http'],
231 silent: true
232 })
233 expect({ ...pool1.opts.settings, exec: workerFilePath }).toStrictEqual({
234 args: ['--use', 'http'],
235 silent: true,
236 exec: workerFilePath
237 })
238 await pool1.destroy()
239 })
240
325f50bc
S
241 it('Should work even without opts in input', async () => {
242 const pool1 = new FixedClusterPool(
e1ffb94f 243 numberOfWorkers,
76b1e974 244 './tests/worker-files/cluster/testWorker.js'
325f50bc 245 )
6db75ad9 246 const res = await pool1.execute()
30b963d4 247 expect(res).toStrictEqual({ ok: 1 })
8bc77620
APA
248 // We need to clean up the resources after our test
249 await pool1.destroy()
325f50bc 250 })
8d3782fa
JB
251
252 it('Verify that a pool with zero worker fails', async () => {
253 expect(
254 () =>
255 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
2431bdb4 256 ).toThrowError('Cannot instantiate a fixed pool with zero worker')
8d3782fa 257 })
325f50bc 258})