1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
5 const { DEFAULT_TASK_NAME
} = require('../../../lib/utils')
7 describe('Fixed cluster pool test suite', () => {
// Sizing shared by every pool created at suite level.
const numberOfWorkers = 8
const tasksConcurrency = 2

// Error handler installed on the pools below that take one: logs the error to stderr.
const logWorkerError = e => console.error(e)

// Default pool used by the tests that do not need a specialised worker file.
// It is not part of the `after` hook cleanup list.
const pool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  { errorHandler: logWorkerError }
)

// Same worker file as `pool`, with the tasks queue enabled and a per-worker concurrency of `tasksConcurrency`.
const queuePool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: tasksConcurrency },
    errorHandler: logWorkerError
  }
)

// Pool whose tests expect `undefined` as task result; logs when a worker exits.
const emptyPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/emptyWorker.js',
  {
    exitHandler: () => console.info('empty pool worker exited')
  }
)

// Pool whose tests expect the submitted data back unchanged.
const echoPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/echoWorker.js'
)

// Pools whose tests expect task execution to fail (sync and async variants).
const errorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/errorWorker.js',
  { errorHandler: logWorkerError }
)
const asyncErrorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncErrorWorker.js',
  { errorHandler: logWorkerError }
)

// Pool whose test expects an asynchronous task to take at least two seconds.
const asyncPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncWorker.js'
)
after('Destroy all pools', async () => {
  // We need to clean up the resources after our test.
  // Pools are destroyed one after the other, in this fixed order.
  const suitePools = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const suitePool of suitePools) {
    await suitePool.destroy()
  }
})
it('Verify that the function is executed in a worker cluster', async () => {
  // The task data names the function to run; the two tasks are executed one after the other.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
it('Verify that is possible to invoke the execute() method without input', async () => {
  // execute() is called with no argument at all; the resolved value is compared structurally.
  const expectedResult = { ok: 1 }
  const actualResult = await pool.execute()
  expect(actualResult).toStrictEqual(expectedResult)
})
it("Verify that 'ready' event is emitted", async () => {
  // A dedicated pool is created so the listener is attached before the `ready` event is awaited.
  const pool1 = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    let poolReady = 0
    pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool1, PoolEvents.ready, 1)
    expect(poolReady).toBe(1)
  } finally {
    // pool1 is local to this test and is not covered by the suite-level cleanup hook,
    // so it must be destroyed here, even when an assertion above fails.
    await pool1.destroy()
  }
})
it("Verify that 'busy' event is emitted", async () => {
  const pendingTasks = new Set()
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => {
    ++poolBusy
  })

  // Submit twice as many tasks as there are workers, without awaiting in between.
  const tasksToSubmit = numberOfWorkers * 2
  let submitted = 0
  while (submitted < tasksToSubmit) {
    pendingTasks.add(pool.execute())
    submitted++
  }
  await Promise.all(pendingTasks)

  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(poolBusy).toBe(numberOfWorkers + 1)
})
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const totalTasks = numberOfWorkers * maxMultiplier

  // Submit every task up front, without awaiting, so that some of them have to be queued.
  const pendingTasks = new Set()
  for (let submitted = 0; submitted < totalTasks; submitted++) {
    pendingTasks.add(queuePool.execute())
  }
  expect(pendingTasks.size).toBe(totalTasks)

  const { concurrency } = queuePool.opts.tasksQueueOptions
  // Number of tasks per worker that are expected to wait in the queue.
  const queuedPerWorker = maxMultiplier - concurrency

  // First snapshot: taken right after submission, before the tasks are awaited.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBe(queuedPerWorker)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.stolen).toBe(0)
  }
  expect(queuePool.info.executedTasks).toBe(0)
  expect(queuePool.info.executingTasks).toBe(numberOfWorkers * concurrency)
  expect(queuePool.info.queuedTasks).toBe(numberOfWorkers * queuedPerWorker)
  expect(queuePool.info.maxQueuedTasks).toBe(numberOfWorkers * queuedPerWorker)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBe(0)

  await Promise.all(pendingTasks)

  // Second snapshot: every submitted task has resolved.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(totalTasks)
    expect(tasks.executed).toBe(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    // The stolen counter is only bounded here, not pinned to an exact value.
    expect(tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(tasks.stolen).toBeLessThanOrEqual(totalTasks)
  }
  expect(queuePool.info.executedTasks).toBe(totalTasks)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(totalTasks)
})
it('Verify that is possible to have a worker that return undefined', async () => {
  // The task run by emptyPool is expected to resolve without a value.
  const emptyResult = await emptyPool.execute()
  expect(emptyResult).toBeUndefined()
})
it('Verify that data are sent to the worker correctly', async () => {
  // The value resolved by echoPool must be structurally equal to the submitted payload.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }
  const expectedMessage = 'Error Message from ClusterWorker'

  // Capture the payload of the pool `taskError` event.
  let taskError
  errorPool.emitter.on(PoolEvents.taskError, e => {
    taskError = e
  })

  // Capture whatever execute() fails with.
  let inError
  try {
    await errorPool.execute(data)
  } catch (e) {
    inError = e
  }

  // The failure is expected to be a plain string, not an Error instance.
  expect(inError).toBeDefined()
  expect(typeof inError === 'string').toBe(true)
  expect(inError).toBe(expectedMessage)
  expect(taskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: expectedMessage,
    data
  })

  // At least one worker node must have recorded exactly one failed task.
  const someWorkerFailedOnce = errorPool.workerNodes.some(
    workerNode => workerNode.usage.tasks.failed === 1
  )
  expect(someWorkerFailedOnce).toBe(true)
})
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }
  const expectedMessage = 'Error Message from ClusterWorker:async'

  // Capture the payload of the pool `taskError` event.
  let taskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
    taskError = e
  })

  // Capture whatever execute() fails with.
  let inError
  try {
    await asyncErrorPool.execute(data)
  } catch (e) {
    inError = e
  }

  // The failure is expected to be a plain string, not an Error instance.
  expect(inError).toBeDefined()
  expect(typeof inError === 'string').toBe(true)
  expect(inError).toBe(expectedMessage)
  expect(taskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: expectedMessage,
    data
  })

  // At least one worker node must have recorded exactly one failed task.
  const someWorkerFailedOnce = asyncErrorPool.workerNodes.some(
    workerNode => workerNode.usage.tasks.failed === 1
  )
  expect(someWorkerFailedOnce).toBe(true)
})
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }

  // Measure the wall-clock time spent awaiting the task.
  const startedAt = performance.now()
  const echoed = await asyncPool.execute(payload)
  const elapsedMs = performance.now() - startedAt

  expect(echoed).toStrictEqual(payload)
  // The task is expected to take at least two seconds to resolve.
  expect(elapsedMs).toBeGreaterThanOrEqual(2000)
})
it('Shutdown test', async () => {
  // Start listening before the pool is destroyed so that no event is missed.
  const workersExited = waitWorkerEvents(pool, 'exit', numberOfWorkers)
  let poolDestroy = 0
  pool.emitter.on(PoolEvents.destroy, () => {
    ++poolDestroy
  })

  await pool.destroy()

  // One `exit` event per worker and a single pool `destroy` event are expected.
  const numberOfExitEvents = await workersExited
  expect(numberOfExitEvents).toBe(numberOfWorkers)
  expect(poolDestroy).toBe(1)
})
it('Verify that cluster pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'

  // Without options, neither `env` nor `settings` is set on the pool options.
  const defaultPool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  expect(defaultPool.opts.env).toBeUndefined()
  expect(defaultPool.opts.settings).toBeUndefined()
  await defaultPool.destroy()

  // With options, `env` and `settings` must be kept as given.
  const configuredPool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
    env: { TEST: 'test' },
    settings: { args: ['--use', 'http'], silent: true }
  })
  expect(configuredPool.opts.env).toStrictEqual({ TEST: 'test' })
  expect(configuredPool.opts.settings).toStrictEqual({
    args: ['--use', 'http'],
    silent: true
  })
  const settingsWithExec = {
    ...configuredPool.opts.settings,
    exec: workerFilePath
  }
  expect(settingsWithExec).toStrictEqual({
    args: ['--use', 'http'],
    silent: true,
    exec: workerFilePath
  })
  await configuredPool.destroy()
})
it('Should work even without opts in input', async () => {
  // The options argument is omitted entirely from the constructor call.
  const optionlessPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const taskResult = await optionlessPool.execute()
  expect(taskResult).toStrictEqual({ ok: 1 })
  // We need to clean up the resources after our test
  await optionlessPool.destroy()
})
it('Verify that a pool with zero worker fails', () => {
  // The constructor call is wrapped in a function so that `expect` can catch the synchronous throw.
  // `toThrow` is used rather than its deprecated alias `toThrowError`; with a string
  // argument it checks that the error message contains that string.
  expect(
    () =>
      new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
  ).toThrow('Cannot instantiate a fixed pool with zero worker')
})