1 import { expect } from 'expect'
2 import { FixedClusterPool, PoolEvents } from '../../../lib/index.js'
3 import { TaskFunctions } from '../../test-types.js'
4 import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.js'
5 import { DEFAULT_TASK_NAME } from '../../../lib/utils.js'
7 describe('Fixed cluster pool test suite', () => {
8 const numberOfWorkers = 8
9 const tasksConcurrency = 2
10 const pool = new FixedClusterPool(
12 './tests/worker-files/cluster/testWorker.js',
14 errorHandler: e => console.error(e)
17 const queuePool = new FixedClusterPool(
19 './tests/worker-files/cluster/testWorker.js',
21 enableTasksQueue: true,
23 concurrency: tasksConcurrency
25 errorHandler: e => console.error(e)
28 const emptyPool = new FixedClusterPool(
30 './tests/worker-files/cluster/emptyWorker.js',
31 { exitHandler: () => console.info('empty pool worker exited') }
33 const echoPool = new FixedClusterPool(
35 './tests/worker-files/cluster/echoWorker.js'
37 const errorPool = new FixedClusterPool(
39 './tests/worker-files/cluster/errorWorker.js',
41 errorHandler: e => console.error(e)
44 const asyncErrorPool = new FixedClusterPool(
46 './tests/worker-files/cluster/asyncErrorWorker.js',
48 errorHandler: e => console.error(e)
51 const asyncPool = new FixedClusterPool(
53 './tests/worker-files/cluster/asyncWorker.js'
56 after('Destroy all pools', async () => {
57 // We need to clean up the resources after our test
58 await echoPool.destroy()
59 await asyncPool.destroy()
60 await errorPool.destroy()
61 await asyncErrorPool.destroy()
62 await emptyPool.destroy()
63 await queuePool.destroy()
66 it('Verify that the function is executed in a worker cluster', async () => {
67 let result = await pool.execute({
68 function: TaskFunctions.fibonacci
70 expect(result).toBe(75025)
71 result = await pool.execute({
72 function: TaskFunctions.factorial
74 expect(result).toBe(9.33262154439441e157)
77 it('Verify that is possible to invoke the execute() method without input', async () => {
78 const result = await pool.execute()
79 expect(result).toStrictEqual({ ok: 1 })
82 it("Verify that 'ready' event is emitted", async () => {
83 const pool = new FixedClusterPool(
85 './tests/worker-files/cluster/testWorker.js',
87 errorHandler: e => console.error(e)
90 expect(pool.emitter.eventNames()).toStrictEqual([])
92 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
93 await waitPoolEvents(pool, PoolEvents.ready, 1)
94 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
95 expect(poolReady).toBe(1)
99 it("Verify that 'busy' event is emitted", async () => {
100 const promises = new Set()
101 expect(pool.emitter.eventNames()).toStrictEqual([])
103 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
104 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
105 for (let i = 0; i < numberOfWorkers * 2; i++) {
106 promises.add(pool.execute())
108 await Promise.all(promises)
109 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
110 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
111 expect(poolBusy).toBe(numberOfWorkers + 1)
114 it('Verify that tasks queuing is working', async () => {
115 const promises = new Set()
116 const maxMultiplier = 3 // Must be greater than tasksConcurrency
117 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
118 promises.add(queuePool.execute())
120 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
121 for (const workerNode of queuePool.workerNodes) {
122 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
123 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
124 queuePool.opts.tasksQueueOptions.concurrency
126 expect(workerNode.usage.tasks.executed).toBe(0)
127 expect(workerNode.usage.tasks.queued).toBe(
128 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
130 expect(workerNode.usage.tasks.maxQueued).toBe(
131 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
133 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
134 expect(workerNode.usage.tasks.stolen).toBe(0)
136 expect(queuePool.info.executedTasks).toBe(0)
137 expect(queuePool.info.executingTasks).toBe(
138 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
140 expect(queuePool.info.queuedTasks).toBe(
142 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
144 expect(queuePool.info.maxQueuedTasks).toBe(
146 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
148 expect(queuePool.info.backPressure).toBe(false)
149 expect(queuePool.info.stolenTasks).toBe(0)
150 await Promise.all(promises)
151 for (const workerNode of queuePool.workerNodes) {
152 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
153 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
154 numberOfWorkers * maxMultiplier
156 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
157 expect(workerNode.usage.tasks.queued).toBe(0)
158 expect(workerNode.usage.tasks.maxQueued).toBe(
159 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
161 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
164 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
165 numberOfWorkers * maxMultiplier
167 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
168 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
169 numberOfWorkers * maxMultiplier
172 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
173 expect(queuePool.info.backPressure).toBe(false)
174 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
175 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
176 numberOfWorkers * maxMultiplier
180 it('Verify that is possible to have a worker that return undefined', async () => {
181 const result = await emptyPool.execute()
182 expect(result).toBeUndefined()
185 it('Verify that data are sent to the worker correctly', async () => {
186 const data = { f: 10 }
187 const result = await echoPool.execute(data)
188 expect(result).toStrictEqual(data)
191 it('Verify that error handling is working properly:sync', async () => {
192 const data = { f: 10 }
193 expect(errorPool.emitter.eventNames()).toStrictEqual([])
195 errorPool.emitter.on(PoolEvents.taskError, e => {
198 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
201 await errorPool.execute(data)
205 expect(inError).toBeDefined()
206 expect(typeof inError === 'string').toBe(true)
207 expect(inError).toBe('Error Message from ClusterWorker')
208 expect(taskError).toStrictEqual({
209 name: DEFAULT_TASK_NAME,
210 message: 'Error Message from ClusterWorker',
214 errorPool.workerNodes.some(
215 workerNode => workerNode.usage.tasks.failed === 1
220 it('Verify that error handling is working properly:async', async () => {
221 const data = { f: 10 }
222 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
224 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
227 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
232 await asyncErrorPool.execute(data)
236 expect(inError).toBeDefined()
237 expect(typeof inError === 'string').toBe(true)
238 expect(inError).toBe('Error Message from ClusterWorker:async')
239 expect(taskError).toStrictEqual({
240 name: DEFAULT_TASK_NAME,
241 message: 'Error Message from ClusterWorker:async',
245 asyncErrorPool.workerNodes.some(
246 workerNode => workerNode.usage.tasks.failed === 1
251 it('Verify that async function is working properly', async () => {
252 const data = { f: 10 }
253 const startTime = performance.now()
254 const result = await asyncPool.execute(data)
255 const usedTime = performance.now() - startTime
256 expect(result).toStrictEqual(data)
257 expect(usedTime).toBeGreaterThanOrEqual(2000)
260 it('Shutdown test', async () => {
261 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
262 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
264 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
265 expect(pool.emitter.eventNames()).toStrictEqual([
270 const numberOfExitEvents = await exitPromise
271 expect(pool.started).toBe(false)
272 expect(pool.emitter.eventNames()).toStrictEqual([])
273 expect(pool.readyEventEmitted).toBe(false)
274 expect(pool.workerNodes.length).toBe(0)
275 expect(numberOfExitEvents).toBe(numberOfWorkers)
276 expect(poolDestroy).toBe(1)
279 it('Verify that cluster pool options are checked', async () => {
280 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
281 let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
282 expect(pool.opts.env).toBeUndefined()
283 expect(pool.opts.settings).toBeUndefined()
285 pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
286 env: { TEST: 'test' },
287 settings: { args: ['--use', 'http'], silent: true }
289 expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
290 expect(pool.opts.settings).toStrictEqual({
291 args: ['--use', 'http'],
294 expect({ ...pool.opts.settings, exec: workerFilePath }).toStrictEqual({
295 args: ['--use', 'http'],
302 it('Should work even without opts in input', async () => {
303 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
304 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
305 const res = await pool.execute()
306 expect(res).toStrictEqual({ ok: 1 })
307 // We need to clean up the resources after our test
311 it('Verify destroyWorkerNode()', async () => {
312 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
313 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
314 const workerNodeKey = 0
315 let disconnectEvent = 0
316 pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
320 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
323 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
324 expect(disconnectEvent).toBe(1)
325 expect(exitEvent).toBe(1)
326 expect(pool.workerNodes.length).toBe(numberOfWorkers - 1)
330 it('Verify that a pool with zero worker fails', () => {
333 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
334 ).toThrow('Cannot instantiate a fixed pool with zero worker')