1 import { expect } from 'expect'
2 import cluster from 'node:cluster'
4 import { FixedClusterPool, PoolEvents } from '../../../lib/index.cjs'
5 import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
6 import { TaskFunctions } from '../../test-types.cjs'
7 import { waitWorkerEvents } from '../../test-utils.cjs'
9 describe('Fixed cluster pool test suite', () => {
10 const numberOfWorkers = 8
11 const tasksConcurrency = 2
12 let asyncErrorPool, asyncPool, echoPool, emptyPool, errorPool, pool, queuePool
14 before('Create pools', () => {
15 pool = new FixedClusterPool(
17 './tests/worker-files/cluster/testWorker.cjs',
19 errorHandler: e => console.error(e),
22 queuePool = new FixedClusterPool(
24 './tests/worker-files/cluster/testWorker.cjs',
26 enableTasksQueue: true,
27 errorHandler: e => console.error(e),
29 concurrency: tasksConcurrency,
33 emptyPool = new FixedClusterPool(
35 './tests/worker-files/cluster/emptyWorker.cjs',
36 { exitHandler: () => console.info('empty pool worker exited') }
38 echoPool = new FixedClusterPool(
40 './tests/worker-files/cluster/echoWorker.cjs'
42 errorPool = new FixedClusterPool(
44 './tests/worker-files/cluster/errorWorker.cjs',
46 errorHandler: e => console.error(e),
49 asyncErrorPool = new FixedClusterPool(
51 './tests/worker-files/cluster/asyncErrorWorker.cjs',
53 errorHandler: e => console.error(e),
56 asyncPool = new FixedClusterPool(
58 './tests/worker-files/cluster/asyncWorker.cjs'
62 after('Destroy pools', async () => {
63 // We need to clean up the resources after our tests
64 await echoPool.destroy()
65 await asyncPool.destroy()
66 await errorPool.destroy()
67 await asyncErrorPool.destroy()
68 await emptyPool.destroy()
69 await queuePool.destroy()
72 it('Verify that the function is executed in a worker cluster', async () => {
73 let result = await pool.execute({
74 function: TaskFunctions.fibonacci,
76 expect(result).toBe(354224848179262000000)
77 result = await pool.execute({
78 function: TaskFunctions.factorial,
80 expect(result).toBe(9.33262154439441e157)
83 it('Verify that is possible to invoke the execute() method without input', async () => {
84 const result = await pool.execute()
85 expect(result).toStrictEqual({ ok: 1 })
88 it('Verify that tasks queuing is working', async () => {
89 const promises = new Set()
90 const maxMultiplier = 3 // Must be greater than tasksConcurrency
91 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
92 promises.add(queuePool.execute())
94 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
95 for (const workerNode of queuePool.workerNodes) {
96 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
97 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
98 queuePool.opts.tasksQueueOptions.concurrency
100 expect(workerNode.usage.tasks.executed).toBe(0)
101 expect(workerNode.usage.tasks.queued).toBe(
102 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
104 expect(workerNode.usage.tasks.maxQueued).toBe(
105 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
107 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
108 expect(workerNode.usage.tasks.stolen).toBe(0)
110 expect(queuePool.info.executedTasks).toBe(0)
111 expect(queuePool.info.executingTasks).toBe(
112 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
114 expect(queuePool.info.queuedTasks).toBe(
116 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
118 expect(queuePool.info.maxQueuedTasks).toBe(
120 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
122 expect(queuePool.info.backPressure).toBe(false)
123 expect(queuePool.info.stolenTasks).toBe(0)
124 await Promise.all(promises)
125 for (const workerNode of queuePool.workerNodes) {
126 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
127 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
128 numberOfWorkers * maxMultiplier
130 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
131 expect(workerNode.usage.tasks.queued).toBe(0)
132 expect(workerNode.usage.tasks.maxQueued).toBe(
133 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
135 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
138 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
139 numberOfWorkers * maxMultiplier
141 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
142 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
143 numberOfWorkers * maxMultiplier
146 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
147 expect(queuePool.info.backPressure).toBe(false)
148 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
149 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
150 numberOfWorkers * maxMultiplier
154 it('Verify that is possible to have a worker that return undefined', async () => {
155 const result = await emptyPool.execute()
156 expect(result).toBeUndefined()
159 it('Verify that data are sent to the worker correctly', async () => {
160 const data = { f: 10 }
161 const result = await echoPool.execute(data)
162 expect(result).toStrictEqual(data)
165 it('Verify that error handling is working properly:sync', async () => {
166 const data = { f: 10 }
167 expect(errorPool.emitter.eventNames()).toStrictEqual([])
169 errorPool.emitter.on(PoolEvents.taskError, e => {
172 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
175 await errorPool.execute(data)
179 expect(inError).toStrictEqual('Error Message from ClusterWorker')
180 expect(taskError).toStrictEqual({
182 message: 'Error Message from ClusterWorker',
183 name: DEFAULT_TASK_NAME,
186 errorPool.workerNodes.some(
187 workerNode => workerNode.usage.tasks.failed === 1
192 it('Verify that error handling is working properly:async', async () => {
193 const data = { f: 10 }
194 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
196 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
199 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
200 PoolEvents.taskError,
204 await asyncErrorPool.execute(data)
208 expect(inError).toStrictEqual('Error Message from ClusterWorker:async')
209 expect(taskError).toStrictEqual({
211 message: 'Error Message from ClusterWorker:async',
212 name: DEFAULT_TASK_NAME,
215 asyncErrorPool.workerNodes.some(
216 workerNode => workerNode.usage.tasks.failed === 1
221 it('Verify that async function is working properly', async () => {
222 const data = { f: 10 }
223 const startTime = performance.now()
224 const result = await asyncPool.execute(data)
225 const usedTime = performance.now() - startTime
226 expect(result).toStrictEqual(data)
227 expect(usedTime).toBeGreaterThanOrEqual(2000)
230 it('Shutdown test', async () => {
231 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
232 expect(pool.emitter.eventNames()).toStrictEqual([])
234 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
235 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
237 const numberOfExitEvents = await exitPromise
238 expect(pool.info.started).toBe(false)
239 expect(pool.info.ready).toBe(false)
240 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
241 expect(pool.readyEventEmitted).toBe(false)
242 expect(pool.busyEventEmitted).toBe(false)
243 expect(pool.backPressureEventEmitted).toBe(false)
244 expect(pool.workerNodes.length).toBe(0)
245 expect(numberOfExitEvents).toBe(numberOfWorkers)
246 expect(poolDestroy).toBe(1)
249 it('Verify that cluster pool options are checked', async () => {
250 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
251 let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
252 expect(pool.opts.env).toBeUndefined()
253 expect(pool.opts.settings).toBeUndefined()
254 expect(cluster.settings).toMatchObject({
255 exec: workerFilePath,
259 pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
260 env: { TEST: 'test' },
261 settings: { args: ['--use', 'http'], silent: true },
263 expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
264 expect(pool.opts.settings).toStrictEqual({
265 args: ['--use', 'http'],
268 expect(cluster.settings).toMatchObject({
269 args: ['--use', 'http'],
270 exec: workerFilePath,
276 it('Verify destroyWorkerNode()', async () => {
277 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
278 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
279 const workerNodeKey = 0
280 let disconnectEvent = 0
281 pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
285 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
288 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
289 expect(disconnectEvent).toBe(1)
290 expect(exitEvent).toBe(1)
291 // Simulates an illegitimate worker node destroy and the minimum number of worker nodes is guaranteed
292 expect(pool.workerNodes.length).toBe(numberOfWorkers)
296 it('Verify that a pool with zero worker fails', () => {
299 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.cjs')
300 ).toThrow('Cannot instantiate a fixed pool with zero worker')