1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
5 const { DEFAULT_TASK_NAME
} = require('../../../lib/utils')
7 describe('Fixed cluster pool test suite', () => {
// Shared fixtures for the whole suite: one pool per worker file under test.
// NOTE(review): the `numberOfWorkers` first argument and the
// `tasksQueueOptions` wrapper were restored from how the rest of the suite
// uses them (`new FixedClusterPool(numberOfWorkers, workerFilePath, ...)` and
// `queuePool.opts.tasksQueueOptions.concurrency`) — confirm against the
// FixedClusterPool constructor signature.
const numberOfWorkers = 8
const tasksConcurrency = 2

// General purpose pool; worker errors are only logged.
const pool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  {
    errorHandler: e => console.error(e)
  }
)

// Same worker file, with the tasks queue enabled and a per-worker concurrency
// of `tasksConcurrency`.
const queuePool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: tasksConcurrency
    },
    errorHandler: e => console.error(e)
  }
)

// Pool whose tasks resolve to `undefined`; logs a message when a worker exits.
const emptyPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/emptyWorker.js',
  { exitHandler: () => console.info('empty pool worker exited') }
)

// Pool whose tasks resolve to the data they were given.
const echoPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/echoWorker.js'
)

// Pools whose tasks fail, synchronously and asynchronously respectively.
const errorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/errorWorker.js',
  {
    errorHandler: e => console.error(e)
  }
)
const asyncErrorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncErrorWorker.js',
  {
    errorHandler: e => console.error(e)
  }
)

