bfc0a7246fa1d3a03d12786e52ff6504685bb024
1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { WorkerFunctions
} = require('../../test-types')
4 const { waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed cluster pool test suite', () => {
7 const numberOfWorkers
= 6
8 const pool
= new FixedClusterPool(
10 './tests/worker-files/cluster/testWorker.js',
12 errorHandler
: e
=> console
.error(e
)
15 const queuePool
= new FixedClusterPool(
17 './tests/worker-files/cluster/testWorker.js',
19 enableTasksQueue
: true,
23 errorHandler
: e
=> console
.error(e
)
26 const emptyPool
= new FixedClusterPool(
28 './tests/worker-files/cluster/emptyWorker.js',
29 { exitHandler
: () => console
.log('empty pool worker exited') }
31 const echoPool
= new FixedClusterPool(
33 './tests/worker-files/cluster/echoWorker.js'
35 const errorPool
= new FixedClusterPool(
37 './tests/worker-files/cluster/errorWorker.js',
39 errorHandler
: e
=> console
.error(e
)
42 const asyncErrorPool
= new FixedClusterPool(
44 './tests/worker-files/cluster/asyncErrorWorker.js',
46 errorHandler
: e
=> console
.error(e
)
49 const asyncPool
= new FixedClusterPool(
51 './tests/worker-files/cluster/asyncWorker.js'
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our test
56 await echoPool
.destroy()
57 await asyncPool
.destroy()
58 await errorPool
.destroy()
59 await asyncErrorPool
.destroy()
60 await emptyPool
.destroy()
61 await queuePool
.destroy()
64 it('Verify that the function is executed in a worker cluster', async () => {
65 let result
= await pool
.execute({
66 function: WorkerFunctions
.fibonacci
68 expect(result
).toBe(75025)
69 result
= await pool
.execute({
70 function: WorkerFunctions
.factorial
72 expect(result
).toBe(9.33262154439441e157
)
75 it('Verify that is possible to invoke the execute() method without input', async () => {
76 const result
= await pool
.execute()
77 expect(result
).toStrictEqual({ ok
: 1 })
80 it
.skip("Verify that 'ready' event is emitted", async () => {
81 const pool1
= new FixedClusterPool(
83 './tests/worker-files/cluster/testWorker.js',
85 errorHandler
: e
=> console
.error(e
)
90 pool1
.emitter
.on(PoolEvents
.ready
, info
=> {
94 expect(poolReady
).toBe(1)
95 expect(poolInfo
).toBeDefined()
98 it("Verify that 'busy' event is emitted", async () => {
100 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
101 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
104 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
105 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
106 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
109 it('Verify that tasks queuing is working', async () => {
110 const promises
= new Set()
111 const maxMultiplier
= 2
112 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
113 promises
.add(queuePool
.execute())
115 expect(promises
.size
).toBe(numberOfWorkers
* maxMultiplier
)
116 for (const workerNode
of queuePool
.workerNodes
) {
117 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
118 queuePool
.opts
.tasksQueueOptions
.concurrency
120 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
121 expect(workerNode
.usage
.tasks
.queued
).toBeGreaterThan(0)
122 expect(workerNode
.usage
.tasks
.maxQueued
).toBeGreaterThan(0)
124 expect(queuePool
.info
.executingTasks
).toBe(numberOfWorkers
)
125 expect(queuePool
.info
.queuedTasks
).toBe(
126 numberOfWorkers
* maxMultiplier
- numberOfWorkers
128 expect(queuePool
.info
.maxQueuedTasks
).toBe(
129 numberOfWorkers
* maxMultiplier
- numberOfWorkers
131 await Promise
.all(promises
)
132 for (const workerNode
of queuePool
.workerNodes
) {
133 expect(workerNode
.usage
.tasks
.executing
).toBe(0)
134 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
135 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
136 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
137 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(1)
141 it('Verify that is possible to have a worker that return undefined', async () => {
142 const result
= await emptyPool
.execute()
143 expect(result
).toBeUndefined()
146 it('Verify that data are sent to the worker correctly', async () => {
147 const data
= { f
: 10 }
148 const result
= await echoPool
.execute(data
)
149 expect(result
).toStrictEqual(data
)
152 it('Verify that error handling is working properly:sync', async () => {
153 const data
= { f
: 10 }
155 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
160 await errorPool
.execute(data
)
164 expect(inError
).toBeDefined()
165 expect(typeof inError
=== 'string').toBe(true)
166 expect(inError
).toBe('Error Message from ClusterWorker')
167 expect(taskError
).toStrictEqual({
169 message
: 'Error Message from ClusterWorker',
173 errorPool
.workerNodes
.some(
174 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
179 it('Verify that error handling is working properly:async', async () => {
180 const data
= { f
: 10 }
182 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
187 await asyncErrorPool
.execute(data
)
191 expect(inError
).toBeDefined()
192 expect(typeof inError
=== 'string').toBe(true)
193 expect(inError
).toBe('Error Message from ClusterWorker:async')
194 expect(taskError
).toStrictEqual({
196 message
: 'Error Message from ClusterWorker:async',
200 asyncErrorPool
.workerNodes
.some(
201 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
206 it('Verify that async function is working properly', async () => {
207 const data
= { f
: 10 }
208 const startTime
= performance
.now()
209 const result
= await asyncPool
.execute(data
)
210 const usedTime
= performance
.now() - startTime
211 expect(result
).toStrictEqual(data
)
212 expect(usedTime
).toBeGreaterThanOrEqual(2000)
215 it('Shutdown test', async () => {
216 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfWorkers
)
218 const numberOfExitEvents
= await exitPromise
219 expect(numberOfExitEvents
).toBe(numberOfWorkers
)
222 it('Verify that cluster pool options are checked', async () => {
223 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
224 let pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
)
225 expect(pool1
.opts
.env
).toBeUndefined()
226 expect(pool1
.opts
.settings
).toBeUndefined()
227 await pool1
.destroy()
228 pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
, {
229 env
: { TEST
: 'test' },
230 settings
: { args
: ['--use', 'http'], silent
: true }
232 expect(pool1
.opts
.env
).toStrictEqual({ TEST
: 'test' })
233 expect(pool1
.opts
.settings
).toStrictEqual({
234 args
: ['--use', 'http'],
237 expect({ ...pool1
.opts
.settings
, exec
: workerFilePath
}).toStrictEqual({
238 args
: ['--use', 'http'],
242 await pool1
.destroy()
245 it('Should work even without opts in input', async () => {
246 const pool1
= new FixedClusterPool(
248 './tests/worker-files/cluster/testWorker.js'
250 const res
= await pool1
.execute()
251 expect(res
).toStrictEqual({ ok
: 1 })
252 // We need to clean up the resources after our test
253 await pool1
.destroy()
256 it('Verify that a pool with zero worker fails', async () => {
259 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
260 ).toThrowError('Cannot instantiate a fixed pool with zero worker')