test: improve task error handling
[poolifier.git] / tests / pools / cluster / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedClusterPool, PoolEvents } = require('../../../lib')
2d2e32c2 3const { WorkerFunctions } = require('../../test-types')
85a3f8a7 4const TestUtils = require('../../test-utils')
325f50bc 5
a35560ba 6describe('Fixed cluster pool test suite', () => {
e1ffb94f
JB
7 const numberOfWorkers = 6
8 const pool = new FixedClusterPool(
9 numberOfWorkers,
10 './tests/worker-files/cluster/testWorker.js',
11 {
12 errorHandler: e => console.error(e)
13 }
14 )
594bfb84
JB
15 const queuePool = new FixedClusterPool(
16 numberOfWorkers,
17 './tests/worker-files/cluster/testWorker.js',
18 {
19 enableTasksQueue: true,
20 tasksQueueOptions: {
21 concurrency: 2
22 },
23 errorHandler: e => console.error(e)
24 }
25 )
e1ffb94f
JB
26 const emptyPool = new FixedClusterPool(
27 numberOfWorkers,
28 './tests/worker-files/cluster/emptyWorker.js',
29 { exitHandler: () => console.log('empty pool worker exited') }
30 )
31 const echoPool = new FixedClusterPool(
32 numberOfWorkers,
33 './tests/worker-files/cluster/echoWorker.js'
34 )
35 const errorPool = new FixedClusterPool(
36 numberOfWorkers,
37 './tests/worker-files/cluster/errorWorker.js',
38 {
39 errorHandler: e => console.error(e)
40 }
41 )
42 const asyncErrorPool = new FixedClusterPool(
43 numberOfWorkers,
44 './tests/worker-files/cluster/asyncErrorWorker.js',
45 {
46 errorHandler: e => console.error(e)
47 }
48 )
49 const asyncPool = new FixedClusterPool(
50 numberOfWorkers,
51 './tests/worker-files/cluster/asyncWorker.js'
52 )
53
8bc77620
APA
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our test
56 await echoPool.destroy()
57 await asyncPool.destroy()
58 await errorPool.destroy()
59 await asyncErrorPool.destroy()
60 await emptyPool.destroy()
594bfb84 61 await queuePool.destroy()
8bc77620
APA
62 })
63
325f50bc 64 it('Verify that the function is executed in a worker cluster', async () => {
6db75ad9
JB
65 let result = await pool.execute({
66 function: WorkerFunctions.fibonacci
67 })
70a4f5ea 68 expect(result).toBe(121393)
6db75ad9
JB
69 result = await pool.execute({
70 function: WorkerFunctions.factorial
71 })
70a4f5ea 72 expect(result).toBe(9.33262154439441e157)
325f50bc
S
73 })
74
318d4156 75 it('Verify that is possible to invoke the execute() method without input', async () => {
325f50bc 76 const result = await pool.execute()
6db75ad9 77 expect(result).toBe(false)
325f50bc
S
78 })
79
aee46736 80 it("Verify that 'busy' event is emitted", async () => {
7c0ba920 81 let poolBusy = 0
aee46736 82 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 83 for (let i = 0; i < numberOfWorkers * 2; i++) {
8cbb82eb 84 pool.execute()
7c0ba920 85 }
14916bf9
JB
86 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
87 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
88 expect(poolBusy).toBe(numberOfWorkers + 1)
7c0ba920
JB
89 })
90
594bfb84 91 it('Verify that tasks queuing is working', async () => {
d3d4b67d 92 const promises = new Set()
ee9f5295 93 const maxMultiplier = 2
594bfb84 94 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
d3d4b67d 95 promises.add(queuePool.execute())
594bfb84 96 }
d3d4b67d 97 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
594bfb84
JB
98 for (const workerNode of queuePool.workerNodes) {
99 expect(workerNode.tasksUsage.running).toBeLessThanOrEqual(
100 queuePool.opts.tasksQueueOptions.concurrency
101 )
1ab50fe5 102 expect(workerNode.tasksUsage.ran).toBe(0)
4d8bf9e4 103 expect(workerNode.tasksQueue.size).toBeGreaterThan(0)
594bfb84 104 }
6b27d407
JB
105 expect(queuePool.info.runningTasks).toBe(numberOfWorkers)
106 expect(queuePool.info.queuedTasks).toBe(
107 numberOfWorkers * maxMultiplier - numberOfWorkers
108 )
109 expect(queuePool.info.maxQueuedTasks).toBe(
d3d4b67d
JB
110 numberOfWorkers * maxMultiplier - numberOfWorkers
111 )
594bfb84
JB
112 await Promise.all(promises)
113 for (const workerNode of queuePool.workerNodes) {
114 expect(workerNode.tasksUsage.running).toBe(0)
1ab50fe5
JB
115 expect(workerNode.tasksUsage.ran).toBeGreaterThan(0)
116 expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(maxMultiplier)
4d8bf9e4 117 expect(workerNode.tasksQueue.size).toBe(0)
594bfb84
JB
118 }
119 })
120
325f50bc
S
121 it('Verify that is possible to have a worker that return undefined', async () => {
122 const result = await emptyPool.execute()
6db75ad9 123 expect(result).toBeUndefined()
325f50bc
S
124 })
125
126 it('Verify that data are sent to the worker correctly', async () => {
127 const data = { f: 10 }
128 const result = await echoPool.execute(data)
e1ffb94f 129 expect(result).toStrictEqual(data)
325f50bc
S
130 })
131
132 it('Verify that error handling is working properly:sync', async () => {
133 const data = { f: 10 }
d46660cd
JB
134 let taskError
135 errorPool.emitter.on(PoolEvents.taskError, e => {
136 taskError = e
137 })
325f50bc
S
138 let inError
139 try {
140 await errorPool.execute(data)
141 } catch (e) {
142 inError = e
143 }
144 expect(inError).toBeDefined()
8620fb25 145 expect(typeof inError === 'string').toBe(true)
325f50bc 146 expect(inError).toBe('Error Message from ClusterWorker')
d46660cd
JB
147 expect(taskError).toStrictEqual({
148 error: 'Error Message from ClusterWorker',
149 errorData: data
150 })
18482cec
JB
151 expect(
152 errorPool.workerNodes.some(
153 workerNode => workerNode.tasksUsage.error === 1
154 )
155 ).toBe(true)
325f50bc
S
156 })
157
158 it('Verify that error handling is working properly:async', async () => {
159 const data = { f: 10 }
d46660cd
JB
160 // let taskError
161 // errorPool.emitter.on(PoolEvents.taskError, e => {
162 // taskError = e
163 // })
325f50bc
S
164 let inError
165 try {
166 await asyncErrorPool.execute(data)
167 } catch (e) {
168 inError = e
169 }
170 expect(inError).toBeDefined()
8620fb25 171 expect(typeof inError === 'string').toBe(true)
325f50bc 172 expect(inError).toBe('Error Message from ClusterWorker:async')
d46660cd
JB
173 // expect(taskError).toStrictEqual({
174 // error: 'Error Message from ClusterWorker:async',
175 // errorData: data
176 // })
18482cec
JB
177 expect(
178 asyncErrorPool.workerNodes.some(
179 workerNode => workerNode.tasksUsage.error === 1
180 )
181 ).toBe(true)
325f50bc
S
182 })
183
184 it('Verify that async function is working properly', async () => {
185 const data = { f: 10 }
15e5141f 186 const startTime = performance.now()
325f50bc 187 const result = await asyncPool.execute(data)
15e5141f 188 const usedTime = performance.now() - startTime
e1ffb94f 189 expect(result).toStrictEqual(data)
325f50bc
S
190 expect(usedTime).toBeGreaterThanOrEqual(2000)
191 })
192
193 it('Shutdown test', async () => {
85a3f8a7 194 const exitPromise = TestUtils.waitExits(pool, numberOfWorkers)
45dbbb14 195 await pool.destroy()
bdacc2d2
JB
196 const numberOfExitEvents = await exitPromise
197 expect(numberOfExitEvents).toBe(numberOfWorkers)
325f50bc
S
198 })
199
1a76932b
JB
200 it('Verify that cluster pool options are checked', async () => {
201 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
202 let pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath)
203 expect(pool1.opts.env).toBeUndefined()
204 expect(pool1.opts.settings).toBeUndefined()
205 await pool1.destroy()
206 pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath, {
207 env: { TEST: 'test' },
208 settings: { args: ['--use', 'http'], silent: true }
209 })
210 expect(pool1.opts.env).toStrictEqual({ TEST: 'test' })
211 expect(pool1.opts.settings).toStrictEqual({
212 args: ['--use', 'http'],
213 silent: true
214 })
215 expect({ ...pool1.opts.settings, exec: workerFilePath }).toStrictEqual({
216 args: ['--use', 'http'],
217 silent: true,
218 exec: workerFilePath
219 })
220 await pool1.destroy()
221 })
222
325f50bc
S
223 it('Should work even without opts in input', async () => {
224 const pool1 = new FixedClusterPool(
e1ffb94f 225 numberOfWorkers,
76b1e974 226 './tests/worker-files/cluster/testWorker.js'
325f50bc 227 )
6db75ad9
JB
228 const res = await pool1.execute()
229 expect(res).toBe(false)
8bc77620
APA
230 // We need to clean up the resources after our test
231 await pool1.destroy()
325f50bc 232 })
8d3782fa
JB
233
234 it('Verify that a pool with zero worker fails', async () => {
235 expect(
236 () =>
237 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
d4aeae5a 238 ).toThrowError('Cannot instantiate a fixed pool with no worker')
8d3782fa 239 })
325f50bc 240})