e563b0ba520c67f2f11b44221e61b88b098c6496
1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed cluster pool test suite', () => {
// Worker count used throughout the suite: it is passed to FixedClusterPool in the
// options test and appears in the 'busy', queuing and shutdown assertions.
7 const numberOfWorkers
= 6
// Value given to queuePool's `concurrency` option below.
8 const tasksConcurrency
= 2
// Shared pool running testWorker.js; its errorHandler option logs the received
// value with console.error. Used by the execute(), 'busy' event and shutdown tests.
// NOTE(review): the embedded line numbers jump 9 -> 11 -> 13 -> 16, so parts of this
// call (presumably the first constructor argument and the braces/parenthesis around
// the options) are not visible in this view — confirm against the original file.
9 const pool
= new FixedClusterPool(
11 './tests/worker-files/cluster/testWorker.js',
13 errorHandler
: (e
) => console
.error(e
)
// Pool running testWorker.js with task queuing enabled and `concurrency` set to
// tasksConcurrency; exercised by the 'tasks queuing' test.
// NOTE(review): lines 17, 19, 21, 23, 25 and 26 of the original are not visible here.
// The queuing test reads `queuePool.opts.tasksQueueOptions.concurrency`, so
// `concurrency` is presumably nested under a `tasksQueueOptions` key — confirm.
16 const queuePool
= new FixedClusterPool(
18 './tests/worker-files/cluster/testWorker.js',
20 enableTasksQueue
: true,
22 concurrency
: tasksConcurrency
24 errorHandler
: (e
) => console
.error(e
)
// Pool running emptyWorker.js; its exitHandler option logs a fixed message with
// console.info. Used by the test that expects an undefined task result.
// NOTE(review): lines 28 and 31 of the original (presumably the first constructor
// argument and the closing parenthesis) are not visible here — confirm.
27 const emptyPool
= new FixedClusterPool(
29 './tests/worker-files/cluster/emptyWorker.js',
30 { exitHandler
: () => console
.info('empty pool worker exited') }
// Pool running echoWorker.js; the data round-trip test expects execute(data) to
// resolve with a value strictly equal to `data`.
// NOTE(review): lines 33 and 35 of the original (presumably the first constructor
// argument and the closing parenthesis) are not visible here — confirm.
32 const echoPool
= new FixedClusterPool(
34 './tests/worker-files/cluster/echoWorker.js'
// Pool running errorWorker.js, used by the ':sync' error-handling test; its
// errorHandler option logs the received value with console.error.
// NOTE(review): lines 37, 39, 41 and 42 of the original (presumably the first
// constructor argument and the options braces) are not visible here — confirm.
36 const errorPool
= new FixedClusterPool(
38 './tests/worker-files/cluster/errorWorker.js',
40 errorHandler
: (e
) => console
.error(e
)
// Pool running asyncErrorWorker.js, used by the ':async' error-handling test; its
// errorHandler option logs the received value with console.error.
// NOTE(review): lines 44, 46, 48 and 49 of the original (presumably the first
// constructor argument and the options braces) are not visible here — confirm.
43 const asyncErrorPool
= new FixedClusterPool(
45 './tests/worker-files/cluster/asyncErrorWorker.js',
47 errorHandler
: (e
) => console
.error(e
)
// Pool running asyncWorker.js, used by the test that times an asynchronous task.
// NOTE(review): lines 51 and 53 of the original (presumably the first constructor
// argument and the closing parenthesis) are not visible here — confirm.
50 const asyncPool
= new FixedClusterPool(
52 './tests/worker-files/cluster/asyncWorker.js'
// Teardown hook: destroys, one after the other, the shared pools created at the
// top of the suite.
// `pool` is not destroyed here — presumably the 'Shutdown test' takes care of
// it (confirm against the original file).
after('Destroy all pools', async () => {
  // We need to clean up the resources after our test
  await echoPool.destroy()
  await asyncPool.destroy()
  await errorPool.destroy()
  await asyncErrorPool.destroy()
  await emptyPool.destroy()
  await queuePool.destroy()
})
// Runs the fibonacci and then the factorial task function through the shared
// pool and checks the value each execute() call resolves with.
it('Verify that the function is executed in a worker cluster', async () => {
  let result = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(result).toBe(75025)
  result = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(result).toBe(9.33262154439441e157)
})
// Calls execute() with no argument on the shared pool and expects the task to
// resolve with exactly `{ ok: 1 }`.
it('Verify that is possible to invoke the execute() method without input', async () => {
  const result = await pool.execute()
  expect(result).toStrictEqual({ ok: 1 })
})
// Creates a dedicated pool on testWorker.js, counts its PoolEvents.ready emissions
// and expects exactly one.
// NOTE(review): the embedded line numbers skip 83, 85 and 87-89, so the first
// constructor argument, the options braces and the declaration of `poolReady`
// are not visible in this view — confirm against the original file.
81 it("Verify that 'ready' event is emitted", async () => {
82 const pool1
= new FixedClusterPool(
84 './tests/worker-files/cluster/testWorker.js',
86 errorHandler
: (e
) => console
.error(e
)
// Increment the counter each time pool1 emits PoolEvents.ready.
90 pool1
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
// Test helper from ../../test-utils; presumably resolves once 1 ready event has
// been observed on pool1 — confirm.
91 await
waitPoolEvents(pool1
, PoolEvents
.ready
, 1)
92 expect(poolReady
).toBe(1)
// Counts PoolEvents.busy emissions on the shared pool across a loop of
// numberOfWorkers * 2 iterations and expects numberOfWorkers + 1 of them.
// NOTE(review): the embedded line numbers skip 96, 99 and 100, so the declaration
// of `poolBusy` and the loop body (presumably a task submission) are not visible
// in this view — confirm against the original file.
95 it("Verify that 'busy' event is emitted", () => {
// Increment the counter each time the pool emits PoolEvents.busy.
97 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
98 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
101 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
102 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
103 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
// Submits numberOfWorkers * maxMultiplier tasks to queuePool without awaiting them,
// checks the per-worker and pool-wide task counters while the tasks are pending,
// then awaits all of them and checks the per-worker counters again.
// NOTE(review): several original lines are not visible in this view (the embedded
// numbering skips 111, 117, 121, 124-125, 128, 130, 132, 134, 136 and 144-146).
// Most look like closing delimiters, but 130 and 134 sit inside `.toBe(` calls and
// may carry part of the expected value — confirm against the original file.
106 it('Verify that tasks queuing is working', async () => {
107 const promises
= new Set()
108 const maxMultiplier
= 3 // Must be greater than tasksConcurrency
// Queue the tasks; the returned promises are collected and only awaited later.
109 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
110 promises
.add(queuePool
.execute())
112 expect(promises
.size
).toBe(numberOfWorkers
* maxMultiplier
)
// Per-worker counters while tasks are still pending.
113 for (const workerNode
of queuePool
.workerNodes
) {
// Executing tasks per worker must stay within [0, configured concurrency].
114 expect(workerNode
.usage
.tasks
.executing
).toBeGreaterThanOrEqual(0)
115 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
116 queuePool
.opts
.tasksQueueOptions
.concurrency
// Nothing has completed yet.
118 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
// Each worker is expected to hold maxMultiplier - concurrency queued tasks.
119 expect(workerNode
.usage
.tasks
.queued
).toBe(
120 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
122 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
123 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
// Pool-wide counters read from queuePool.info.
126 expect(queuePool
.info
.executingTasks
).toBe(
127 numberOfWorkers
* queuePool
.opts
.tasksQueueOptions
.concurrency
// NOTE(review): original line 130 is not visible; the parenthesised difference
// below is probably multiplied by a factor on that line — confirm.
129 expect(queuePool
.info
.queuedTasks
).toBe(
131 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
// NOTE(review): original line 134 is not visible; same remark as for queuedTasks.
133 expect(queuePool
.info
.maxQueuedTasks
).toBe(
135 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
// Wait for every submitted task to settle before re-checking the counters.
137 await Promise
.all(promises
)
// After completion: nothing executing or queued, maxMultiplier tasks executed per
// worker, and maxQueued still expected to equal maxMultiplier - concurrency.
138 for (const workerNode
of queuePool
.workerNodes
) {
139 expect(workerNode
.usage
.tasks
.executing
).toBe(0)
140 expect(workerNode
.usage
.tasks
.executed
).toBe(maxMultiplier
)
141 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
142 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
143 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
// Executes a task on the pool backed by emptyWorker.js and expects the call to
// resolve with `undefined`.
it('Verify that is possible to have a worker that return undefined', async () => {
  const result = await emptyPool.execute()
  expect(result).toBeUndefined()
})
// Sends a small payload to the pool backed by echoWorker.js and expects the
// resolved value to be strictly equal to that payload.
it('Verify that data are sent to the worker correctly', async () => {
  const data = { f: 10 }
  const result = await echoPool.execute(data)
  expect(result).toStrictEqual(data)
})
// Executes a task on errorPool and checks (a) the error value observed by the test,
// (b) the payload delivered with PoolEvents.taskError and (c) that a worker node
// reports a failed-task count of 1.
// NOTE(review): many original lines are not visible in this view (the embedded
// numbering skips 161, 163-166, 168-170, 175, 177-179 and 182-184). The declarations
// of `taskError` and `inError`, the listener body, whatever captures the execute()
// failure, and some fields of the expected taskError object are therefore missing
// here — confirm against the original file.
159 it('Verify that error handling is working properly:sync', async () => {
160 const data
= { f
: 10 }
// Listener for task errors emitted by the pool; its body is not visible here.
162 errorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
167 await errorPool
.execute(data
)
// The captured error is expected to be the plain string sent by the worker.
171 expect(inError
).toBeDefined()
172 expect(typeof inError
=== 'string').toBe(true)
173 expect(inError
).toBe('Error Message from ClusterWorker')
// Only the `message` field of the expected event payload is visible here.
174 expect(taskError
).toStrictEqual({
176 message
: 'Error Message from ClusterWorker',
// True when at least one worker node has a failed-task count of exactly 1; the
// assertion wrapping this expression is not visible here.
180 errorPool
.workerNodes
.some(
181 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
// Same scenario as the ':sync' error test, but against asyncErrorPool: checks the
// error value observed by the test, the PoolEvents.taskError payload and that a
// worker node reports a failed-task count of 1.
// NOTE(review): many original lines are not visible in this view (the embedded
// numbering skips 188, 190-193, 195-197, 202, 204-206 and 209-211). The declarations
// of `taskError` and `inError`, the listener body, whatever captures the execute()
// failure, and some fields of the expected taskError object are therefore missing
// here — confirm against the original file.
186 it('Verify that error handling is working properly:async', async () => {
187 const data
= { f
: 10 }
// Listener for task errors emitted by the pool; its body is not visible here.
189 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
194 await asyncErrorPool
.execute(data
)
// The captured error is expected to be the plain string sent by the worker.
198 expect(inError
).toBeDefined()
199 expect(typeof inError
=== 'string').toBe(true)
200 expect(inError
).toBe('Error Message from ClusterWorker:async')
// Only the `message` field of the expected event payload is visible here.
201 expect(taskError
).toStrictEqual({
203 message
: 'Error Message from ClusterWorker:async',
// True when at least one worker node has a failed-task count of exactly 1; the
// assertion wrapping this expression is not visible here.
207 asyncErrorPool
.workerNodes
.some(
208 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
// Times a task on the pool backed by asyncWorker.js: the resolved value must
// strictly equal the payload and the call must take at least 2000 ms as measured
// with performance.now() (presumably the worker waits about 2 s — confirm).
it('Verify that async function is working properly', async () => {
  const data = { f: 10 }
  const startTime = performance.now()
  const result = await asyncPool.execute(data)
  const usedTime = performance.now() - startTime
  expect(result).toStrictEqual(data)
  expect(usedTime).toBeGreaterThanOrEqual(2000)
})
// Checks that the shared pool yields numberOfWorkers 'exit' events and a single
// PoolEvents.destroy emission.
// NOTE(review): the embedded line numbers skip 224 and 226, so the declaration of
// `poolDestroy` and (presumably) the call that actually destroys `pool` are not
// visible in this view — confirm against the original file.
222 it('Shutdown test', async () => {
// Start listening for worker 'exit' events up front; waitWorkerEvents is a test
// helper and presumably resolves with the number of events observed — confirm.
223 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfWorkers
)
// Increment the counter each time the pool emits PoolEvents.destroy.
225 pool
.emitter
.on(PoolEvents
.destroy
, () => ++poolDestroy
)
227 const numberOfExitEvents
= await exitPromise
228 expect(numberOfExitEvents
).toBe(numberOfWorkers
)
229 expect(poolDestroy
).toBe(1)
// Builds a pool without options and expects `opts.env` and `opts.settings` to be
// undefined, then rebuilds it with `env` and `settings` and checks both are exposed
// on `pool1.opts`. Each pool instance is destroyed after its checks.
// NOTE(review): the embedded line numbers skip 241, 245-246, 249-251 and 253, so the
// tail of both expected-settings objects (and some closing delimiters) are not
// visible in this view — confirm against the original file.
232 it('Verify that cluster pool options are checked', async () => {
233 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
// First instance: no options object at all.
234 let pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
)
235 expect(pool1
.opts
.env
).toBeUndefined()
236 expect(pool1
.opts
.settings
).toBeUndefined()
237 await pool1
.destroy()
// Second instance: explicit `env` and `settings` options.
238 pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
, {
239 env
: { TEST
: 'test' },
240 settings
: { args
: ['--use', 'http'], silent
: true }
242 expect(pool1
.opts
.env
).toStrictEqual({ TEST
: 'test' })
// Only the `args` field of the expected settings is visible here.
243 expect(pool1
.opts
.settings
).toStrictEqual({
244 args
: ['--use', 'http'],
// Compares the settings merged with `exec: workerFilePath` against an expected
// object whose remaining fields are not visible here.
247 expect({ ...pool1
.opts
.settings
, exec
: workerFilePath
}).toStrictEqual({
248 args
: ['--use', 'http'],
252 await pool1
.destroy()
// Builds a pool on testWorker.js with no options object visible, runs one task with
// no input, expects `{ ok: 1 }` and then destroys the pool.
// NOTE(review): the embedded line numbers skip 257 and 259, so the first
// constructor argument (presumably the worker count) and the closing parenthesis
// are not visible in this view — confirm against the original file.
255 it('Should work even without opts in input', async () => {
256 const pool1
= new FixedClusterPool(
258 './tests/worker-files/cluster/testWorker.js'
260 const res
= await pool1
.execute()
261 expect(res
).toStrictEqual({ ok
: 1 })
262 // We need to clean up the resources after our test
263 await pool1
.destroy()
// Constructing a fixed pool with 0 workers must throw with the given message.
// The constructor call is wrapped in a function because `toThrowError` has to
// invoke the code itself in order to catch the exception.
// NOTE(review): the `expect(` opener and the arrow wrapper were not visible in
// the mangled source and have been restored — confirm against the original file.
it('Verify that a pool with zero worker fails', async () => {
  expect(
    () =>
      new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
  ).toThrowError('Cannot instantiate a fixed pool with zero worker')
})