fix: fix task stealing
[poolifier.git] / tests / pools / cluster / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedClusterPool, PoolEvents } = require('../../../lib')
dbca3be9 3const { TaskFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
325f50bc 5
/**
 * Test suite for FixedClusterPool.
 *
 * Several pools are created once for the whole suite, each bound to a
 * different worker file. `pool` is destroyed by the 'Shutdown test' case;
 * the other suite-level pools are destroyed in the `after` hook. Tests that
 * create their own local pool are responsible for destroying it themselves.
 */
describe('Fixed cluster pool test suite', () => {
  const numberOfWorkers = 6
  const tasksConcurrency = 2
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: (e) => console.error(e)
    }
  )
  // Same worker file as `pool`, but with the tasks queue enabled.
  const queuePool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      enableTasksQueue: true,
      tasksQueueOptions: {
        concurrency: tasksConcurrency
      },
      errorHandler: (e) => console.error(e)
    }
  )
  const emptyPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/emptyWorker.js',
    { exitHandler: () => console.info('empty pool worker exited') }
  )
  const echoPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/echoWorker.js'
  )
  const errorPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/errorWorker.js',
    {
      errorHandler: (e) => console.error(e)
    }
  )
  const asyncErrorPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/asyncErrorWorker.js',
    {
      errorHandler: (e) => console.error(e)
    }
  )
  const asyncPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/asyncWorker.js'
  )

  after('Destroy all pools', async () => {
    // We need to clean up the resources after our test
    // (`pool` is not listed here: the 'Shutdown test' case destroys it).
    await echoPool.destroy()
    await asyncPool.destroy()
    await errorPool.destroy()
    await asyncErrorPool.destroy()
    await emptyPool.destroy()
    await queuePool.destroy()
  })

  it('Verify that the function is executed in a worker cluster', async () => {
    let result = await pool.execute({
      function: TaskFunctions.fibonacci
    })
    expect(result).toBe(75025)
    result = await pool.execute({
      function: TaskFunctions.factorial
    })
    expect(result).toBe(9.33262154439441e157)
  })

  it('Verify that is possible to invoke the execute() method without input', async () => {
    const result = await pool.execute()
    expect(result).toStrictEqual({ ok: 1 })
  })

  it("Verify that 'ready' event is emitted", async () => {
    const pool1 = new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testWorker.js',
      {
        errorHandler: (e) => console.error(e)
      }
    )
    let poolReady = 0
    pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool1, PoolEvents.ready, 1)
    expect(poolReady).toBe(1)
    // This pool is local to the test, so the `after` hook cannot reach it:
    // destroy it here, as the other tests creating a local pool do.
    await pool1.destroy()
  })

  it("Verify that 'busy' event is emitted", async () => {
    const promises = new Set()
    let poolBusy = 0
    pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
    expect(poolBusy).toBe(numberOfWorkers + 1)
  })

  it('Verify that tasks queuing is working', async () => {
    const promises = new Set()
    const maxMultiplier = 3 // Must be greater than tasksConcurrency
    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
      promises.add(queuePool.execute())
    }
    expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
    // First pass: tasks have been submitted but none has been awaited yet.
    for (const workerNode of queuePool.workerNodes) {
      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
        queuePool.opts.tasksQueueOptions.concurrency
      )
      expect(workerNode.usage.tasks.executed).toBe(0)
      expect(workerNode.usage.tasks.queued).toBe(
        maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
      )
      expect(workerNode.usage.tasks.maxQueued).toBe(
        maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
      )
    }
    expect(queuePool.info.executingTasks).toBe(
      numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(queuePool.info.queuedTasks).toBe(
      numberOfWorkers *
        (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
    )
    expect(queuePool.info.maxQueuedTasks).toBe(
      numberOfWorkers *
        (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
    )
    expect(queuePool.info.backPressure).toBe(false)
    await Promise.all(promises)
    // Second pass: every submitted task has settled.
    for (const workerNode of queuePool.workerNodes) {
      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
        numberOfWorkers * maxMultiplier
      )
      expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
      expect(workerNode.usage.tasks.queued).toBe(0)
      expect(workerNode.usage.tasks.maxQueued).toBe(
        maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
      )
    }
  })

  it('Verify that is possible to have a worker that return undefined', async () => {
    const result = await emptyPool.execute()
    expect(result).toBeUndefined()
  })

  it('Verify that data are sent to the worker correctly', async () => {
    const data = { f: 10 }
    const result = await echoPool.execute(data)
    expect(result).toStrictEqual(data)
  })

  it('Verify that error handling is working properly:sync', async () => {
    const data = { f: 10 }
    let taskError
    errorPool.emitter.on(PoolEvents.taskError, (e) => {
      taskError = e
    })
    // Capture the rejection so that both its type and its content can be checked.
    let inError
    try {
      await errorPool.execute(data)
    } catch (e) {
      inError = e
    }
    expect(inError).toBeDefined()
    expect(typeof inError === 'string').toBe(true)
    expect(inError).toBe('Error Message from ClusterWorker')
    expect(taskError).toStrictEqual({
      name: 'default',
      message: 'Error Message from ClusterWorker',
      data
    })
    expect(
      errorPool.workerNodes.some(
        (workerNode) => workerNode.usage.tasks.failed === 1
      )
    ).toBe(true)
  })

  it('Verify that error handling is working properly:async', async () => {
    const data = { f: 10 }
    let taskError
    asyncErrorPool.emitter.on(PoolEvents.taskError, (e) => {
      taskError = e
    })
    // Capture the rejection so that both its type and its content can be checked.
    let inError
    try {
      await asyncErrorPool.execute(data)
    } catch (e) {
      inError = e
    }
    expect(inError).toBeDefined()
    expect(typeof inError === 'string').toBe(true)
    expect(inError).toBe('Error Message from ClusterWorker:async')
    expect(taskError).toStrictEqual({
      name: 'default',
      message: 'Error Message from ClusterWorker:async',
      data
    })
    expect(
      asyncErrorPool.workerNodes.some(
        (workerNode) => workerNode.usage.tasks.failed === 1
      )
    ).toBe(true)
  })

  it('Verify that async function is working properly', async () => {
    const data = { f: 10 }
    const startTime = performance.now()
    const result = await asyncPool.execute(data)
    const usedTime = performance.now() - startTime
    expect(result).toStrictEqual(data)
    // The round trip through the async worker is expected to last at least 2000 ms.
    expect(usedTime).toBeGreaterThanOrEqual(2000)
  })

  it('Shutdown test', async () => {
    // Start waiting for the worker 'exit' events before destroying the pool.
    const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
    let poolDestroy = 0
    pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
    await pool.destroy()
    const numberOfExitEvents = await exitPromise
    expect(numberOfExitEvents).toBe(numberOfWorkers)
    expect(poolDestroy).toBe(1)
  })

  it('Verify that cluster pool options are checked', async () => {
    const workerFilePath = './tests/worker-files/cluster/testWorker.js'
    let pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath)
    expect(pool1.opts.env).toBeUndefined()
    expect(pool1.opts.settings).toBeUndefined()
    await pool1.destroy()
    pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath, {
      env: { TEST: 'test' },
      settings: { args: ['--use', 'http'], silent: true }
    })
    expect(pool1.opts.env).toStrictEqual({ TEST: 'test' })
    expect(pool1.opts.settings).toStrictEqual({
      args: ['--use', 'http'],
      silent: true
    })
    expect({ ...pool1.opts.settings, exec: workerFilePath }).toStrictEqual({
      args: ['--use', 'http'],
      silent: true,
      exec: workerFilePath
    })
    await pool1.destroy()
  })

  it('Should work even without opts in input', async () => {
    const pool1 = new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testWorker.js'
    )
    const res = await pool1.execute()
    expect(res).toStrictEqual({ ok: 1 })
    // We need to clean up the resources after our test
    await pool1.destroy()
  })

  it('Verify that a pool with zero worker fails', () => {
    // The constructor call is wrapped in a function so that `expect` can catch the throw.
    expect(
      () =>
        new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
    ).toThrow('Cannot instantiate a fixed pool with zero worker')
  })
})