c638663f01c01ecdfde4b8c3c9b3ecaefe73539e
1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { WorkerFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
// Suite setup: opens the 'Fixed cluster pool test suite' describe block and
// creates the FixedClusterPool fixtures shared by the tests that follow.
// NOTE(review): the embedded line numbers skip values (e.g. 9, 11, 13-14,
// 20-22), so some constructor arguments, option braces and closing tokens are
// not visible in this extract — presumably the worker count is passed first;
// confirm against the complete file.
6 describe('Fixed cluster pool test suite', () => {
7 const numberOfWorkers
= 6
// Default fixture on testWorker.js; its errorHandler forwards the error to console.error.
8 const pool
= new FixedClusterPool(
10 './tests/worker-files/cluster/testWorker.js',
12 errorHandler
: e
=> console
.error(e
)
// Same worker file with the tasks queue enabled (enableTasksQueue: true).
// NOTE(review): a later test reads queuePool.opts.tasksQueueOptions.concurrency;
// where that option is set is not visible here — verify.
15 const queuePool
= new FixedClusterPool(
17 './tests/worker-files/cluster/testWorker.js',
19 enableTasksQueue
: true,
23 errorHandler
: e
=> console
.error(e
)
// Fixture on emptyWorker.js; its exitHandler logs a message via console.log.
26 const emptyPool
= new FixedClusterPool(
28 './tests/worker-files/cluster/emptyWorker.js',
29 { exitHandler
: () => console
.log('empty pool worker exited') }
// Fixture on echoWorker.js; no options object is visible for it.
31 const echoPool
= new FixedClusterPool(
33 './tests/worker-files/cluster/echoWorker.js'
// Fixture on errorWorker.js, used by the ':sync' error handling test.
35 const errorPool
= new FixedClusterPool(
37 './tests/worker-files/cluster/errorWorker.js',
39 errorHandler
: e
=> console
.error(e
)
// Fixture on asyncErrorWorker.js, used by the ':async' error handling test.
42 const asyncErrorPool
= new FixedClusterPool(
44 './tests/worker-files/cluster/asyncErrorWorker.js',
46 errorHandler
: e
=> console
.error(e
)
// Fixture on asyncWorker.js; no options object is visible for it.
49 const asyncPool
= new FixedClusterPool(
51 './tests/worker-files/cluster/asyncWorker.js'
// Teardown hook: destroys the echo, async, error, asyncError, empty and queue
// pools one after another, awaiting each destroy() before starting the next.
// NOTE(review): `pool` is not destroyed here — presumably the 'Shutdown test'
// takes care of it; confirm against the complete file.
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our test
56 await echoPool
.destroy()
57 await asyncPool
.destroy()
58 await errorPool
.destroy()
59 await asyncErrorPool
.destroy()
60 await emptyPool
.destroy()
61 await queuePool
.destroy()
// Runs two named worker functions through `pool` and checks the exact results.
// 75025 is the 25th Fibonacci number and 9.33262154439441e157 is 100!.
// NOTE(review): only the `function` field of each task is visible in this
// extract; the inputs are presumably the worker's defaults — verify.
64 it('Verify that the function is executed in a worker cluster', async () => {
65 let result
= await pool
.execute({
66 function: WorkerFunctions
.fibonacci
68 expect(result
).toBe(75025)
69 result
= await pool
.execute({
70 function: WorkerFunctions
.factorial
72 expect(result
).toBe(9.33262154439441e157
)
// Calls execute() with no argument on `pool` and expects the result to
// strictly equal { ok: 1 }.
75 it('Verify that is possible to invoke the execute() method without input', async () => {
76 const result
= await pool
.execute()
77 expect(result
).toStrictEqual({ ok
: 1 })
// Builds a dedicated pool (pool1), counts PoolEvents.ready emissions in
// poolReady, waits for one such event via waitPoolEvents, and expects the
// counter to be exactly 1.
// NOTE(review): the declaration of poolReady and any cleanup of pool1 are not
// visible in this extract (embedded numbering skips 82, 84, 86-88, 92) —
// confirm against the complete file.
80 it("Verify that 'ready' event is emitted", async () => {
81 const pool1
= new FixedClusterPool(
83 './tests/worker-files/cluster/testWorker.js',
85 errorHandler
: e
=> console
.error(e
)
// Each 'ready' emission increments the counter.
89 pool1
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
90 await
waitPoolEvents(pool1
, PoolEvents
.ready
, 1)
91 expect(poolReady
).toBe(1)
// Counts PoolEvents.busy emissions on the shared `pool` while a loop runs
// numberOfWorkers * 2 iterations, then expects numberOfWorkers + 1 emissions.
// NOTE(review): the poolBusy declaration and the loop body are not visible in
// this extract (embedded numbering skips 95, 98-99) — presumably each
// iteration submits one task; confirm against the complete file.
94 it("Verify that 'busy' event is emitted", async () => {
96 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
97 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
100 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
101 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
102 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
// Submits numberOfWorkers * maxMultiplier tasks to queuePool without awaiting
// them, inspects per-worker and pool-wide task counters while the tasks are
// still pending, then awaits all tasks and inspects the counters again.
// NOTE(review): closing tokens of several statements are not visible in this
// extract (embedded numbering skips 110, 115, 119, 123, 126, 134-135).
105 it('Verify that tasks queuing is working', async () => {
106 const promises
= new Set()
107 const maxMultiplier
= 2
// Fire all tasks at once; the promises are collected and awaited later.
108 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
109 promises
.add(queuePool
.execute())
111 expect(promises
.size
).toBe(numberOfWorkers
* maxMultiplier
)
// Before completion: per worker, executing is capped by the configured
// concurrency, nothing has been executed yet, and something is queued.
112 for (const workerNode
of queuePool
.workerNodes
) {
113 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
114 queuePool
.opts
.tasksQueueOptions
.concurrency
116 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
117 expect(workerNode
.usage
.tasks
.queued
).toBeGreaterThan(0)
118 expect(workerNode
.usage
.tasks
.maxQueued
).toBeGreaterThan(0)
// Pool-wide view: numberOfWorkers tasks executing, the remainder queued.
120 expect(queuePool
.info
.executingTasks
).toBe(numberOfWorkers
)
121 expect(queuePool
.info
.queuedTasks
).toBe(
122 numberOfWorkers
* maxMultiplier
- numberOfWorkers
124 expect(queuePool
.info
.maxQueuedTasks
).toBe(
125 numberOfWorkers
* maxMultiplier
- numberOfWorkers
127 await Promise
.all(promises
)
// After completion: nothing executing or queued, each worker executed between
// 1 and maxMultiplier tasks, and the maxQueued counter reads 1.
128 for (const workerNode
of queuePool
.workerNodes
) {
129 expect(workerNode
.usage
.tasks
.executing
).toBe(0)
130 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
131 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
132 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
133 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(1)
// Executes a task on emptyPool with no input and expects the resolved value
// to be undefined.
137 it('Verify that is possible to have a worker that return undefined', async () => {
138 const result
= await emptyPool
.execute()
139 expect(result
).toBeUndefined()
// Sends { f: 10 } through echoPool and expects the result to strictly equal
// the data that was sent.
142 it('Verify that data are sent to the worker correctly', async () => {
143 const data
= { f
: 10 }
144 const result
= await echoPool
.execute(data
)
145 expect(result
).toStrictEqual(data
)
// Executes a task on errorPool and checks both the error surfaced to the
// caller (inError, expected to be the string
// 'Error Message from ClusterWorker') and the payload received by the
// PoolEvents.taskError listener (taskError).
// NOTE(review): the declarations of inError/taskError, the listener body, the
// surrounding error-capturing code and the rest of the expected taskError
// object are not visible in this extract (embedded numbering skips 150,
// 152-155, 157-159, 165-167, 170-173) — presumably a try/catch assigns
// inError; confirm against the complete file.
148 it('Verify that error handling is working properly:sync', async () => {
149 const data
= { f
: 10 }
151 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
156 await errorPool
.execute(data
)
160 expect(inError
).toBeDefined()
161 expect(typeof inError
=== 'string').toBe(true)
162 expect(inError
).toBe('Error Message from ClusterWorker')
163 expect(taskError
).toStrictEqual({
164 message
: 'Error Message from ClusterWorker',
// Checks whether at least one worker node recorded exactly one failed task;
// the assertion wrapped around this expression is not visible here.
168 errorPool
.workerNodes
.some(
169 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
// Executes a task on asyncErrorPool and checks both the error surfaced to the
// caller (inError, expected to be the string
// 'Error Message from ClusterWorker:async') and the payload received by the
// PoolEvents.taskError listener (taskError).
// NOTE(review): the declarations of inError/taskError, the listener body, the
// surrounding error-capturing code and the rest of the expected taskError
// object are not visible in this extract (embedded numbering skips 176,
// 178-181, 183-185, 191-193, 196-199) — presumably a try/catch assigns
// inError; confirm against the complete file.
174 it('Verify that error handling is working properly:async', async () => {
175 const data
= { f
: 10 }
177 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
182 await asyncErrorPool
.execute(data
)
186 expect(inError
).toBeDefined()
187 expect(typeof inError
=== 'string').toBe(true)
188 expect(inError
).toBe('Error Message from ClusterWorker:async')
189 expect(taskError
).toStrictEqual({
190 message
: 'Error Message from ClusterWorker:async',
// Checks whether at least one worker node recorded exactly one failed task;
// the assertion wrapped around this expression is not visible here.
194 asyncErrorPool
.workerNodes
.some(
195 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
// Times a task on asyncPool with performance.now() (milliseconds) and expects
// the input data to come back unchanged after at least 2000 ms.
// NOTE(review): presumably asyncWorker.js delays its response by about two
// seconds — verify against that worker file.
200 it('Verify that async function is working properly', async () => {
201 const data
= { f
: 10 }
202 const startTime
= performance
.now()
203 const result
= await asyncPool
.execute(data
)
204 const usedTime
= performance
.now() - startTime
205 expect(result
).toStrictEqual(data
)
206 expect(usedTime
).toBeGreaterThanOrEqual(2000)
// Waits for numberOfWorkers 'exit' worker events on `pool` and expects the
// resolved count to equal numberOfWorkers.
// NOTE(review): the statement that triggers the shutdown is not visible in
// this extract (embedded line 211 is skipped) — presumably a pool.destroy()
// call; confirm against the complete file.
209 it('Shutdown test', async () => {
// The promise is created first so the listeners exist before workers exit.
210 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfWorkers
)
212 const numberOfExitEvents
= await exitPromise
213 expect(numberOfExitEvents
).toBe(numberOfWorkers
)
// Checks how the `env` and `settings` options show up on pool.opts: both are
// undefined when no options are given, and both reflect the supplied values
// when they are passed to the constructor.
// NOTE(review): the tails of the expected `settings` objects are not visible
// in this extract (embedded numbering skips 225, 229-230, 233-235) — confirm
// against the complete file.
216 it('Verify that cluster pool options are checked', async () => {
217 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
// Case 1: no options object.
218 let pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
)
219 expect(pool1
.opts
.env
).toBeUndefined()
220 expect(pool1
.opts
.settings
).toBeUndefined()
221 await pool1
.destroy()
// Case 2: explicit env and settings.
222 pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
, {
223 env
: { TEST
: 'test' },
224 settings
: { args
: ['--use', 'http'], silent
: true }
226 expect(pool1
.opts
.env
).toStrictEqual({ TEST
: 'test' })
227 expect(pool1
.opts
.settings
).toStrictEqual({
228 args
: ['--use', 'http'],
// Compares the stored settings merged with `exec: workerFilePath` against an
// expected object whose remaining fields are not visible here.
231 expect({ ...pool1
.opts
.settings
, exec
: workerFilePath
}).toStrictEqual({
232 args
: ['--use', 'http'],
236 await pool1
.destroy()
// Builds a pool with no options object visible, executes a task with no
// input, expects { ok: 1 }, then destroys the pool.
// NOTE(review): the first constructor argument is not visible in this extract
// (embedded line 241 is skipped) — presumably the worker count; verify.
239 it('Should work even without opts in input', async () => {
240 const pool1
= new FixedClusterPool(
242 './tests/worker-files/cluster/testWorker.js'
244 const res
= await pool1
.execute()
245 expect(res
).toStrictEqual({ ok
: 1 })
246 // We need to clean up the resources after our test
247 await pool1
.destroy()
// Expects constructing a FixedClusterPool with 0 workers to throw
// 'Cannot instantiate a fixed pool with zero worker'.
// NOTE(review): the opening of the expect(...) wrapper is not visible in this
// extract (embedded lines 251-252 are skipped) — presumably
// `expect(() => ...)`; confirm against the complete file.
250 it('Verify that a pool with zero worker fails', async () => {
253 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
254 ).toThrowError('Cannot instantiate a fixed pool with zero worker')