6daec1400dbc9dc0fdb921d6b25b53dd9d751eab
1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { WorkerFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed cluster pool test suite', async () => {
7 const numberOfWorkers
= 6
8 const pool
= new FixedClusterPool(
10 './tests/worker-files/cluster/testWorker.js',
12 errorHandler
: e
=> console
.error(e
)
16 pool
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
17 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
18 const queuePool
= new FixedClusterPool(
20 './tests/worker-files/cluster/testWorker.js',
22 enableTasksQueue
: true,
26 errorHandler
: e
=> console
.error(e
)
29 const emptyPool
= new FixedClusterPool(
31 './tests/worker-files/cluster/emptyWorker.js',
32 { exitHandler
: () => console
.log('empty pool worker exited') }
34 const echoPool
= new FixedClusterPool(
36 './tests/worker-files/cluster/echoWorker.js'
38 const errorPool
= new FixedClusterPool(
40 './tests/worker-files/cluster/errorWorker.js',
42 errorHandler
: e
=> console
.error(e
)
45 const asyncErrorPool
= new FixedClusterPool(
47 './tests/worker-files/cluster/asyncErrorWorker.js',
49 errorHandler
: e
=> console
.error(e
)
52 const asyncPool
= new FixedClusterPool(
54 './tests/worker-files/cluster/asyncWorker.js'
57 after('Destroy all pools', async () => {
58 // We need to clean up the resources after our test
59 await echoPool
.destroy()
60 await asyncPool
.destroy()
61 await errorPool
.destroy()
62 await asyncErrorPool
.destroy()
63 await emptyPool
.destroy()
64 await queuePool
.destroy()
67 it('Verify that the function is executed in a worker cluster', async () => {
68 let result
= await pool
.execute({
69 function: WorkerFunctions
.fibonacci
71 expect(result
).toBe(75025)
72 result
= await pool
.execute({
73 function: WorkerFunctions
.factorial
75 expect(result
).toBe(9.33262154439441e157
)
78 it('Verify that is possible to invoke the execute() method without input', async () => {
79 const result
= await pool
.execute()
80 expect(result
).toStrictEqual({ ok
: 1 })
83 it("Verify that 'ready' event is emitted", async () => {
84 expect(poolReady
).toBe(1)
87 it("Verify that 'busy' event is emitted", async () => {
89 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
90 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
93 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
94 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
95 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
98 it('Verify that tasks queuing is working', async () => {
99 const promises
= new Set()
100 const maxMultiplier
= 2
101 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
102 promises
.add(queuePool
.execute())
104 expect(promises
.size
).toBe(numberOfWorkers
* maxMultiplier
)
105 for (const workerNode
of queuePool
.workerNodes
) {
106 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
107 queuePool
.opts
.tasksQueueOptions
.concurrency
109 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
110 expect(workerNode
.usage
.tasks
.queued
).toBeGreaterThan(0)
111 expect(workerNode
.usage
.tasks
.maxQueued
).toBeGreaterThan(0)
113 expect(queuePool
.info
.executingTasks
).toBe(numberOfWorkers
)
114 expect(queuePool
.info
.queuedTasks
).toBe(
115 numberOfWorkers
* maxMultiplier
- numberOfWorkers
117 expect(queuePool
.info
.maxQueuedTasks
).toBe(
118 numberOfWorkers
* maxMultiplier
- numberOfWorkers
120 await Promise
.all(promises
)
121 for (const workerNode
of queuePool
.workerNodes
) {
122 expect(workerNode
.usage
.tasks
.executing
).toBe(0)
123 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
124 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
125 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
126 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(1)
130 it('Verify that is possible to have a worker that return undefined', async () => {
131 const result
= await emptyPool
.execute()
132 expect(result
).toBeUndefined()
135 it('Verify that data are sent to the worker correctly', async () => {
136 const data
= { f
: 10 }
137 const result
= await echoPool
.execute(data
)
138 expect(result
).toStrictEqual(data
)
141 it('Verify that error handling is working properly:sync', async () => {
142 const data
= { f
: 10 }
144 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
149 await errorPool
.execute(data
)
153 expect(inError
).toBeDefined()
154 expect(typeof inError
=== 'string').toBe(true)
155 expect(inError
).toBe('Error Message from ClusterWorker')
156 expect(taskError
).toStrictEqual({
157 message
: 'Error Message from ClusterWorker',
161 errorPool
.workerNodes
.some(
162 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
167 it('Verify that error handling is working properly:async', async () => {
168 const data
= { f
: 10 }
170 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
175 await asyncErrorPool
.execute(data
)
179 expect(inError
).toBeDefined()
180 expect(typeof inError
=== 'string').toBe(true)
181 expect(inError
).toBe('Error Message from ClusterWorker:async')
182 expect(taskError
).toStrictEqual({
183 message
: 'Error Message from ClusterWorker:async',
187 asyncErrorPool
.workerNodes
.some(
188 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
193 it('Verify that async function is working properly', async () => {
194 const data
= { f
: 10 }
195 const startTime
= performance
.now()
196 const result
= await asyncPool
.execute(data
)
197 const usedTime
= performance
.now() - startTime
198 expect(result
).toStrictEqual(data
)
199 expect(usedTime
).toBeGreaterThanOrEqual(2000)
202 it('Shutdown test', async () => {
203 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfWorkers
)
205 const numberOfExitEvents
= await exitPromise
206 expect(numberOfExitEvents
).toBe(numberOfWorkers
)
209 it('Verify that cluster pool options are checked', async () => {
210 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
211 let pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
)
212 expect(pool1
.opts
.env
).toBeUndefined()
213 expect(pool1
.opts
.settings
).toBeUndefined()
214 await pool1
.destroy()
215 pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
, {
216 env
: { TEST
: 'test' },
217 settings
: { args
: ['--use', 'http'], silent
: true }
219 expect(pool1
.opts
.env
).toStrictEqual({ TEST
: 'test' })
220 expect(pool1
.opts
.settings
).toStrictEqual({
221 args
: ['--use', 'http'],
224 expect({ ...pool1
.opts
.settings
, exec
: workerFilePath
}).toStrictEqual({
225 args
: ['--use', 'http'],
229 await pool1
.destroy()
232 it('Should work even without opts in input', async () => {
233 const pool1
= new FixedClusterPool(
235 './tests/worker-files/cluster/testWorker.js'
237 const res
= await pool1
.execute()
238 expect(res
).toStrictEqual({ ok
: 1 })
239 // We need to clean up the resources after our test
240 await pool1
.destroy()
243 it('Verify that a pool with zero worker fails', async () => {
246 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
247 ).toThrowError('Cannot instantiate a fixed pool with zero worker')