1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { WorkerFunctions
} = require('../../test-types')
4 const TestUtils
= require('../../test-utils')
6 describe('Fixed cluster pool test suite', () => {
7 const numberOfWorkers
= 6
8 const pool
= new FixedClusterPool(
10 './tests/worker-files/cluster/testWorker.js',
12 errorHandler
: e
=> console
.error(e
)
15 const queuePool
= new FixedClusterPool(
17 './tests/worker-files/cluster/testWorker.js',
19 enableTasksQueue
: true,
23 errorHandler
: e
=> console
.error(e
)
26 const emptyPool
= new FixedClusterPool(
28 './tests/worker-files/cluster/emptyWorker.js',
29 { exitHandler
: () => console
.log('empty pool worker exited') }
31 const echoPool
= new FixedClusterPool(
33 './tests/worker-files/cluster/echoWorker.js'
35 const errorPool
= new FixedClusterPool(
37 './tests/worker-files/cluster/errorWorker.js',
39 errorHandler
: e
=> console
.error(e
)
42 const asyncErrorPool
= new FixedClusterPool(
44 './tests/worker-files/cluster/asyncErrorWorker.js',
46 errorHandler
: e
=> console
.error(e
)
49 const asyncPool
= new FixedClusterPool(
51 './tests/worker-files/cluster/asyncWorker.js'
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our tests
56 await echoPool
.destroy()
57 await asyncPool
.destroy()
58 await errorPool
.destroy()
59 await asyncErrorPool
.destroy()
60 await emptyPool
.destroy()
61 await queuePool
.destroy()
64 it('Verify that the function is executed in a worker cluster', async () => {
65 let result
= await pool
.execute({
66 function: WorkerFunctions
.fibonacci
68 expect(result
).toBe(121393)
69 result
= await pool
.execute({
70 function: WorkerFunctions
.factorial
72 expect(result
).toBe(9.33262154439441e157
)
75 it('Verify that is possible to invoke the execute() method without input', async () => {
76 const result
= await pool
.execute()
77 expect(result
).toBe(false)
80 it("Verify that 'busy' event is emitted", async () => {
82 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
83 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
86 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
87 // So, for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool, it is triggered numberOfWorkers + 1 times in total.
88 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
91 it('Verify that tasks queuing is working', async () => {
92 const promises
= new Set()
93 const maxMultiplier
= 2
94 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
95 promises
.add(queuePool
.execute())
97 expect(promises
.size
).toBe(numberOfWorkers
* maxMultiplier
)
98 for (const workerNode
of queuePool
.workerNodes
) {
99 expect(workerNode
.workerUsage
.tasks
.executing
).toBeLessThanOrEqual(
100 queuePool
.opts
.tasksQueueOptions
.concurrency
102 expect(workerNode
.workerUsage
.tasks
.executed
).toBe(0)
103 expect(workerNode
.tasksQueue
.size
).toBeGreaterThan(0)
105 expect(queuePool
.info
.executingTasks
).toBe(numberOfWorkers
)
106 expect(queuePool
.info
.queuedTasks
).toBe(
107 numberOfWorkers
* maxMultiplier
- numberOfWorkers
109 expect(queuePool
.info
.maxQueuedTasks
).toBe(
110 numberOfWorkers
* maxMultiplier
- numberOfWorkers
112 await Promise
.all(promises
)
113 for (const workerNode
of queuePool
.workerNodes
) {
114 expect(workerNode
.workerUsage
.tasks
.executing
).toBe(0)
115 expect(workerNode
.workerUsage
.tasks
.executed
).toBeGreaterThan(0)
116 expect(workerNode
.workerUsage
.tasks
.executed
).toBeLessThanOrEqual(
119 expect(workerNode
.tasksQueue
.size
).toBe(0)
123 it('Verify that is possible to have a worker that return undefined', async () => {
124 const result
= await emptyPool
.execute()
125 expect(result
).toBeUndefined()
128 it('Verify that data are sent to the worker correctly', async () => {
129 const data
= { f
: 10 }
130 const result
= await echoPool
.execute(data
)
131 expect(result
).toStrictEqual(data
)
134 it('Verify that error handling is working properly:sync', async () => {
135 const data
= { f
: 10 }
137 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
142 await errorPool
.execute(data
)
146 expect(inError
).toBeDefined()
147 expect(typeof inError
=== 'string').toBe(true)
148 expect(inError
).toBe('Error Message from ClusterWorker')
149 expect(taskError
).toStrictEqual({
150 message
: 'Error Message from ClusterWorker',
154 errorPool
.workerNodes
.some(
155 workerNode
=> workerNode
.workerUsage
.tasks
.failed
=== 1
160 it('Verify that error handling is working properly:async', async () => {
161 const data
= { f
: 10 }
163 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
168 await asyncErrorPool
.execute(data
)
172 expect(inError
).toBeDefined()
173 expect(typeof inError
=== 'string').toBe(true)
174 expect(inError
).toBe('Error Message from ClusterWorker:async')
175 expect(taskError
).toStrictEqual({
176 message
: 'Error Message from ClusterWorker:async',
180 asyncErrorPool
.workerNodes
.some(
181 workerNode
=> workerNode
.workerUsage
.tasks
.failed
=== 1
186 it('Verify that async function is working properly', async () => {
187 const data
= { f
: 10 }
188 const startTime
= performance
.now()
189 const result
= await asyncPool
.execute(data
)
190 const usedTime
= performance
.now() - startTime
191 expect(result
).toStrictEqual(data
)
192 expect(usedTime
).toBeGreaterThanOrEqual(2000)
195 it('Shutdown test', async () => {
196 const exitPromise
= TestUtils
.waitWorkerExits(pool
, numberOfWorkers
)
198 const numberOfExitEvents
= await exitPromise
199 expect(numberOfExitEvents
).toBe(numberOfWorkers
)
202 it('Verify that cluster pool options are checked', async () => {
203 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
204 let pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
)
205 expect(pool1
.opts
.env
).toBeUndefined()
206 expect(pool1
.opts
.settings
).toBeUndefined()
207 await pool1
.destroy()
208 pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
, {
209 env
: { TEST
: 'test' },
210 settings
: { args
: ['--use', 'http'], silent
: true }
212 expect(pool1
.opts
.env
).toStrictEqual({ TEST
: 'test' })
213 expect(pool1
.opts
.settings
).toStrictEqual({
214 args
: ['--use', 'http'],
217 expect({ ...pool1
.opts
.settings
, exec
: workerFilePath
}).toStrictEqual({
218 args
: ['--use', 'http'],
222 await pool1
.destroy()
225 it('Should work even without opts in input', async () => {
226 const pool1
= new FixedClusterPool(
228 './tests/worker-files/cluster/testWorker.js'
230 const res
= await pool1
.execute()
231 expect(res
).toBe(false)
232 // We need to clean up the resources after our test
233 await pool1
.destroy()
236 it('Verify that a pool with zero worker fails', async () => {
239 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
240 ).toThrowError('Cannot instantiate a fixed pool with no worker')