6daec1400dbc9dc0fdb921d6b25b53dd9d751eab
[poolifier.git] / tests / pools / cluster / fixed.test.js
1 const { expect } = require('expect')
2 const { FixedClusterPool, PoolEvents } = require('../../../lib')
3 const { WorkerFunctions } = require('../../test-types')
4 const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
5
6 describe('Fixed cluster pool test suite', async () => {
7 const numberOfWorkers = 6
8 const pool = new FixedClusterPool(
9 numberOfWorkers,
10 './tests/worker-files/cluster/testWorker.js',
11 {
12 errorHandler: e => console.error(e)
13 }
14 )
15 let poolReady = 0
16 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
17 await waitPoolEvents(pool, PoolEvents.ready, 1)
18 const queuePool = new FixedClusterPool(
19 numberOfWorkers,
20 './tests/worker-files/cluster/testWorker.js',
21 {
22 enableTasksQueue: true,
23 tasksQueueOptions: {
24 concurrency: 2
25 },
26 errorHandler: e => console.error(e)
27 }
28 )
29 const emptyPool = new FixedClusterPool(
30 numberOfWorkers,
31 './tests/worker-files/cluster/emptyWorker.js',
32 { exitHandler: () => console.log('empty pool worker exited') }
33 )
34 const echoPool = new FixedClusterPool(
35 numberOfWorkers,
36 './tests/worker-files/cluster/echoWorker.js'
37 )
38 const errorPool = new FixedClusterPool(
39 numberOfWorkers,
40 './tests/worker-files/cluster/errorWorker.js',
41 {
42 errorHandler: e => console.error(e)
43 }
44 )
45 const asyncErrorPool = new FixedClusterPool(
46 numberOfWorkers,
47 './tests/worker-files/cluster/asyncErrorWorker.js',
48 {
49 errorHandler: e => console.error(e)
50 }
51 )
52 const asyncPool = new FixedClusterPool(
53 numberOfWorkers,
54 './tests/worker-files/cluster/asyncWorker.js'
55 )
56
57 after('Destroy all pools', async () => {
58 // We need to clean up the resources after our test
59 await echoPool.destroy()
60 await asyncPool.destroy()
61 await errorPool.destroy()
62 await asyncErrorPool.destroy()
63 await emptyPool.destroy()
64 await queuePool.destroy()
65 })
66
67 it('Verify that the function is executed in a worker cluster', async () => {
68 let result = await pool.execute({
69 function: WorkerFunctions.fibonacci
70 })
71 expect(result).toBe(75025)
72 result = await pool.execute({
73 function: WorkerFunctions.factorial
74 })
75 expect(result).toBe(9.33262154439441e157)
76 })
77
78 it('Verify that is possible to invoke the execute() method without input', async () => {
79 const result = await pool.execute()
80 expect(result).toStrictEqual({ ok: 1 })
81 })
82
83 it("Verify that 'ready' event is emitted", async () => {
84 expect(poolReady).toBe(1)
85 })
86
87 it("Verify that 'busy' event is emitted", async () => {
88 let poolBusy = 0
89 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
90 for (let i = 0; i < numberOfWorkers * 2; i++) {
91 pool.execute()
92 }
93 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
94 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
95 expect(poolBusy).toBe(numberOfWorkers + 1)
96 })
97
98 it('Verify that tasks queuing is working', async () => {
99 const promises = new Set()
100 const maxMultiplier = 2
101 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
102 promises.add(queuePool.execute())
103 }
104 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
105 for (const workerNode of queuePool.workerNodes) {
106 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
107 queuePool.opts.tasksQueueOptions.concurrency
108 )
109 expect(workerNode.usage.tasks.executed).toBe(0)
110 expect(workerNode.usage.tasks.queued).toBeGreaterThan(0)
111 expect(workerNode.usage.tasks.maxQueued).toBeGreaterThan(0)
112 }
113 expect(queuePool.info.executingTasks).toBe(numberOfWorkers)
114 expect(queuePool.info.queuedTasks).toBe(
115 numberOfWorkers * maxMultiplier - numberOfWorkers
116 )
117 expect(queuePool.info.maxQueuedTasks).toBe(
118 numberOfWorkers * maxMultiplier - numberOfWorkers
119 )
120 await Promise.all(promises)
121 for (const workerNode of queuePool.workerNodes) {
122 expect(workerNode.usage.tasks.executing).toBe(0)
123 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
124 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
125 expect(workerNode.usage.tasks.queued).toBe(0)
126 expect(workerNode.usage.tasks.maxQueued).toBe(1)
127 }
128 })
129
130 it('Verify that is possible to have a worker that return undefined', async () => {
131 const result = await emptyPool.execute()
132 expect(result).toBeUndefined()
133 })
134
135 it('Verify that data are sent to the worker correctly', async () => {
136 const data = { f: 10 }
137 const result = await echoPool.execute(data)
138 expect(result).toStrictEqual(data)
139 })
140
141 it('Verify that error handling is working properly:sync', async () => {
142 const data = { f: 10 }
143 let taskError
144 errorPool.emitter.on(PoolEvents.taskError, e => {
145 taskError = e
146 })
147 let inError
148 try {
149 await errorPool.execute(data)
150 } catch (e) {
151 inError = e
152 }
153 expect(inError).toBeDefined()
154 expect(typeof inError === 'string').toBe(true)
155 expect(inError).toBe('Error Message from ClusterWorker')
156 expect(taskError).toStrictEqual({
157 message: 'Error Message from ClusterWorker',
158 data
159 })
160 expect(
161 errorPool.workerNodes.some(
162 workerNode => workerNode.usage.tasks.failed === 1
163 )
164 ).toBe(true)
165 })
166
167 it('Verify that error handling is working properly:async', async () => {
168 const data = { f: 10 }
169 let taskError
170 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
171 taskError = e
172 })
173 let inError
174 try {
175 await asyncErrorPool.execute(data)
176 } catch (e) {
177 inError = e
178 }
179 expect(inError).toBeDefined()
180 expect(typeof inError === 'string').toBe(true)
181 expect(inError).toBe('Error Message from ClusterWorker:async')
182 expect(taskError).toStrictEqual({
183 message: 'Error Message from ClusterWorker:async',
184 data
185 })
186 expect(
187 asyncErrorPool.workerNodes.some(
188 workerNode => workerNode.usage.tasks.failed === 1
189 )
190 ).toBe(true)
191 })
192
193 it('Verify that async function is working properly', async () => {
194 const data = { f: 10 }
195 const startTime = performance.now()
196 const result = await asyncPool.execute(data)
197 const usedTime = performance.now() - startTime
198 expect(result).toStrictEqual(data)
199 expect(usedTime).toBeGreaterThanOrEqual(2000)
200 })
201
202 it('Shutdown test', async () => {
203 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
204 await pool.destroy()
205 const numberOfExitEvents = await exitPromise
206 expect(numberOfExitEvents).toBe(numberOfWorkers)
207 })
208
209 it('Verify that cluster pool options are checked', async () => {
210 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
211 let pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath)
212 expect(pool1.opts.env).toBeUndefined()
213 expect(pool1.opts.settings).toBeUndefined()
214 await pool1.destroy()
215 pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath, {
216 env: { TEST: 'test' },
217 settings: { args: ['--use', 'http'], silent: true }
218 })
219 expect(pool1.opts.env).toStrictEqual({ TEST: 'test' })
220 expect(pool1.opts.settings).toStrictEqual({
221 args: ['--use', 'http'],
222 silent: true
223 })
224 expect({ ...pool1.opts.settings, exec: workerFilePath }).toStrictEqual({
225 args: ['--use', 'http'],
226 silent: true,
227 exec: workerFilePath
228 })
229 await pool1.destroy()
230 })
231
232 it('Should work even without opts in input', async () => {
233 const pool1 = new FixedClusterPool(
234 numberOfWorkers,
235 './tests/worker-files/cluster/testWorker.js'
236 )
237 const res = await pool1.execute()
238 expect(res).toStrictEqual({ ok: 1 })
239 // We need to clean up the resources after our test
240 await pool1.destroy()
241 })
242
243 it('Verify that a pool with zero worker fails', async () => {
244 expect(
245 () =>
246 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
247 ).toThrowError('Cannot instantiate a fixed pool with zero worker')
248 })
249 })