// Pool backed by the async worker file.
const asyncPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncWorker.js'
)
// Suite-level teardown: destroys the shared pools one after the other.
// NOTE(review): `pool` is not in this list — presumably because the
// 'Shutdown test' destroys it; confirm.
after('Destroy all pools', async () => {
  // We need to clean up the resources after our test
  await echoPool.destroy()
  await asyncPool.destroy()
  await errorPool.destroy()
  await asyncErrorPool.destroy()
  await emptyPool.destroy()
  await queuePool.destroy()
})
// Runs two different task functions through the shared pool and checks the
// numeric result of each.
it('Verify that the function is executed in a worker cluster', async () => {
  // The `function` field presumably selects which computation the test
  // worker performs — verify against the worker file.
  let result = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(result).toBe(75025)
  result = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(result).toBe(9.33262154439441e157)
})
// Calling execute() with no argument must still resolve; the shared test
// worker is expected to answer `{ ok: 1 }` in that case.
it('Verify that is possible to invoke the execute() method without input', async () => {
  const result = await pool.execute()
  expect(result).toStrictEqual({ ok: 1 })
})
// Creates a dedicated pool (shadowing the suite-level `pool`) so the `ready`
// event can be observed from the moment of construction.
it("Verify that 'ready' event is emitted", async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  // Counter restored: it is incremented by the listener and asserted below.
  let poolReady = 0
  pool.emitter.on(PoolEvents.ready, () => ++poolReady)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(poolReady).toBe(1)
  // NOTE(review): teardown of the locally created pool restored here; the
  // original statement at this position was not visible — confirm.
  await pool.destroy()
})
// Submits twice as many tasks as there are workers and counts how often the
// pool reports itself busy.
it("Verify that 'busy' event is emitted", async () => {
  const promises = new Set()
  // Counter restored: it is incremented by the listener and asserted below.
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(poolBusy).toBe(numberOfWorkers + 1)
})
// Floods the queue-enabled pool with `numberOfWorkers * maxMultiplier` tasks
// and checks the per-worker and pool-wide task counters twice: right after
// submission (nothing finished yet) and once every task has settled.
it('Verify that tasks queuing is working', async () => {
  const promises = new Set()
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    promises.add(queuePool.execute())
  }
  expect(promises.size).toBe(numberOfWorkers * maxMultiplier)

  // Snapshot right after submission: each worker runs at most `concurrency`
  // tasks and holds the remainder of its share in its queue.
  for (const workerNode of queuePool.workerNodes) {
    expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
      queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(workerNode.usage.tasks.executed).toBe(0)
    expect(workerNode.usage.tasks.queued).toBe(
      maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(workerNode.usage.tasks.maxQueued).toBe(
      maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(workerNode.usage.tasks.stolen).toBe(0)
  }
  expect(queuePool.info.executedTasks).toBe(0)
  expect(queuePool.info.executingTasks).toBe(
    numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
  )
  // NOTE(review): the `numberOfWorkers *` factor on the two pool-wide queue
  // totals was restored (per-worker expectation times worker count, matching
  // the `executingTasks` assertion above) — confirm.
  expect(queuePool.info.queuedTasks).toBe(
    numberOfWorkers *
      (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
  )
  expect(queuePool.info.maxQueuedTasks).toBe(
    numberOfWorkers *
      (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
  )
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBe(0)

  await Promise.all(promises)

  // Snapshot after completion: queues are drained and every worker executed
  // its share; the stolen counters are only bounded, not pinned.
  for (const workerNode of queuePool.workerNodes) {
    expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
    expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
    expect(workerNode.usage.tasks.queued).toBe(0)
    expect(workerNode.usage.tasks.maxQueued).toBe(
      maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
  }
  expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
    numberOfWorkers * maxMultiplier
  )
})
// A task executed on the empty-worker pool must resolve to `undefined`.
it('Verify that is possible to have a worker that return undefined', async () => {
  const result = await emptyPool.execute()
  expect(result).toBeUndefined()
})
// Round-trips a payload through the echo pool and expects a structurally
// equal value back.
it('Verify that data are sent to the worker correctly', async () => {
  const data = { f: 10 }
  const result = await echoPool.execute(data)
  expect(result).toStrictEqual(data)
})
// A failing task must reject execute() with the worker's error message, emit
// a `taskError` event and be counted as failed on a worker node.
// NOTE(review): the listener body, the try/catch around execute(), the `data`
// property of the expected event and the `expect(...).toBe(true)` wrapper were
// restored from the surrounding assertions — confirm against the pool's
// `taskError` payload.
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }
  let taskError
  errorPool.emitter.on(PoolEvents.taskError, e => {
    taskError = e
  })
  let inError
  try {
    await errorPool.execute(data)
  } catch (e) {
    inError = e
  }
  expect(inError).toBeDefined()
  expect(typeof inError === 'string').toBe(true)
  expect(inError).toBe('Error Message from ClusterWorker')
  expect(taskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: 'Error Message from ClusterWorker',
    data
  })
  expect(
    errorPool.workerNodes.some(
      workerNode => workerNode.usage.tasks.failed === 1
    )
  ).toBe(true)
})
// Same contract as the sync error test, for a task that fails asynchronously:
// execute() rejects with the worker's message, a `taskError` event is emitted
// and a worker node records one failed task.
// NOTE(review): the listener body, the try/catch around execute(), the `data`
// property of the expected event and the `expect(...).toBe(true)` wrapper were
// restored from the surrounding assertions — confirm against the pool's
// `taskError` payload.
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }
  let taskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
    taskError = e
  })
  let inError
  try {
    await asyncErrorPool.execute(data)
  } catch (e) {
    inError = e
  }
  expect(inError).toBeDefined()
  expect(typeof inError === 'string').toBe(true)
  expect(inError).toBe('Error Message from ClusterWorker:async')
  expect(taskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: 'Error Message from ClusterWorker:async',
    data
  })
  expect(
    asyncErrorPool.workerNodes.some(
      workerNode => workerNode.usage.tasks.failed === 1
    )
  ).toBe(true)
})
// Times a task on the async pool: the payload must come back unchanged and
// the round trip must take at least 2000 ms.
// NOTE(review): the 2000 ms floor presumably mirrors a delay inside
// asyncWorker.js — confirm against the worker file.
it('Verify that async function is working properly', async () => {
  const data = { f: 10 }
  const startTime = performance.now()
  const result = await asyncPool.execute(data)
  const usedTime = performance.now() - startTime
  expect(result).toStrictEqual(data)
  expect(usedTime).toBeGreaterThanOrEqual(2000)
})
// Destroys the shared pool and checks that every worker exits, the pool is
// marked as stopped and emptied, and exactly one `destroy` event is emitted.
it('Shutdown test', async () => {
  // Subscribe to the exit events before triggering the shutdown.
  const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
  // Counter restored: it is incremented by the listener and asserted below.
  let poolDestroy = 0
  pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
  // NOTE(review): the destroy() call was restored; the assertions below
  // (`started` false, no worker nodes, one destroy event) require it.
  await pool.destroy()
  const numberOfExitEvents = await exitPromise
  expect(pool.started).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  expect(numberOfExitEvents).toBe(numberOfWorkers)
  expect(poolDestroy).toBe(1)
})
// Checks the cluster-specific options: `env` and `settings` are undefined
// when not provided and are kept as given otherwise.
it('Verify that cluster pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  expect(pool.opts.env).toBeUndefined()
  expect(pool.opts.settings).toBeUndefined()
  // NOTE(review): teardown restored so the first pool is not left running
  // when `pool` is reassigned — confirm.
  await pool.destroy()
  pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
    env: { TEST: 'test' },
    settings: { args: ['--use', 'http'], silent: true }
  })
  expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
  // The expected objects below were completed with the `silent` and `exec`
  // properties so they match the values passed/spread above.
  expect(pool.opts.settings).toStrictEqual({
    args: ['--use', 'http'],
    silent: true
  })
  expect({ ...pool.opts.settings, exec: workerFilePath }).toStrictEqual({
    args: ['--use', 'http'],
    silent: true,
    exec: workerFilePath
  })
  // NOTE(review): teardown restored for the second pool — confirm.
  await pool.destroy()
})
// A pool built without an options argument must still execute tasks.
it('Should work even without opts in input', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  const res = await pool.execute()
  expect(res).toStrictEqual({ ok: 1 })
  // We need to clean up the resources after our test
  await pool.destroy()
})
// Destroys a single worker node and checks that its worker emitted exactly
// one 'disconnect' and one 'exit' event and that the node was removed.
// NOTE(review): both listener bodies, the `exitEvent` declaration and the
// final pool teardown were restored from the assertions — confirm.
it('Verify destroyWorkerNode()', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  const workerNodeKey = 0
  let disconnectEvent = 0
  pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
    ++disconnectEvent
  })
  let exitEvent = 0
  pool.workerNodes[workerNodeKey].worker.on('exit', () => {
    ++exitEvent
  })
  await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
  expect(disconnectEvent).toBe(1)
  expect(exitEvent).toBe(1)
  expect(pool.workerNodes.length).toBe(numberOfWorkers - 1)
  await pool.destroy()
})
// Constructing a fixed pool with zero workers must throw synchronously.
it('Verify that a pool with zero worker fails', async () => {
  // The constructor call is wrapped in a function so `expect` can catch the
  // throw itself.
  expect(
    () =>
      new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
  ).toThrowError('Cannot instantiate a fixed pool with zero worker')
})