3baaa98ad04831151e433856bdcacee5682338f9
1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed cluster pool test suite', () => {
7 const numberOfWorkers
= 6
8 const tasksConcurrency
= 2
9 const pool
= new FixedClusterPool(
11 './tests/worker-files/cluster/testWorker.js',
13 errorHandler
: (e
) => console
.error(e
)
16 const queuePool
= new FixedClusterPool(
18 './tests/worker-files/cluster/testWorker.js',
20 enableTasksQueue
: true,
22 concurrency
: tasksConcurrency
24 errorHandler
: (e
) => console
.error(e
)
27 const emptyPool
= new FixedClusterPool(
29 './tests/worker-files/cluster/emptyWorker.js',
30 { exitHandler
: () => console
.info('empty pool worker exited') }
32 const echoPool
= new FixedClusterPool(
34 './tests/worker-files/cluster/echoWorker.js'
36 const errorPool
= new FixedClusterPool(
38 './tests/worker-files/cluster/errorWorker.js',
40 errorHandler
: (e
) => console
.error(e
)
43 const asyncErrorPool
= new FixedClusterPool(
45 './tests/worker-files/cluster/asyncErrorWorker.js',
47 errorHandler
: (e
) => console
.error(e
)
50 const asyncPool
= new FixedClusterPool(
52 './tests/worker-files/cluster/asyncWorker.js'
55 after('Destroy all pools', async () => {
56 // We need to clean up the resources after our test
57 await echoPool
.destroy()
58 await asyncPool
.destroy()
59 await errorPool
.destroy()
60 await asyncErrorPool
.destroy()
61 await emptyPool
.destroy()
62 await queuePool
.destroy()
65 it('Verify that the function is executed in a worker cluster', async () => {
66 let result
= await pool
.execute({
67 function: TaskFunctions
.fibonacci
69 expect(result
).toBe(75025)
70 result
= await pool
.execute({
71 function: TaskFunctions
.factorial
73 expect(result
).toBe(9.33262154439441e157
)
76 it('Verify that is possible to invoke the execute() method without input', async () => {
77 const result
= await pool
.execute()
78 expect(result
).toStrictEqual({ ok
: 1 })
81 it("Verify that 'ready' event is emitted", async () => {
82 const pool1
= new FixedClusterPool(
84 './tests/worker-files/cluster/testWorker.js',
86 errorHandler
: (e
) => console
.error(e
)
90 pool1
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
91 await
waitPoolEvents(pool1
, PoolEvents
.ready
, 1)
92 expect(poolReady
).toBe(1)
95 it("Verify that 'busy' event is emitted", () => {
97 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
98 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
101 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
102 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
103 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
106 it('Verify that tasks queuing is working', async () => {
107 const promises
= new Set()
108 const maxMultiplier
= 3 // Must be greater than tasksConcurrency
109 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
110 promises
.add(queuePool
.execute())
112 expect(promises
.size
).toBe(numberOfWorkers
* maxMultiplier
)
113 for (const workerNode
of queuePool
.workerNodes
) {
114 expect(workerNode
.usage
.tasks
.executing
).toBeGreaterThanOrEqual(0)
115 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
116 queuePool
.opts
.tasksQueueOptions
.concurrency
118 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
119 expect(workerNode
.usage
.tasks
.queued
).toBe(
120 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
122 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
123 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
126 expect(queuePool
.info
.executingTasks
).toBe(
127 numberOfWorkers
* queuePool
.opts
.tasksQueueOptions
.concurrency
129 expect(queuePool
.info
.queuedTasks
).toBe(
131 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
133 expect(queuePool
.info
.maxQueuedTasks
).toBe(
135 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
137 expect(queuePool
.info
.backPressure
).toBe(false)
138 await Promise
.all(promises
)
139 for (const workerNode
of queuePool
.workerNodes
) {
140 expect(workerNode
.usage
.tasks
.executing
).toBe(0)
141 expect(workerNode
.usage
.tasks
.executed
).toBe(maxMultiplier
)
142 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
143 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
144 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
149 it('Verify that is possible to have a worker that return undefined', async () => {
150 const result
= await emptyPool
.execute()
151 expect(result
).toBeUndefined()
154 it('Verify that data are sent to the worker correctly', async () => {
155 const data
= { f
: 10 }
156 const result
= await echoPool
.execute(data
)
157 expect(result
).toStrictEqual(data
)
160 it('Verify that error handling is working properly:sync', async () => {
161 const data
= { f
: 10 }
163 errorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
168 await errorPool
.execute(data
)
172 expect(inError
).toBeDefined()
173 expect(typeof inError
=== 'string').toBe(true)
174 expect(inError
).toBe('Error Message from ClusterWorker')
175 expect(taskError
).toStrictEqual({
177 message
: 'Error Message from ClusterWorker',
181 errorPool
.workerNodes
.some(
182 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
187 it('Verify that error handling is working properly:async', async () => {
188 const data
= { f
: 10 }
190 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
195 await asyncErrorPool
.execute(data
)
199 expect(inError
).toBeDefined()
200 expect(typeof inError
=== 'string').toBe(true)
201 expect(inError
).toBe('Error Message from ClusterWorker:async')
202 expect(taskError
).toStrictEqual({
204 message
: 'Error Message from ClusterWorker:async',
208 asyncErrorPool
.workerNodes
.some(
209 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
214 it('Verify that async function is working properly', async () => {
215 const data
= { f
: 10 }
216 const startTime
= performance
.now()
217 const result
= await asyncPool
.execute(data
)
218 const usedTime
= performance
.now() - startTime
219 expect(result
).toStrictEqual(data
)
220 expect(usedTime
).toBeGreaterThanOrEqual(2000)
223 it('Shutdown test', async () => {
224 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfWorkers
)
226 pool
.emitter
.on(PoolEvents
.destroy
, () => ++poolDestroy
)
228 const numberOfExitEvents
= await exitPromise
229 expect(numberOfExitEvents
).toBe(numberOfWorkers
)
230 expect(poolDestroy
).toBe(1)
233 it('Verify that cluster pool options are checked', async () => {
234 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
235 let pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
)
236 expect(pool1
.opts
.env
).toBeUndefined()
237 expect(pool1
.opts
.settings
).toBeUndefined()
238 await pool1
.destroy()
239 pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
, {
240 env
: { TEST
: 'test' },
241 settings
: { args
: ['--use', 'http'], silent
: true }
243 expect(pool1
.opts
.env
).toStrictEqual({ TEST
: 'test' })
244 expect(pool1
.opts
.settings
).toStrictEqual({
245 args
: ['--use', 'http'],
248 expect({ ...pool1
.opts
.settings
, exec
: workerFilePath
}).toStrictEqual({
249 args
: ['--use', 'http'],
253 await pool1
.destroy()
256 it('Should work even without opts in input', async () => {
257 const pool1
= new FixedClusterPool(
259 './tests/worker-files/cluster/testWorker.js'
261 const res
= await pool1
.execute()
262 expect(res
).toStrictEqual({ ok
: 1 })
263 // We need to clean up the resources after our test
264 await pool1
.destroy()
267 it('Verify that a pool with zero worker fails', async () => {
270 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
271 ).toThrowError('Cannot instantiate a fixed pool with zero worker')