1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { WorkerFunctions
} = require('../../test-types')
4 const TestUtils
= require('../../test-utils')
6 describe('Fixed cluster pool test suite', () => {
7 const numberOfWorkers
= 6
8 const pool
= new FixedClusterPool(
10 './tests/worker-files/cluster/testWorker.js',
12 errorHandler
: e
=> console
.error(e
)
15 const queuePool
= new FixedClusterPool(
17 './tests/worker-files/cluster/testWorker.js',
19 enableTasksQueue
: true,
23 errorHandler
: e
=> console
.error(e
)
26 const emptyPool
= new FixedClusterPool(
28 './tests/worker-files/cluster/emptyWorker.js',
29 { exitHandler
: () => console
.log('empty pool worker exited') }
31 const echoPool
= new FixedClusterPool(
33 './tests/worker-files/cluster/echoWorker.js'
35 const errorPool
= new FixedClusterPool(
37 './tests/worker-files/cluster/errorWorker.js',
39 errorHandler
: e
=> console
.error(e
)
42 const asyncErrorPool
= new FixedClusterPool(
44 './tests/worker-files/cluster/asyncErrorWorker.js',
46 errorHandler
: e
=> console
.error(e
)
49 const asyncPool
= new FixedClusterPool(
51 './tests/worker-files/cluster/asyncWorker.js'
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our test
56 await echoPool
.destroy()
57 await asyncPool
.destroy()
58 await errorPool
.destroy()
59 await asyncErrorPool
.destroy()
60 await emptyPool
.destroy()
61 await queuePool
.destroy()
64 it('Verify that the function is executed in a worker cluster', async () => {
65 let result
= await pool
.execute({
66 function: WorkerFunctions
.fibonacci
68 expect(result
).toBe(75025)
69 result
= await pool
.execute({
70 function: WorkerFunctions
.factorial
72 expect(result
).toBe(9.33262154439441e157
)
75 it('Verify that is possible to invoke the execute() method without input', async () => {
76 const result
= await pool
.execute()
77 expect(result
).toBe(false)
80 it("Verify that 'busy' event is emitted", async () => {
82 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
83 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
86 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
87 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
88 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
91 it('Verify that tasks queuing is working', async () => {
92 const promises
= new Set()
93 const maxMultiplier
= 2
94 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
95 promises
.add(queuePool
.execute())
97 expect(promises
.size
).toBe(numberOfWorkers
* maxMultiplier
)
98 for (const workerNode
of queuePool
.workerNodes
) {
99 expect(workerNode
.workerUsage
.tasks
.executing
).toBeLessThanOrEqual(
100 queuePool
.opts
.tasksQueueOptions
.concurrency
102 expect(workerNode
.workerUsage
.tasks
.executed
).toBe(0)
103 expect(workerNode
.workerUsage
.tasks
.queued
).toBeGreaterThan(0)
104 expect(workerNode
.workerUsage
.tasks
.maxQueued
).toBeGreaterThan(0)
106 expect(queuePool
.info
.executingTasks
).toBe(numberOfWorkers
)
107 expect(queuePool
.info
.queuedTasks
).toBe(
108 numberOfWorkers
* maxMultiplier
- numberOfWorkers
110 expect(queuePool
.info
.maxQueuedTasks
).toBe(
111 numberOfWorkers
* maxMultiplier
- numberOfWorkers
113 await Promise
.all(promises
)
114 for (const workerNode
of queuePool
.workerNodes
) {
115 expect(workerNode
.workerUsage
.tasks
.executing
).toBe(0)
116 expect(workerNode
.workerUsage
.tasks
.executed
).toBeGreaterThan(0)
117 expect(workerNode
.workerUsage
.tasks
.executed
).toBeLessThanOrEqual(
120 expect(workerNode
.workerUsage
.tasks
.queued
).toBe(0)
121 expect(workerNode
.workerUsage
.tasks
.maxQueued
).toBe(1)
125 it('Verify that is possible to have a worker that return undefined', async () => {
126 const result
= await emptyPool
.execute()
127 expect(result
).toBeUndefined()
130 it('Verify that data are sent to the worker correctly', async () => {
131 const data
= { f
: 10 }
132 const result
= await echoPool
.execute(data
)
133 expect(result
).toStrictEqual(data
)
136 it('Verify that error handling is working properly:sync', async () => {
137 const data
= { f
: 10 }
139 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
144 await errorPool
.execute(data
)
148 expect(inError
).toBeDefined()
149 expect(typeof inError
=== 'string').toBe(true)
150 expect(inError
).toBe('Error Message from ClusterWorker')
151 expect(taskError
).toStrictEqual({
152 message
: 'Error Message from ClusterWorker',
156 errorPool
.workerNodes
.some(
157 workerNode
=> workerNode
.workerUsage
.tasks
.failed
=== 1
162 it('Verify that error handling is working properly:async', async () => {
163 const data
= { f
: 10 }
165 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
170 await asyncErrorPool
.execute(data
)
174 expect(inError
).toBeDefined()
175 expect(typeof inError
=== 'string').toBe(true)
176 expect(inError
).toBe('Error Message from ClusterWorker:async')
177 expect(taskError
).toStrictEqual({
178 message
: 'Error Message from ClusterWorker:async',
182 asyncErrorPool
.workerNodes
.some(
183 workerNode
=> workerNode
.workerUsage
.tasks
.failed
=== 1
188 it('Verify that async function is working properly', async () => {
189 const data
= { f
: 10 }
190 const startTime
= performance
.now()
191 const result
= await asyncPool
.execute(data
)
192 const usedTime
= performance
.now() - startTime
193 expect(result
).toStrictEqual(data
)
194 expect(usedTime
).toBeGreaterThanOrEqual(2000)
197 it('Shutdown test', async () => {
198 const exitPromise
= TestUtils
.waitWorkerEvents(
204 const numberOfExitEvents
= await exitPromise
205 expect(numberOfExitEvents
).toBe(numberOfWorkers
)
208 it('Verify that cluster pool options are checked', async () => {
209 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
210 let pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
)
211 expect(pool1
.opts
.env
).toBeUndefined()
212 expect(pool1
.opts
.settings
).toBeUndefined()
213 await pool1
.destroy()
214 pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
, {
215 env
: { TEST
: 'test' },
216 settings
: { args
: ['--use', 'http'], silent
: true }
218 expect(pool1
.opts
.env
).toStrictEqual({ TEST
: 'test' })
219 expect(pool1
.opts
.settings
).toStrictEqual({
220 args
: ['--use', 'http'],
223 expect({ ...pool1
.opts
.settings
, exec
: workerFilePath
}).toStrictEqual({
224 args
: ['--use', 'http'],
228 await pool1
.destroy()
231 it('Should work even without opts in input', async () => {
232 const pool1
= new FixedClusterPool(
234 './tests/worker-files/cluster/testWorker.js'
236 const res
= await pool1
.execute()
237 expect(res
).toBe(false)
238 // We need to clean up the resources after our test
239 await pool1
.destroy()
242 it('Verify that a pool with zero worker fails', async () => {
245 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
246 ).toThrowError('Cannot instantiate a fixed pool with no worker')