1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed cluster pool test suite', () => {
7 const numberOfWorkers
= 6
8 const tasksConcurrency
= 2
9 const pool
= new FixedClusterPool(
11 './tests/worker-files/cluster/testWorker.js',
13 errorHandler
: (e
) => console
.error(e
)
16 const queuePool
= new FixedClusterPool(
18 './tests/worker-files/cluster/testWorker.js',
20 enableTasksQueue
: true,
22 concurrency
: tasksConcurrency
24 errorHandler
: (e
) => console
.error(e
)
27 const emptyPool
= new FixedClusterPool(
29 './tests/worker-files/cluster/emptyWorker.js',
30 { exitHandler
: () => console
.info('empty pool worker exited') }
32 const echoPool
= new FixedClusterPool(
34 './tests/worker-files/cluster/echoWorker.js'
36 const errorPool
= new FixedClusterPool(
38 './tests/worker-files/cluster/errorWorker.js',
40 errorHandler
: (e
) => console
.error(e
)
43 const asyncErrorPool
= new FixedClusterPool(
45 './tests/worker-files/cluster/asyncErrorWorker.js',
47 errorHandler
: (e
) => console
.error(e
)
50 const asyncPool
= new FixedClusterPool(
52 './tests/worker-files/cluster/asyncWorker.js'
after('Destroy all pools', async () => {
  // Release the resources held by the shared pools once the suite is done.
  // The pools are destroyed one after the other, in a fixed order.
  const sharedPools = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const sharedPool of sharedPools) {
    await sharedPool.destroy()
  }
})

it('Verify that the function is executed in a worker cluster', async () => {
  // Run each task function through the pool and compare against its
  // known result.
  const fibonacciTask = { function: TaskFunctions.fibonacci }
  let result = await pool.execute(fibonacciTask)
  expect(result).toBe(75025)

  const factorialTask = { function: TaskFunctions.factorial }
  result = await pool.execute(factorialTask)
  expect(result).toBe(9.33262154439441e157)
})

it('Verify that is possible to invoke the execute() method without input', async () => {
  // No task data is passed: the call must still resolve to `{ ok: 1 }`.
  const output = await pool.execute()
  expect(output).toStrictEqual({ ok: 1 })
})

it("Verify that 'ready' event is emitted", async () => {
  // A dedicated pool is used so that the listener is attached before the
  // event is awaited.
  const pool1 = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: (e) => console.error(e)
    }
  )
  try {
    // Counts how many times the `ready` event has been observed.
    let poolReady = 0
    pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool1, PoolEvents.ready, 1)
    expect(poolReady).toBe(1)
  } finally {
    // Always release the dedicated pool, even when an assertion fails,
    // otherwise its workers are never terminated.
    await pool1.destroy()
  }
})

it("Verify that 'busy' event is emitted", async () => {
  // Counts how many times the `busy` event has been observed.
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  // Keep every submitted task so none of them is left floating.
  const promises = new Set()
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    promises.add(pool.execute())
  }
  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(poolBusy).toBe(numberOfWorkers + 1)
  // Let the submitted tasks settle before the next test uses the pool.
  await Promise.all(promises)
})

it('Verify that tasks queuing is working', async () => {
  const promises = new Set()
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const submittedTasks = numberOfWorkers * maxMultiplier
  for (let i = 0; i < submittedTasks; i++) {
    promises.add(queuePool.execute())
  }
  expect(promises.size).toBe(submittedTasks)

  // Each worker is submitted `maxMultiplier` tasks: `concurrency` of them
  // may execute at once, the remainder is expected to sit in its queue.
  const { concurrency } = queuePool.opts.tasksQueueOptions
  const queuedPerWorker = maxMultiplier - concurrency

  // Right after submission: nothing has completed yet.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBe(queuedPerWorker)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
  }
  // Pool-wide figures are the per-worker figures times the worker count.
  expect(queuePool.info.executingTasks).toBe(numberOfWorkers * concurrency)
  expect(queuePool.info.queuedTasks).toBe(numberOfWorkers * queuedPerWorker)
  expect(queuePool.info.maxQueuedTasks).toBe(
    numberOfWorkers * queuedPerWorker
  )

  await Promise.all(promises)

  // Once every task has settled: queues are drained, `maxQueued` keeps its
  // high-water mark.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBe(0)
    expect(tasks.executed).toBe(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
  }
})

it('Verify that is possible to have a worker that return undefined', async () => {
  // The task executed through `emptyPool` must resolve to `undefined`.
  const output = await emptyPool.execute()
  expect(output).toBeUndefined()
})

it('Verify that data are sent to the worker correctly', async () => {
  // What comes back from `echoPool` must strictly equal what was sent.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})

// Error path test against `errorPool`: a listener is registered on
// PoolEvents.taskError, `errorPool.execute(data)` is awaited, then the test
// asserts that `inError` is the string 'Error Message from ClusterWorker' and
// that `taskError` strictly equals an object carrying that same message.
// NOTE(review): the declarations and assignments of `inError` and `taskError`,
// whatever surrounds the awaited call, and the remaining fields of the
// expected `taskError` object are not visible in this view. They must come
// from the upstream file; do not guess them.
158 it('Verify that error handling is working properly:sync', async () => {
159 const data
= { f
: 10 }
// Listener for the pool level task error event.
161 errorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
166 await errorPool
.execute(data
)
170 expect(inError
).toBeDefined()
171 expect(typeof inError
=== 'string').toBe(true)
172 expect(inError
).toBe('Error Message from ClusterWorker')
173 expect(taskError
).toStrictEqual({
175 message
: 'Error Message from ClusterWorker',
// Checks whether at least one worker node reports exactly one failed task.
// NOTE(review): presumably wrapped in `expect(...).toBe(true)` — confirm.
179 errorPool
.workerNodes
.some(
180 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
// Error path test against `asyncErrorPool`: a listener is registered on
// PoolEvents.taskError, `asyncErrorPool.execute(data)` is awaited, then the
// test asserts that `inError` is the string
// 'Error Message from ClusterWorker:async' and that `taskError` strictly
// equals an object carrying that same message.
// NOTE(review): the declarations and assignments of `inError` and `taskError`,
// whatever surrounds the awaited call, and the remaining fields of the
// expected `taskError` object are not visible in this view. They must come
// from the upstream file; do not guess them.
185 it('Verify that error handling is working properly:async', async () => {
186 const data
= { f
: 10 }
// Listener for the pool level task error event.
188 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
193 await asyncErrorPool
.execute(data
)
197 expect(inError
).toBeDefined()
198 expect(typeof inError
=== 'string').toBe(true)
199 expect(inError
).toBe('Error Message from ClusterWorker:async')
200 expect(taskError
).toStrictEqual({
202 message
: 'Error Message from ClusterWorker:async',
// Checks whether at least one worker node reports exactly one failed task.
// NOTE(review): presumably wrapped in `expect(...).toBe(true)` — confirm.
206 asyncErrorPool
.workerNodes
.some(
207 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  // Measure the wall clock time of the whole round trip.
  const startedAt = performance.now()
  const output = await asyncPool.execute(payload)
  const elapsed = performance.now() - startedAt
  expect(output).toStrictEqual(payload)
  // The round trip is expected to last at least 2000 ms.
  expect(elapsed).toBeGreaterThanOrEqual(2000)
})

it('Shutdown test', async () => {
  // Start listening for worker `exit` events before destroying the pool so
  // that none of them is missed.
  const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
  // Counts how many times the `destroy` event has been observed.
  let poolDestroy = 0
  pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
  await pool.destroy()
  const numberOfExitEvents = await exitPromise
  expect(numberOfExitEvents).toBe(numberOfWorkers)
  expect(poolDestroy).toBe(1)
})

it('Verify that cluster pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'

  // Without options, neither `env` nor `settings` is set on the pool.
  let pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath)
  expect(pool1.opts.env).toBeUndefined()
  expect(pool1.opts.settings).toBeUndefined()
  await pool1.destroy()

  // With options, `env` and `settings` must be kept as given.
  pool1 = new FixedClusterPool(numberOfWorkers, workerFilePath, {
    env: { TEST: 'test' },
    settings: { args: ['--use', 'http'], silent: true }
  })
  expect(pool1.opts.env).toStrictEqual({ TEST: 'test' })
  // Expected values are spelled out as fresh literals on purpose: comparing
  // against the object passed to the constructor would hide a mutation.
  expect(pool1.opts.settings).toStrictEqual({
    args: ['--use', 'http'],
    silent: true
  })
  expect({ ...pool1.opts.settings, exec: workerFilePath }).toStrictEqual({
    args: ['--use', 'http'],
    silent: true,
    exec: workerFilePath
  })
  await pool1.destroy()
})

it('Should work even without opts in input', async () => {
  // Only the worker count and the worker file are given: no options object.
  const pool1 = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    const res = await pool1.execute()
    expect(res).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, including when the
    // task or the assertion above fails.
    await pool1.destroy()
  }
})

265 it('Verify that a pool with zero worker fails', async () => {
268 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
269 ).toThrowError('Cannot instantiate a fixed pool with zero worker')