1 const { expect
} = require('expect')
2 const { FixedClusterPool
, PoolEvents
} = require('../../../lib')
3 const { WorkerFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed cluster pool test suite', () => {
7 const numberOfWorkers
= 6
8 const pool
= new FixedClusterPool(
10 './tests/worker-files/cluster/testWorker.js',
12 errorHandler
: e
=> console
.error(e
)
15 const queuePool
= new FixedClusterPool(
17 './tests/worker-files/cluster/testWorker.js',
19 enableTasksQueue
: true,
23 errorHandler
: e
=> console
.error(e
)
26 const emptyPool
= new FixedClusterPool(
28 './tests/worker-files/cluster/emptyWorker.js',
29 { exitHandler
: () => console
.log('empty pool worker exited') }
31 const echoPool
= new FixedClusterPool(
33 './tests/worker-files/cluster/echoWorker.js'
35 const errorPool
= new FixedClusterPool(
37 './tests/worker-files/cluster/errorWorker.js',
39 errorHandler
: e
=> console
.error(e
)
42 const asyncErrorPool
= new FixedClusterPool(
44 './tests/worker-files/cluster/asyncErrorWorker.js',
46 errorHandler
: e
=> console
.error(e
)
49 const asyncPool
= new FixedClusterPool(
51 './tests/worker-files/cluster/asyncWorker.js'
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our test
56 await echoPool
.destroy()
57 await asyncPool
.destroy()
58 await errorPool
.destroy()
59 await asyncErrorPool
.destroy()
60 await emptyPool
.destroy()
61 await queuePool
.destroy()
64 it('Verify that the function is executed in a worker cluster', async () => {
65 let result
= await pool
.execute({
66 function: WorkerFunctions
.fibonacci
68 expect(result
).toBe(75025)
69 result
= await pool
.execute({
70 function: WorkerFunctions
.factorial
72 expect(result
).toBe(9.33262154439441e157
)
75 it('Verify that is possible to invoke the execute() method without input', async () => {
76 const result
= await pool
.execute()
77 expect(result
).toStrictEqual({ ok
: 1 })
80 it("Verify that 'ready' event is emitted", async () => {
81 const pool1
= new FixedClusterPool(
83 './tests/worker-files/cluster/testWorker.js',
85 errorHandler
: e
=> console
.error(e
)
89 pool1
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
90 await
waitPoolEvents(pool1
, PoolEvents
.ready
, 1)
91 expect(poolReady
).toBe(1)
94 it("Verify that 'busy' event is emitted", () => {
96 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
97 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
100 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
101 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
102 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
105 it('Verify that tasks queuing is working', async () => {
106 const promises
= new Set()
107 const maxMultiplier
= 2
108 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
109 promises
.add(queuePool
.execute())
111 expect(promises
.size
).toBe(numberOfWorkers
* maxMultiplier
)
112 for (const workerNode
of queuePool
.workerNodes
) {
113 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
114 queuePool
.opts
.tasksQueueOptions
.concurrency
116 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
117 expect(workerNode
.usage
.tasks
.queued
).toBeGreaterThan(0)
118 expect(workerNode
.usage
.tasks
.maxQueued
).toBeGreaterThan(0)
120 expect(queuePool
.info
.executingTasks
).toBe(numberOfWorkers
)
121 expect(queuePool
.info
.queuedTasks
).toBe(
122 numberOfWorkers
* maxMultiplier
- numberOfWorkers
124 expect(queuePool
.info
.maxQueuedTasks
).toBe(
125 numberOfWorkers
* maxMultiplier
- numberOfWorkers
127 await Promise
.all(promises
)
128 for (const workerNode
of queuePool
.workerNodes
) {
129 expect(workerNode
.usage
.tasks
.executing
).toBe(0)
130 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
131 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
132 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
133 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(1)
137 it('Verify that is possible to have a worker that return undefined', async () => {
138 const result
= await emptyPool
.execute()
139 expect(result
).toBeUndefined()
142 it('Verify that data are sent to the worker correctly', async () => {
143 const data
= { f
: 10 }
144 const result
= await echoPool
.execute(data
)
145 expect(result
).toStrictEqual(data
)
148 it('Verify that error handling is working properly:sync', async () => {
149 const data
= { f
: 10 }
151 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
156 await errorPool
.execute(data
)
160 expect(inError
).toBeDefined()
161 expect(typeof inError
=== 'string').toBe(true)
162 expect(inError
).toBe('Error Message from ClusterWorker')
163 expect(taskError
).toStrictEqual({
165 message
: 'Error Message from ClusterWorker',
169 errorPool
.workerNodes
.some(
170 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
175 it('Verify that error handling is working properly:async', async () => {
176 const data
= { f
: 10 }
178 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
183 await asyncErrorPool
.execute(data
)
187 expect(inError
).toBeDefined()
188 expect(typeof inError
=== 'string').toBe(true)
189 expect(inError
).toBe('Error Message from ClusterWorker:async')
190 expect(taskError
).toStrictEqual({
192 message
: 'Error Message from ClusterWorker:async',
196 asyncErrorPool
.workerNodes
.some(
197 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
202 it('Verify that async function is working properly', async () => {
203 const data
= { f
: 10 }
204 const startTime
= performance
.now()
205 const result
= await asyncPool
.execute(data
)
206 const usedTime
= performance
.now() - startTime
207 expect(result
).toStrictEqual(data
)
208 expect(usedTime
).toBeGreaterThanOrEqual(2000)
211 it('Shutdown test', async () => {
212 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfWorkers
)
214 const numberOfExitEvents
= await exitPromise
215 expect(numberOfExitEvents
).toBe(numberOfWorkers
)
218 it('Verify that cluster pool options are checked', async () => {
219 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
220 let pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
)
221 expect(pool1
.opts
.env
).toBeUndefined()
222 expect(pool1
.opts
.settings
).toBeUndefined()
223 await pool1
.destroy()
224 pool1
= new FixedClusterPool(numberOfWorkers
, workerFilePath
, {
225 env
: { TEST
: 'test' },
226 settings
: { args
: ['--use', 'http'], silent
: true }
228 expect(pool1
.opts
.env
).toStrictEqual({ TEST
: 'test' })
229 expect(pool1
.opts
.settings
).toStrictEqual({
230 args
: ['--use', 'http'],
233 expect({ ...pool1
.opts
.settings
, exec
: workerFilePath
}).toStrictEqual({
234 args
: ['--use', 'http'],
238 await pool1
.destroy()
241 it('Should work even without opts in input', async () => {
242 const pool1
= new FixedClusterPool(
244 './tests/worker-files/cluster/testWorker.js'
246 const res
= await pool1
.execute()
247 expect(res
).toStrictEqual({ ok
: 1 })
248 // We need to clean up the resources after our test
249 await pool1
.destroy()
252 it('Verify that a pool with zero worker fails', async () => {
255 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
256 ).toThrowError('Cannot instantiate a fixed pool with zero worker')