1 import { expect } from 'expect'
2 import { FixedClusterPool, PoolEvents } from '../../../lib/index.js'
3 import { TaskFunctions } from '../../test-types.js'
4 import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.js'
5 import { DEFAULT_TASK_NAME } from '../../../lib/utils.js'
7 describe('Fixed cluster pool test suite', () => {
// Shared fixtures for the suite.
// Every pool is a FixedClusterPool(numberOfWorkers, workerFilePath[, opts]);
// they differ only by the worker file they run and by their options.
const numberOfWorkers = 8
const tasksConcurrency = 2

// General purpose pool running the standard test worker.
const pool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  {
    errorHandler: e => console.error(e)
  }
)

// Same worker file, with the tasks queue enabled. The per-worker concurrency
// is read back by the queuing test through `opts.tasksQueueOptions.concurrency`.
const queuePool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: tasksConcurrency
    },
    errorHandler: e => console.error(e)
  }
)

// Pool whose tasks resolve to `undefined`; logs when one of its workers exits.
const emptyPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/emptyWorker.js',
  { exitHandler: () => console.info('empty pool worker exited') }
)

// Pool expected to send the task data back unchanged.
const echoPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/echoWorker.js'
)

// Pool whose task function fails synchronously.
const errorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/errorWorker.js',
  {
    errorHandler: e => console.error(e)
  }
)

// Pool whose task function fails asynchronously.
const asyncErrorPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncErrorWorker.js',
  {
    errorHandler: e => console.error(e)
  }
)

