1 import { expect } from '@std/expect'
2 import cluster from 'node:cluster'
4 import { FixedClusterPool, PoolEvents } from '../../../lib/index.cjs'
5 import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
6 import { TaskFunctions } from '../../test-types.cjs'
7 import { waitWorkerEvents } from '../../test-utils.cjs'
9 describe('Fixed cluster pool test suite', () => {
// Worker count handed to the FixedClusterPool constructor throughout the suite
const numberOfWorkers = 8
// Concurrency configured for the task queue of the queue-enabled pool
const tasksConcurrency = 2
// Pool instances shared between the hooks and the test cases
let asyncErrorPool, asyncPool, echoPool, emptyPool, errorPool, pool, queuePool
// Instantiates the fixed cluster pools shared by the test cases of this suite.
// NOTE(review): the `numberOfWorkers` argument, the `tasksQueueOptions`
// wrapper and the brackets were rebuilt from a damaged extract; they follow
// the constructor calls and the `opts.tasksQueueOptions.concurrency` reads
// found elsewhere in this suite — confirm against the upstream file.
before('Create pools', () => {
  // General-purpose pool running the test worker
  pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs',
    {
      errorHandler: e => console.error(e),
    }
  )
  // Same worker file, with the tasks queue enabled and a bounded concurrency
  queuePool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs',
    {
      enableTasksQueue: true,
      errorHandler: e => console.error(e),
      tasksQueueOptions: {
        concurrency: tasksConcurrency,
      },
    }
  )
  emptyPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/emptyWorker.cjs',
    { exitHandler: () => console.info('empty pool worker exited') }
  )
  echoPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/echoWorker.cjs'
  )
  errorPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/errorWorker.cjs',
    {
      errorHandler: e => console.error(e),
    }
  )
  asyncErrorPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/asyncErrorWorker.cjs',
    {
      errorHandler: e => console.error(e),
    }
  )
  asyncPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/asyncWorker.cjs'
  )
})
// Destroys the shared pools, one after the other, once the test cases are done.
// NOTE(review): `pool` is not destroyed in this hook — presumably the
// 'Shutdown test' case takes care of it; confirm.
after('Destroy pools', async () => {
  // We need to clean up the resources after our tests
  await echoPool.destroy()
  await asyncPool.destroy()
  await errorPool.destroy()
  await asyncErrorPool.destroy()
  await emptyPool.destroy()
  await queuePool.destroy()
})
// Runs the fibonacci and factorial task functions through the pool, each call
// guarded by a 2000 ms timeout signal, and checks the returned numbers.
// NOTE(review): the object braces and the 'default' task name argument were
// rebuilt from a damaged extract; the argument order mirrors the
// execute(data, 'default', signal) calls of the abort test — confirm.
it('Verify that the function is executed in a worker cluster', async () => {
  let result = await pool.execute(
    {
      function: TaskFunctions.fibonacci,
    },
    'default',
    AbortSignal.timeout(2000)
  )
  expect(result).toBe(354224848179262000000)
  result = await pool.execute(
    {
      function: TaskFunctions.factorial,
    },
    'default',
    AbortSignal.timeout(2000)
  )
  expect(result).toBe(9.33262154439441e157)
})
// Calling execute() with no argument must still resolve; the pool is expected
// to answer with { ok: 1 }.
it('Verify that is possible to invoke the execute() method without input', async () => {
  const result = await pool.execute()
  expect(result).toStrictEqual({ ok: 1 })
})
// Submits numberOfWorkers * maxMultiplier tasks to the queue-enabled pool and
// checks the per-worker and pool-wide task counters twice: right after
// submission (tasks executing or queued) and after every task has settled.
// NOTE(review): closing brackets, the `numberOfWorkers *` factor of the
// pool-wide queue expectations and the `0` lower bound were rebuilt from a
// damaged extract, following the parallel assertions in this test — confirm.
it('Verify that tasks queuing is working', async () => {
  const promises = new Set()
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    promises.add(queuePool.execute())
  }
  expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
  // While tasks are in flight: nothing executed yet, the surplus is queued
  for (const workerNode of queuePool.workerNodes) {
    expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
      queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(workerNode.usage.tasks.executed).toBe(0)
    expect(workerNode.usage.tasks.queued).toBe(
      maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(workerNode.usage.tasks.maxQueued).toBe(
      maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
    expect(workerNode.usage.tasks.stolen).toBe(0)
  }
  expect(queuePool.info.executedTasks).toBe(0)
  expect(queuePool.info.executingTasks).toBe(
    numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
  )
  expect(queuePool.info.queuedTasks).toBe(
    numberOfWorkers *
      (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
  )
  expect(queuePool.info.maxQueuedTasks).toBe(
    numberOfWorkers *
      (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
  )
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBe(0)
  await Promise.all(promises)
  // After completion: queues are drained; stealing counters are only bounded
  for (const workerNode of queuePool.workerNodes) {
    expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
    expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
    expect(workerNode.usage.tasks.queued).toBe(0)
    expect(workerNode.usage.tasks.maxQueued).toBe(
      maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
      0
    )
    expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
    expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
  }
  expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
    numberOfWorkers * maxMultiplier
  )
})
// A task executed on the empty-worker pool is expected to resolve to undefined.
it('Verify that is possible to have a worker that return undefined', async () => {
  const result = await emptyPool.execute()
  expect(result).toBeUndefined()
})
// The echo pool is expected to hand back a value deeply equal to its input.
it('Verify that data are sent to the worker correctly', async () => {
  const data = { f: 10 }
  const result = await echoPool.execute(data)
  expect(result).toStrictEqual(data)
})
// Checks the failure path of a task run on errorPool: the error observed by
// the test must be an Error with the worker's message and a string stack, and
// the payload received through the pool's taskError event must match it.
// NOTE(review): this span is a lossy extract. The declarations/assignments of
// `taskError` and `inError`, whatever surrounds the execute() call (presumably
// a try/catch), some entries of the expected object and the closing brackets
// are not visible, and each line starts with a stray line number. Restore the
// block from the upstream file before running it.
173 it('Verify that error handling is working properly:sync', async () => {
174 const data = { f: 10 }
// The emitter must have no listener before this test registers its own
175 expect(errorPool.emitter.eventNames()).toStrictEqual([])
177 errorPool.emitter.on(PoolEvents.taskError, e => {
180 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
183 await errorPool.execute(data)
187 expect(inError).toBeInstanceOf(Error)
188 expect(inError.message).toStrictEqual('Error Message from ClusterWorker')
189 expect(typeof inError.stack === 'string').toBe(true)
// The event payload is compared against fields taken from the observed error
190 expect(taskError).toStrictEqual({
193 message: inError.message,
194 name: DEFAULT_TASK_NAME,
195 stack: inError.stack,
// Looks for a worker node that recorded exactly one failed task; the
// assertion wrapping this expression is not visible — presumably toBe(true)
198 errorPool.workerNodes.some(
199 workerNode => workerNode.usage.tasks.failed === 1
// Same failure-path checks as the sync variant, run against asyncErrorPool:
// the observed error must be an Error carrying the async worker's message and
// a string stack, and the taskError event payload must match it.
// NOTE(review): this span is a lossy extract. The declarations/assignments of
// `taskError` and `inError`, whatever surrounds the execute() call (presumably
// a try/catch), some entries of the expected object and the closing brackets
// are not visible, and each line starts with a stray line number. Restore the
// block from the upstream file before running it.
204 it('Verify that error handling is working properly:async', async () => {
205 const data = { f: 10 }
// The emitter must have no listener before this test registers its own
206 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
208 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
211 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
212 PoolEvents.taskError,
216 await asyncErrorPool.execute(data)
220 expect(inError).toBeInstanceOf(Error)
221 expect(inError.message).toStrictEqual(
222 'Error Message from ClusterWorker:async'
224 expect(typeof inError.stack === 'string').toBe(true)
// The event payload is compared against fields taken from the observed error
225 expect(taskError).toStrictEqual({
228 message: inError.message,
229 name: DEFAULT_TASK_NAME,
230 stack: inError.stack,
// Looks for a worker node that recorded exactly one failed task; the
// assertion wrapping this expression is not visible — presumably toBe(true)
233 asyncErrorPool.workerNodes.some(
234 workerNode => workerNode.usage.tasks.failed === 1
// Executes a task on asyncPool, expecting the input to be returned and the
// call to take at least 2000 ms.
// NOTE(review): presumably asyncWorker.cjs delays its answer by about two
// seconds — confirm against the worker file.
it('Verify that async function is working properly', async () => {
  const data = { f: 10 }
  const startTime = performance.now()
  const result = await asyncPool.execute(data)
  const usedTime = performance.now() - startTime
  expect(result).toStrictEqual(data)
  expect(usedTime).toBeGreaterThanOrEqual(2000)
})
// Exercises task abortion through an AbortSignal passed as the third argument
// of execute(): first with a 500 ms timeout signal, then with the signal of an
// AbortController that is aborted with a custom Error.
// NOTE(review): this span is a lossy extract. The declaration of `error`, the
// code capturing it around each execute() call (presumably try/catch), the
// statement wrapping abortController.abort(...) (possibly a timer) and the
// closing brackets are not visible, and each line starts with a stray line
// number. Restore the block from the upstream file before running it.
248 it('Verify that task can be aborted', async () => {
252 await asyncErrorPool.execute({}, 'default', AbortSignal.timeout(500))
// Expectations for the timeout-driven abort
256 expect(error).toBeInstanceOf(Error)
257 expect(error.name).toBe('TimeoutError')
258 expect(error.message).toBe('The operation was aborted due to timeout')
259 expect(error.stack).toBeDefined()
261 const abortController = new AbortController()
263 abortController.abort(new Error('Task aborted'))
266 await asyncErrorPool.execute({}, 'default', abortController.signal)
// Expectations for the controller-driven abort: the custom reason is surfaced
270 expect(error).toBeInstanceOf(Error)
271 expect(error.message).toBe('Task aborted')
272 expect(error.stack).toBeDefined()
// Destroys the shared `pool` and checks its state afterwards: stopped, no
// worker node left, one 'exit' event per worker and a single destroy event.
// NOTE(review): `let poolDestroy = 0` and `await pool.destroy()` were rebuilt
// from a damaged extract; both are required by the assertions that follow —
// confirm against the upstream file.
it('Shutdown test', async () => {
  // Start listening for worker exits before the pool is torn down
  const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let poolDestroy = 0
  pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
  await pool.destroy()
  const numberOfExitEvents = await exitPromise
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.busyEventEmitted).toBe(false)
  expect(pool.backPressureEventEmitted).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  expect(numberOfExitEvents).toBe(numberOfWorkers)
  expect(poolDestroy).toBe(1)
})
// Checks how the `env` and `settings` pool options are stored on the pool and
// reflected in `cluster.settings`, first with defaults and then with explicit
// values.
// NOTE(review): closing brackets and the `silent: true` entries were rebuilt
// from a damaged extract — confirm against the upstream file. Both locally
// created pools are destroyed so the test does not leave workers running.
it('Verify that cluster pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
  // Local pool, shadowing the suite-level `pool` on purpose of isolation
  let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  expect(pool.opts.env).toBeUndefined()
  expect(pool.opts.settings).toBeUndefined()
  expect(cluster.settings).toMatchObject({
    exec: workerFilePath,
  })
  await pool.destroy()
  pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
    env: { TEST: 'test' },
    settings: { args: ['--use', 'http'], silent: true },
  })
  expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
  expect(pool.opts.settings).toStrictEqual({
    args: ['--use', 'http'],
    silent: true,
  })
  expect(cluster.settings).toMatchObject({
    args: ['--use', 'http'],
    exec: workerFilePath,
    silent: true,
  })
  await pool.destroy()
})
// Destroys a single worker node and checks that its worker emitted exactly one
// 'disconnect' and one 'exit' event, and that the pool still holds
// numberOfWorkers worker nodes afterwards.
// NOTE(review): the counter increments and the `exitEvent` declaration were
// rebuilt from a damaged extract (they are required by the toBe(1)
// assertions) — confirm. The locally created pool is destroyed at the end so
// the test does not leave workers running.
it('Verify destroyWorkerNode()', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
  const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
  const workerNodeKey = 0
  let disconnectEvent = 0
  pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
    ++disconnectEvent
  })
  let exitEvent = 0
  pool.workerNodes[workerNodeKey].worker.on('exit', () => {
    ++exitEvent
  })
  await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
  expect(disconnectEvent).toBe(1)
  expect(exitEvent).toBe(1)
  // Simulates an illegitimate worker node destroy and the minimum number of worker nodes is guaranteed
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  await pool.destroy()
})
// Constructing a fixed pool with zero workers must throw.
it('Verify that a pool with zero worker fails', () => {
  // The constructor call is wrapped in a function so that toThrow can invoke it
  expect(
    () =>
      new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.cjs')
  ).toThrow('Cannot instantiate a fixed pool with zero worker')
})