c93155b0bfbb3e8e8a29a579928e38b74363146e
1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { WorkerFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed cluster pool test suite', () => {
7 const numberOfWorkers
= 6
8 const pool
= new FixedClusterPool(
10 './tests/worker-files/cluster/testWorker.js',
12 errorHandler
: e
=> console
.error(e
)
15 const queuePool
= new FixedClusterPool(
17 './tests/worker-files/cluster/testWorker.js',
19 enableTasksQueue
: true,
23 errorHandler
: e
=> console
.error(e
)
26 const emptyPool
= new FixedClusterPool(
28 './tests/worker-files/cluster/emptyWorker.js',
29 { exitHandler
: () => console
.log('empty pool worker exited') }
31 const echoPool
= new FixedClusterPool(
33 './tests/worker-files/cluster/echoWorker.js'
35 const errorPool
= new FixedClusterPool(
37 './tests/worker-files/cluster/errorWorker.js',
39 errorHandler
: e
=> console
.error(e
)
42 const asyncErrorPool
= new FixedClusterPool(
44 './tests/worker-files/cluster/asyncErrorWorker.js',
46 errorHandler
: e
=> console
.error(e
)
49 const asyncPool
= new FixedClusterPool(
51 './tests/worker-files/cluster/asyncWorker.js'
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our test
56 await echoPool
.destroy()
57 await asyncPool
.destroy()
58 await errorPool
.destroy()
59 await asyncErrorPool
.destroy()
60 await emptyPool
.destroy()
61 await queuePool
.destroy()
64 it('Verify that the function is executed in a worker cluster', async () => {
65 let result
= await pool
.execute({
66 function: WorkerFunctions
.fibonacci
68 expect(result
).toBe(75025)
69 result
= await pool
.execute({
70 function: WorkerFunctions
.factorial
72 expect(result
).toBe(9.33262154439441e157
)
75 it('Verify that is possible to invoke the execute() method without input', async () => {
76 const result
= await pool
.execute()
77 expect(result
).toStrictEqual({ ok
: 1 })
80 it("Verify that 'ready' event is emitted", async () => {
81 const pool1
= new FixedClusterPool(
83 './tests/worker-files/cluster/testWorker.js',
85 errorHandler
: e
=> console
.error(e
)
89 pool1
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
90 if (!pool1
.info
.ready
) {
91 await
waitPoolEvents(pool1
, PoolEvents
.ready
, 1)
93 expect(poolReady
).toBe(1)
96 it("Verify that 'busy' event is emitted", async () => {
98 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
99 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
102 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
103 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
104 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
107 it('Verify that tasks queuing is working', async () => {
108 const promises
= new Set()
109 const maxMultiplier
= 2
110 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
111 promises
.add(queuePool
.execute())
113 expect(promises
.size
).toBe(numberOfWorkers
* maxMultiplier
)
114 for (const workerNode
of queuePool
.workerNodes
) {
115 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
116 queuePool
.opts
.tasksQueueOptions
.concurrency
118 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
119 expect(workerNode
.usage
.tasks
.queued
).toBeGreaterThan(0)
120 expect(workerNode
.usage
.tasks
.maxQueued
).toBeGreaterThan(0)
122 expect(queuePool
.info
.executingTasks
).toBe(numberOfWorkers
)
123 expect(queuePool
.info
.queuedTasks
).toBe(
124 numberOfWorkers
* maxMultiplier
- numberOfWorkers
126 expect(queuePool
.info
.maxQueuedTasks
).toBe(
127 numberOfWorkers
* maxMultiplier
- numberOfWorkers
129 await Promise
.all(promises
)
130 for (const workerNode
of queuePool
.workerNodes
) {
131 expect(workerNode
.usage
.tasks
.executing
).toBe(0)
132 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
133 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
134 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
135 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(1)
139 it('Verify that is possible to have a worker that return undefined', async () => {
140 const result
= await emptyPool
.execute()
141 expect(result
).toBeUndefined()
144 it('Verify that data are sent to the worker correctly', async () => {
145 const data
= { f
: 10 }
146 const result
= await echoPool
.execute(data
)
147 expect(result
).toStrictEqual(data
)
150 it('Verify that error handling is working properly:sync', async () => {
151 const data
= { f
: 10 }
153 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
158 await errorPool
.execute(data
)
162 expect(inError
).toBeDefined()
163 expect(typeof inError
=== 'string').toBe(true)
164 expect(inError
).toBe('Error Message from ClusterWorker')
165 expect(taskError
).toStrictEqual({
167 message
: 'Error Message from ClusterWorker',
171 errorPool
.workerNodes
.some(
172 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
177 it('Verify that error handling is working properly:async', async () => {
178 const data
= { f
: 10 }
180 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
185 await asyncErrorPool
.execute(data
)
189 expect(inError
).toBeDefined()
190 expect(typeof inError
=== 'string').toBe(true)
191 expect(inError
).toBe('Error Message from ClusterWorker:async')
192 expect(taskError
).toStrictEqual({
194 message
: 'Error Message from ClusterWorker:async',
198 asyncErrorPool
.workerNodes
.some(
199 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
204 it('Verify that async function is working properly', async () => {
205 const data
= { f
: 10 }
206 const startTime
= performance
.now()
207 const result
= await asyncPool
.execute(data
)
208 const usedTime
= performance
.now() - startTime
209 expect(result
).toStrictEqual(data
)
210 expect(usedTime
).toBeGreaterThanOrEqual(2000)
213 it('Shutdown test', async () => {
214 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfWorkers
)
216 const numberOfExitEvents
= await exitPromise
217 expect(numberOfExitEvents
).toBe(numberOfWorkers
)
220 it('Verify that cluster pool options are checked', async () => {
221 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
222 let pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
)
223 expect(pool1
.opts
.env
).toBeUndefined()
224 expect(pool1
.opts
.settings
).toBeUndefined()
225 await pool1
.destroy()
226 pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
, {
227 env
: { TEST
: 'test' },
228 settings
: { args
: ['--use', 'http'], silent
: true }
230 expect(pool1
.opts
.env
).toStrictEqual({ TEST
: 'test' })
231 expect(pool1
.opts
.settings
).toStrictEqual({
232 args
: ['--use', 'http'],
235 expect({ ...pool1
.opts
.settings
, exec
: workerFilePath
}).toStrictEqual({
236 args
: ['--use', 'http'],
240 await pool1
.destroy()
243 it('Should work even without opts in input', async () => {
244 const pool1
= new FixedClusterPool(
246 './tests/worker-files/cluster/testWorker.js'
248 const res
= await pool1
.execute()
249 expect(res
).toStrictEqual({ ok
: 1 })
250 // We need to clean up the resources after our test
251 await pool1
.destroy()
254 it('Verify that a pool with zero worker fails', async () => {
257 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
258 ).toThrowError('Cannot instantiate a fixed pool with zero worker')