// Pool running an asynchronous task function.
const asyncPool = new FixedClusterPool(
  numberOfWorkers,
  './tests/worker-files/cluster/asyncWorker.js'
)
// Suite teardown: releases the shared pools once every test has run.
// `pool` is not listed here; the 'Shutdown test' asserts on its stopped state.
after('Destroy all pools', async () => {
  // We need to clean up the resources after our test.
  // Pools are destroyed one at a time, in a fixed order.
  const poolsToDestroy = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const poolToDestroy of poolsToDestroy) {
    await poolToDestroy.destroy()
  }
})
// Runs two named task functions through the shared pool and checks the
// numeric result of each one.
it('Verify that the function is executed in a worker cluster', async () => {
  // 75025 is fibonacci(25) — presumably the worker's built-in input; confirm in testWorker.js.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  // 9.33262154439441e157 is 100! as a double — presumably the worker's built-in input; confirm.
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
// Calling execute() with no argument must still run a task and resolve to the
// `{ ok: 1 }` payload.
it('Verify that is possible to invoke the execute() method without input', async () => {
  const result = await pool.execute()
  expect(result).toStrictEqual({ ok: 1 })
})
// A freshly created pool must emit PoolEvents.ready exactly once.
// Uses its own pool (shadowing the suite-level `pool`) so the listener
// bookkeeping of the shared pool is left untouched.
it("Verify that 'ready' event is emitted", async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    expect(pool.emitter.eventNames()).toStrictEqual([])
    let poolReady = 0
    pool.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
    expect(poolReady).toBe(1)
  } finally {
    // Destroy the local pool even when an assertion above throws, so a
    // failing test does not leave cluster workers running.
    await pool.destroy()
  }
})
// Submits twice as many tasks as there are workers and counts how often the
// shared pool reports PoolEvents.busy.
// The busy listener stays registered on `pool` after this test.
it("Verify that 'busy' event is emitted", async () => {
  const promises = new Set()
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(poolBusy).toBe(numberOfWorkers + 1)
})
// Floods the queue-enabled pool with `maxMultiplier` tasks per worker, then
// checks the per-worker and pool-wide task counters twice: right after
// submission (nothing executed yet) and once every task has settled.
it('Verify that tasks queuing is working', async () => {
  const promises = new Set()
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const { concurrency } = queuePool.opts.tasksQueueOptions
  const submittedTasks = numberOfWorkers * maxMultiplier
  // Tasks left waiting in each worker queue while `concurrency` tasks execute.
  const queuedTasksPerWorker = maxMultiplier - concurrency

  for (let i = 0; i < submittedTasks; i++) {
    promises.add(queuePool.execute())
  }
  expect(promises.size).toBe(submittedTasks)

  // State right after submission.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBe(queuedTasksPerWorker)
    expect(tasks.maxQueued).toBe(queuedTasksPerWorker)
    expect(tasks.sequentiallyStolen).toBe(0)
    expect(tasks.stolen).toBe(0)
  }
  expect(queuePool.info.executedTasks).toBe(0)
  expect(queuePool.info.executingTasks).toBe(numberOfWorkers * concurrency)
  expect(queuePool.info.queuedTasks).toBe(
    numberOfWorkers * queuedTasksPerWorker
  )
  expect(queuePool.info.maxQueuedTasks).toBe(
    numberOfWorkers * queuedTasksPerWorker
  )
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBe(0)

  await Promise.all(promises)

  // State once every task has completed. The stealing counters are only
  // bounded, not pinned to an exact value.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(submittedTasks)
    expect(tasks.executed).toBe(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(queuedTasksPerWorker)
    expect(tasks.sequentiallyStolen).toBeGreaterThanOrEqual(0)
    expect(tasks.sequentiallyStolen).toBeLessThanOrEqual(submittedTasks)
    expect(tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(tasks.stolen).toBeLessThanOrEqual(submittedTasks)
  }
  expect(queuePool.info.executedTasks).toBe(submittedTasks)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(submittedTasks)
})
// A task function that returns nothing must resolve execute() to `undefined`.
it('Verify that is possible to have a worker that return undefined', async () => {
  const result = await emptyPool.execute()
  expect(result).toBeUndefined()
})
// Round-trips a payload through the echo pool and expects a structurally
// equal value back.
it('Verify that data are sent to the worker correctly', async () => {
  const data = { f: 10 }
  const result = await echoPool.execute(data)
  expect(result).toStrictEqual(data)
})
// A synchronously failing task must (1) reject execute() with the error
// message string, (2) emit PoolEvents.taskError with the task name, message
// and original data, and (3) be counted as failed on a worker node.
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }
  expect(errorPool.emitter.eventNames()).toStrictEqual([])
  let taskError
  errorPool.emitter.on(PoolEvents.taskError, e => {
    taskError = e
  })
  expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
  // Capture the rejection instead of letting it fail the test; if execute()
  // unexpectedly resolves, `inError` stays undefined and the next
  // expectation fails.
  let inError
  try {
    await errorPool.execute(data)
  } catch (e) {
    inError = e
  }
  expect(inError).toBeDefined()
  expect(typeof inError === 'string').toBe(true)
  expect(inError).toBe('Error Message from ClusterWorker')
  expect(taskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: 'Error Message from ClusterWorker',
    data
  })
  expect(
    errorPool.workerNodes.some(
      workerNode => workerNode.usage.tasks.failed === 1
    )
  ).toBe(true)
})
// Asynchronous counterpart of the sync error test: an async task failure must
// reject execute() with the message string, emit PoolEvents.taskError with
// the task name, message and original data, and be counted as failed on a
// worker node.
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
  let taskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
    taskError = e
  })
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
    PoolEvents.taskError
  ])
  // Capture the rejection instead of letting it fail the test; if execute()
  // unexpectedly resolves, `inError` stays undefined and the next
  // expectation fails.
  let inError
  try {
    await asyncErrorPool.execute(data)
  } catch (e) {
    inError = e
  }
  expect(inError).toBeDefined()
  expect(typeof inError === 'string').toBe(true)
  expect(inError).toBe('Error Message from ClusterWorker:async')
  expect(taskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: 'Error Message from ClusterWorker:async',
    data
  })
  expect(
    asyncErrorPool.workerNodes.some(
      workerNode => workerNode.usage.tasks.failed === 1
    )
  ).toBe(true)
})
// An async task function must resolve with the data it was given, and the
// round trip must take at least 2000 ms.
// NOTE(review): the 2000 ms floor presumably mirrors a delay inside
// asyncWorker.js — confirm against the worker file.
it('Verify that async function is working properly', async () => {
  const data = { f: 10 }
  const startTime = performance.now()
  const result = await asyncPool.execute(data)
  const usedTime = performance.now() - startTime
  expect(result).toStrictEqual(data)
  expect(usedTime).toBeGreaterThanOrEqual(2000)
})
// Destroys the shared pool and checks that every worker exits, the pool state
// is reset and PoolEvents.destroy is emitted exactly once.
// Expects the `busy` listener registered by the busy-event test to still be
// attached, so it depends on that test having run first.
it('Shutdown test', async () => {
  // Start listening for worker 'exit' events before triggering the shutdown.
  const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  let poolDestroy = 0
  pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
  expect(pool.emitter.eventNames()).toStrictEqual([
    PoolEvents.busy,
    PoolEvents.destroy
  ])
  await pool.destroy()
  const numberOfExitEvents = await exitPromise
  expect(pool.started).toBe(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  expect(numberOfExitEvents).toBe(numberOfWorkers)
  expect(poolDestroy).toBe(1)
})
// Checks the cluster-specific options: `env` and `settings` are undefined when
// omitted and are stored as given when provided.
it('Verify that cluster pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'

  // Without options.
  const defaultPool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    expect(defaultPool.opts.env).toBeUndefined()
    expect(defaultPool.opts.settings).toBeUndefined()
  } finally {
    // Always release the workers, even when an expectation fails.
    await defaultPool.destroy()
  }

  // With explicit env and cluster settings.
  const customPool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
    env: { TEST: 'test' },
    settings: { args: ['--use', 'http'], silent: true }
  })
  try {
    expect(customPool.opts.env).toStrictEqual({ TEST: 'test' })
    expect(customPool.opts.settings).toStrictEqual({
      args: ['--use', 'http'],
      silent: true
    })
    expect({ ...customPool.opts.settings, exec: workerFilePath }).toStrictEqual({
      args: ['--use', 'http'],
      silent: true,
      exec: workerFilePath
    })
  } finally {
    // Always release the workers, even when an expectation fails.
    await customPool.destroy()
  }
})
// The options argument is optional: a pool built from only a worker count and
// a worker file must still execute tasks.
it('Should work even without opts in input', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    const res = await pool.execute()
    expect(res).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, including when the
    // expectation above fails.
    await pool.destroy()
  }
})
// Destroying a single worker node must resolve to undefined, make that worker
// emit 'disconnect' and 'exit' exactly once each, and shrink the pool by one
// node.
it('Verify destroyWorkerNode()', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.js'
  const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  try {
    const workerNodeKey = 0
    // Attach both listeners before destroying the node so no event is missed.
    let disconnectEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
      ++disconnectEvent
    })
    let exitEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('exit', () => {
      ++exitEvent
    })
    await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
    expect(disconnectEvent).toBe(1)
    expect(exitEvent).toBe(1)
    expect(pool.workerNodes.length).toBe(numberOfWorkers - 1)
  } finally {
    // Release the remaining workers even when an expectation fails.
    await pool.destroy()
  }
})
// Constructing a fixed pool with zero workers must throw.
it('Verify that a pool with zero worker fails', () => {
  // The constructor call is wrapped in a function so that `toThrow` can
  // invoke it and catch the error itself.
  expect(
    () =>
      new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
  ).toThrow('Cannot instantiate a fixed pool with zero worker')
})