1 import cluster from 'node:cluster'
2 import { afterAll, beforeAll, describe, expect, it } from 'vitest'
4 import { FixedClusterPool, PoolEvents } from '../../../lib/index.mjs'
5 import { DEFAULT_TASK_NAME } from '../../../lib/utils.mjs'
6 import { TaskFunctions } from '../../test-types.cjs'
7 import { sleep, waitWorkerEvents } from '../../test-utils.cjs'
// Test suite for FixedClusterPool. The constants and pool variables declared
// here are shared by the test cases of the suite.
9 describe('Fixed cluster pool test suite', () => {
// Worker count the queuing, shutdown and destroyWorkerNode assertions are computed from.
10 const numberOfWorkers = 8
// Used as the tasks queue `concurrency` option of queuePool.
11 const tasksConcurrency = 2
// One pool per worker file / option set exercised by the test cases.
12 let asyncErrorPool, asyncPool, echoPool, emptyPool, errorPool, pool, queuePool
// Suite setup: instantiates every pool used by the test cases.
// NOTE(review): each pool is presumably sized with numberOfWorkers (the queuing
// test computes its expectations from it) — confirm against the constructor calls.
// Default pool running testWorker.cjs; errors are logged with console.error.
15 pool = new FixedClusterPool(
17 './tests/worker-files/cluster/testWorker.cjs',
19 errorHandler: e => console.error(e),
// Same worker file, with the tasks queue enabled and a per-worker concurrency
// of tasksConcurrency.
22 queuePool = new FixedClusterPool(
24 './tests/worker-files/cluster/testWorker.cjs',
26 enableTasksQueue: true,
27 errorHandler: e => console.error(e),
29 concurrency: tasksConcurrency,
// Pool running emptyWorker.cjs; logs a message whenever one of its workers exits.
33 emptyPool = new FixedClusterPool(
35 './tests/worker-files/cluster/emptyWorker.cjs',
36 { exitHandler: () => console.info('empty pool worker exited') }
// Pool running echoWorker.cjs (used to check that task data round-trips unchanged).
38 echoPool = new FixedClusterPool(
40 './tests/worker-files/cluster/echoWorker.cjs'
// Pool running errorWorker.cjs (used by the synchronous error handling test).
42 errorPool = new FixedClusterPool(
44 './tests/worker-files/cluster/errorWorker.cjs',
46 errorHandler: e => console.error(e),
// Pool running asyncErrorWorker.cjs (used by the asynchronous error handling
// and task abortion tests).
49 asyncErrorPool = new FixedClusterPool(
51 './tests/worker-files/cluster/asyncErrorWorker.cjs',
53 errorHandler: e => console.error(e),
// Pool running asyncWorker.cjs (used by the asynchronous task function test).
56 asyncPool = new FixedClusterPool(
58 './tests/worker-files/cluster/asyncWorker.cjs'
// Suite teardown: destroys the shared pools sequentially, awaiting each one.
62 afterAll(async () => {
63 // Skip the cleanup on CI to avoid an afterAll hook timeout
64 if (process.env.CI != null) return
65 // Release the resources held by the pools once the tests are done
// NOTE(review): `pool` is not destroyed here — presumably because the
// 'Shutdown test' case takes care of it; confirm.
66 await echoPool.destroy()
67 await asyncPool.destroy()
68 await errorPool.destroy()
69 await asyncErrorPool.destroy()
70 await emptyPool.destroy()
71 await queuePool.destroy()
// Runs the fibonacci and then the factorial task function through `pool` and
// checks the exact numeric result of each. Both calls are given a 2000 ms
// timeout abort signal.
74 it('Verify that the function is executed in a worker cluster', async () => {
75 let result = await pool.execute(
77 function: TaskFunctions.fibonacci,
80 AbortSignal.timeout(2000)
82 expect(result).toBe(354224848179262000000)
// Same variable reused for the factorial run.
83 result = await pool.execute(
85 function: TaskFunctions.factorial,
88 AbortSignal.timeout(2000)
90 expect(result).toBe(9.33262154439441e157)
// Calls execute() with no argument at all and expects the worker's result to
// be exactly { ok: 1 }.
93 it('Verify that is possible to invoke the execute() method without input', async () => {
94 const result = await pool.execute()
95 expect(result).toStrictEqual({ ok: 1 })
// Submits numberOfWorkers * maxMultiplier tasks to queuePool without awaiting
// them, asserts the per-worker and pool-wide task counters while the tasks are
// still pending, then asserts them again once every task has settled.
98 it('Verify that tasks queuing is working', async () => {
99 const promises = new Set()
100 const maxMultiplier = 3 // Must be greater than tasksConcurrency
101 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
102 promises.add(queuePool.execute())
104 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
// Phase 1 — nothing awaited yet: no task is counted as executed, and each
// worker node is expected to queue (maxMultiplier - concurrency) tasks.
105 for (const workerNode of queuePool.workerNodes) {
106 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
107 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
108 queuePool.opts.tasksQueueOptions.concurrency
110 expect(workerNode.usage.tasks.executed).toBe(0)
111 expect(workerNode.usage.tasks.queued).toBe(
112 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
114 expect(workerNode.usage.tasks.maxQueued).toBe(
115 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
117 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
118 expect(workerNode.usage.tasks.stolen).toBe(0)
// Pool-wide counters for the same pending state.
120 expect(queuePool.info.executedTasks).toBe(0)
121 expect(queuePool.info.executingTasks).toBe(
122 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
124 expect(queuePool.info.queuedTasks).toBe(
126 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
128 expect(queuePool.info.maxQueuedTasks).toBe(
130 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
132 expect(queuePool.info.backPressure).toBe(false)
133 expect(queuePool.info.stolenTasks).toBe(0)
// Phase 2 — wait for every submitted task to settle, then re-check the counters.
134 await Promise.all(promises)
135 for (const workerNode of queuePool.workerNodes) {
136 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
137 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
138 numberOfWorkers * maxMultiplier
// Every worker node must have executed exactly maxMultiplier tasks and have an
// empty queue; maxQueued keeps the high-water mark reached in phase 1.
140 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
141 expect(workerNode.usage.tasks.queued).toBe(0)
142 expect(workerNode.usage.tasks.maxQueued).toBe(
143 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
// The stolen-task counters are only bounded by the total number of submitted
// tasks, not pinned to an exact value.
145 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
148 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
149 numberOfWorkers * maxMultiplier
151 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
152 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
153 numberOfWorkers * maxMultiplier
// Pool-wide counters once everything has completed.
156 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
157 expect(queuePool.info.backPressure).toBe(false)
158 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
159 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
160 numberOfWorkers * maxMultiplier
// Executes a task on emptyPool and expects the resolved value to be undefined.
164 it('Verify that is possible to have a worker that return undefined', async () => {
165 const result = await emptyPool.execute()
166 expect(result).toBeUndefined()
// Sends an object to echoPool and expects a strictly equal object back.
169 it('Verify that data are sent to the worker correctly', async () => {
170 const data = { f: 10 }
171 const result = await echoPool.execute(data)
172 expect(result).toStrictEqual(data)
// Checks the failure path of a task executed on errorPool: the error observed
// by the test, the payload of the PoolEvents.taskError event, and the failed
// task counter of the worker nodes.
175 it('Verify that error handling is working properly:sync', async () => {
176 const data = { f: 10 }
// The pool emitter must have no listener before the test registers its own.
177 expect(errorPool.emitter.eventNames()).toStrictEqual([])
179 errorPool.emitter.on(PoolEvents.taskError, e => {
182 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
// NOTE(review): inError is presumably the error execute() rejects with, and
// taskError the payload received by the listener above — confirm.
185 await errorPool.execute(data)
189 expect(inError).toBeInstanceOf(Error)
190 expect(inError.message).toStrictEqual('Error Message from ClusterWorker')
191 expect(typeof inError.stack === 'string').toBe(true)
// The event payload must carry the same message and stack as inError, under
// the default task name.
192 expect(taskError).toStrictEqual({
195 message: inError.message,
196 name: DEFAULT_TASK_NAME,
197 stack: inError.stack,
// Looks for a worker node that recorded exactly one failed task.
200 errorPool.workerNodes.some(
201 workerNode => workerNode.usage.tasks.failed === 1
// Same checks as the synchronous error handling test, run against
// asyncErrorPool: the error observed by the test, the payload of the
// PoolEvents.taskError event, and the failed task counter of the worker nodes.
206 it('Verify that error handling is working properly:async', async () => {
207 const data = { f: 10 }
// The pool emitter must have no listener before the test registers its own.
208 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
210 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
213 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
214 PoolEvents.taskError,
// NOTE(review): inError is presumably the error execute() rejects with, and
// taskError the payload received by the listener above — confirm.
218 await asyncErrorPool.execute(data)
222 expect(inError).toBeInstanceOf(Error)
223 expect(inError.message).toStrictEqual(
224 'Error Message from ClusterWorker:async'
226 expect(typeof inError.stack === 'string').toBe(true)
// The event payload must carry the same message and stack as inError, under
// the default task name.
227 expect(taskError).toStrictEqual({
230 message: inError.message,
231 name: DEFAULT_TASK_NAME,
232 stack: inError.stack,
// Looks for a worker node that recorded exactly one failed task.
235 asyncErrorPool.workerNodes.some(
236 workerNode => workerNode.usage.tasks.failed === 1
// Executes a task on asyncPool, expects the input data back, and expects the
// call to have taken at least 2000 ms of wall-clock time.
// NOTE(review): the 2000 ms floor presumably mirrors a delay inside
// asyncWorker.cjs — confirm against that worker file.
241 it('Verify that async function is working properly', async () => {
242 const data = { f: 10 }
243 const startTime = performance.now()
244 const result = await asyncPool.execute(data)
245 const usedTime = performance.now() - startTime
246 expect(result).toStrictEqual(data)
247 expect(usedTime).toBeGreaterThanOrEqual(2000)
// Exercises task abortion through the third execute() argument, first with a
// timeout signal and then with an AbortController carrying a custom reason.
// NOTE(review): `error` is presumably captured from the rejected execute()
// call in each scenario — confirm.
250 it('Verify that task can be aborted', async () => {
// Scenario 1: the signal times out after 500 ms; a TimeoutError is expected.
254 await asyncErrorPool.execute({}, 'default', AbortSignal.timeout(500))
258 expect(error).toBeInstanceOf(Error)
259 expect(error.name).toBe('TimeoutError')
260 expect(error.message).toBe('The operation was aborted due to timeout')
261 expect(error.stack).toBeDefined()
// Scenario 2: abort with an explicit Error as reason; that reason's message is
// expected on the observed error.
263 const abortController = new AbortController()
265 abortController.abort(new Error('Task aborted'))
268 await asyncErrorPool.execute({}, 'default', abortController.signal)
272 expect(error).toBeInstanceOf(Error)
273 expect(error.message).toBe('Task aborted')
274 expect(error.stack).toBeDefined()
// Verifies the state of `pool` after shutdown: one 'exit' event per worker, a
// single PoolEvents.destroy emission, cleared state flags and no remaining
// worker node. Declared with `retry: 0`.
277 it('Shutdown test', { retry: 0 }, async ({ skip }) => {
// NOTE(review): this branch presumably calls skip() so the test does not run
// on CI — confirm.
278 if (process.env.CI != null) {
// Start listening for numberOfWorkers 'exit' events before shutdown begins.
282 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
283 expect(pool.emitter.eventNames()).toStrictEqual([])
// Count how many times the destroy event is emitted.
285 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
286 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
288 const exitEvents = await exitPromise
// After shutdown the pool reports neither started nor ready, and the test's
// own destroy listener is still the only one registered.
289 expect(pool.info.started).toBe(false)
290 expect(pool.info.ready).toBe(false)
291 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
292 expect(pool.readyEventEmitted).toBe(false)
293 expect(pool.busyEventEmitted).toBe(false)
294 expect(pool.backPressureEventEmitted).toBe(false)
295 expect(pool.workerNodes.length).toBe(0)
296 expect(exitEvents).toBe(numberOfWorkers)
297 expect(poolDestroy).toBe(1)
// Checks how the `env` and `settings` pool options are stored on pool.opts and
// reflected in cluster.settings, first without options and then with both set.
// The local `pool` shadows the suite-level variable of the same name.
300 it('Verify that cluster pool options are checked', async () => {
301 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
// Case 1: no options — env and settings stay undefined, and cluster.settings
// carries the worker file as `exec`.
302 let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
303 expect(pool.opts.env).toBeUndefined()
304 expect(pool.opts.settings).toBeUndefined()
305 expect(cluster.settings).toMatchObject({
306 exec: workerFilePath,
// Case 2: explicit env and settings — both are kept on pool.opts, and the
// settings are also visible through cluster.settings.
310 pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
311 env: { TEST: 'test' },
312 settings: { args: ['--use', 'http'], silent: true },
314 expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
315 expect(pool.opts.settings).toStrictEqual({
316 args: ['--use', 'http'],
319 expect(cluster.settings).toMatchObject({
320 args: ['--use', 'http'],
321 exec: workerFilePath,
// Destroys the first worker node of a freshly created pool and checks that its
// worker fired 'disconnect' and 'exit' once each, and that the pool still
// holds numberOfWorkers worker nodes afterwards.
327 it('Verify destroyWorkerNode()', async () => {
328 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
329 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
330 const workerNodeKey = 0
331 let disconnectEvent = 0
// NOTE(review): the two listeners presumably increment disconnectEvent and
// exitEvent respectively — confirm.
332 pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
336 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
339 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
340 expect(disconnectEvent).toBe(1)
341 expect(exitEvent).toBe(1)
342 // Simulates an illegitimate worker node destroy: the pool must still guarantee its minimum number of worker nodes
343 expect(pool.workerNodes.length).toBe(numberOfWorkers)
// Constructing a FixedClusterPool with a size of 0 must throw the error
// message asserted below.
348 it('Verify that a pool with zero worker fails', () => {
351 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.cjs')
352 ).toThrow('Cannot instantiate a fixed pool with zero worker')