1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed cluster pool test suite', () => {
7 const numberOfWorkers
= 8
8 const tasksConcurrency
= 2
9 const pool
= new FixedClusterPool(
11 './tests/worker-files/cluster/testWorker.js',
13 errorHandler
: (e
) => console
.error(e
)
16 const queuePool
= new FixedClusterPool(
18 './tests/worker-files/cluster/testWorker.js',
20 enableTasksQueue
: true,
22 concurrency
: tasksConcurrency
24 errorHandler
: (e
) => console
.error(e
)
27 const emptyPool
= new FixedClusterPool(
29 './tests/worker-files/cluster/emptyWorker.js',
30 { exitHandler
: () => console
.info('empty pool worker exited') }
32 const echoPool
= new FixedClusterPool(
34 './tests/worker-files/cluster/echoWorker.js'
36 const errorPool
= new FixedClusterPool(
38 './tests/worker-files/cluster/errorWorker.js',
40 errorHandler
: (e
) => console
.error(e
)
43 const asyncErrorPool
= new FixedClusterPool(
45 './tests/worker-files/cluster/asyncErrorWorker.js',
47 errorHandler
: (e
) => console
.error(e
)
50 const asyncPool
= new FixedClusterPool(
52 './tests/worker-files/cluster/asyncWorker.js'
55 after('Destroy all pools', async () => {
56 // We need to clean up the resources after our test
57 await echoPool
.destroy()
58 await asyncPool
.destroy()
59 await errorPool
.destroy()
60 await asyncErrorPool
.destroy()
61 await emptyPool
.destroy()
62 await queuePool
.destroy()
65 it('Verify that the function is executed in a worker cluster', async () => {
66 let result
= await pool
.execute({
67 function: TaskFunctions
.fibonacci
69 expect(result
).toBe(75025)
70 result
= await pool
.execute({
71 function: TaskFunctions
.factorial
73 expect(result
).toBe(9.33262154439441e157
)
76 it('Verify that is possible to invoke the execute() method without input', async () => {
77 const result
= await pool
.execute()
78 expect(result
).toStrictEqual({ ok
: 1 })
81 it("Verify that 'ready' event is emitted", async () => {
82 const pool1
= new FixedClusterPool(
84 './tests/worker-files/cluster/testWorker.js',
86 errorHandler
: (e
) => console
.error(e
)
90 pool1
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
91 await
waitPoolEvents(pool1
, PoolEvents
.ready
, 1)
92 expect(poolReady
).toBe(1)
95 it("Verify that 'busy' event is emitted", async () => {
96 const promises
= new Set()
98 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
99 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
100 promises
.add(pool
.execute())
102 await Promise
.all(promises
)
103 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
104 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
105 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
108 it('Verify that tasks queuing is working', async () => {
109 const promises
= new Set()
110 const maxMultiplier
= 3 // Must be greater than tasksConcurrency
111 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
112 promises
.add(queuePool
.execute())
114 expect(promises
.size
).toBe(numberOfWorkers
* maxMultiplier
)
115 for (const workerNode
of queuePool
.workerNodes
) {
116 expect(workerNode
.usage
.tasks
.executing
).toBeGreaterThanOrEqual(0)
117 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
118 queuePool
.opts
.tasksQueueOptions
.concurrency
120 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
121 expect(workerNode
.usage
.tasks
.queued
).toBe(
122 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
124 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
125 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
127 expect(workerNode
.usage
.tasks
.stolen
).toBe(0)
129 expect(queuePool
.info
.executedTasks
).toBe(0)
130 expect(queuePool
.info
.executingTasks
).toBe(
131 numberOfWorkers
* queuePool
.opts
.tasksQueueOptions
.concurrency
133 expect(queuePool
.info
.queuedTasks
).toBe(
135 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
137 expect(queuePool
.info
.maxQueuedTasks
).toBe(
139 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
141 expect(queuePool
.info
.backPressure
).toBe(false)
142 expect(queuePool
.info
.stolenTasks
).toBe(0)
143 await Promise
.all(promises
)
144 for (const workerNode
of queuePool
.workerNodes
) {
145 expect(workerNode
.usage
.tasks
.executing
).toBeGreaterThanOrEqual(0)
146 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
147 numberOfWorkers
* maxMultiplier
149 expect(workerNode
.usage
.tasks
.executed
).toBe(maxMultiplier
)
150 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
151 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
152 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
154 expect(workerNode
.usage
.tasks
.stolen
).toBeGreaterThanOrEqual(0)
155 expect(workerNode
.usage
.tasks
.stolen
).toBeLessThanOrEqual(
156 numberOfWorkers
* maxMultiplier
159 expect(queuePool
.info
.executedTasks
).toBe(numberOfWorkers
* maxMultiplier
)
160 expect(queuePool
.info
.backPressure
).toBe(false)
161 expect(queuePool
.info
.stolenTasks
).toBeGreaterThanOrEqual(0)
162 expect(queuePool
.info
.stolenTasks
).toBeLessThanOrEqual(
163 numberOfWorkers
* maxMultiplier
167 it('Verify that is possible to have a worker that return undefined', async () => {
168 const result
= await emptyPool
.execute()
169 expect(result
).toBeUndefined()
172 it('Verify that data are sent to the worker correctly', async () => {
173 const data
= { f
: 10 }
174 const result
= await echoPool
.execute(data
)
175 expect(result
).toStrictEqual(data
)
178 it('Verify that error handling is working properly:sync', async () => {
179 const data
= { f
: 10 }
181 errorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
186 await errorPool
.execute(data
)
190 expect(inError
).toBeDefined()
191 expect(typeof inError
=== 'string').toBe(true)
192 expect(inError
).toBe('Error Message from ClusterWorker')
193 expect(taskError
).toStrictEqual({
195 message
: 'Error Message from ClusterWorker',
199 errorPool
.workerNodes
.some(
200 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
205 it('Verify that error handling is working properly:async', async () => {
206 const data
= { f
: 10 }
208 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
213 await asyncErrorPool
.execute(data
)
217 expect(inError
).toBeDefined()
218 expect(typeof inError
=== 'string').toBe(true)
219 expect(inError
).toBe('Error Message from ClusterWorker:async')
220 expect(taskError
).toStrictEqual({
222 message
: 'Error Message from ClusterWorker:async',
226 asyncErrorPool
.workerNodes
.some(
227 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
232 it('Verify that async function is working properly', async () => {
233 const data
= { f
: 10 }
234 const startTime
= performance
.now()
235 const result
= await asyncPool
.execute(data
)
236 const usedTime
= performance
.now() - startTime
237 expect(result
).toStrictEqual(data
)
238 expect(usedTime
).toBeGreaterThanOrEqual(2000)
241 it('Shutdown test', async () => {
242 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfWorkers
)
244 pool
.emitter
.on(PoolEvents
.destroy
, () => ++poolDestroy
)
246 const numberOfExitEvents
= await exitPromise
247 expect(numberOfExitEvents
).toBe(numberOfWorkers
)
248 expect(poolDestroy
).toBe(1)
251 it('Verify that cluster pool options are checked', async () => {
252 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
253 let pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
)
254 expect(pool1
.opts
.env
).toBeUndefined()
255 expect(pool1
.opts
.settings
).toBeUndefined()
256 await pool1
.destroy()
257 pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
, {
258 env
: { TEST
: 'test' },
259 settings
: { args
: ['--use', 'http'], silent
: true }
261 expect(pool1
.opts
.env
).toStrictEqual({ TEST
: 'test' })
262 expect(pool1
.opts
.settings
).toStrictEqual({
263 args
: ['--use', 'http'],
266 expect({ ...pool1
.opts
.settings
, exec
: workerFilePath
}).toStrictEqual({
267 args
: ['--use', 'http'],
271 await pool1
.destroy()
274 it('Should work even without opts in input', async () => {
275 const pool1
= new FixedClusterPool(
277 './tests/worker-files/cluster/testWorker.js'
279 const res
= await pool1
.execute()
280 expect(res
).toStrictEqual({ ok
: 1 })
281 // We need to clean up the resources after our test
282 await pool1
.destroy()
285 it('Verify that a pool with zero worker fails', async () => {
288 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
289 ).toThrowError('Cannot instantiate a fixed pool with zero worker')