1 import cluster from 'node:cluster'
3 import { expect } from 'expect'
5 import { FixedClusterPool, PoolEvents } from '../../../lib/index.cjs'
6 import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
7 import { TaskFunctions } from '../../test-types.cjs'
8 import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
10 describe('Fixed cluster pool test suite', () => {
// Suite-wide fixtures: sizing constants followed by one FixedClusterPool per
// worker script under ./tests/worker-files/cluster/.
11 const numberOfWorkers = 8
// Used as the tasks queue `concurrency` option of queuePool.
12 const tasksConcurrency = 2
// Default pool running testWorker.cjs; its errorHandler logs with console.error.
13 const pool = new FixedClusterPool(
15 './tests/worker-files/cluster/testWorker.cjs',
17 errorHandler: e => console.error(e),
// Same worker script, with the tasks queue enabled and a concurrency of
// tasksConcurrency.
20 const queuePool = new FixedClusterPool(
22 './tests/worker-files/cluster/testWorker.cjs',
24 enableTasksQueue: true,
26 concurrency: tasksConcurrency,
28 errorHandler: e => console.error(e),
// Pool running emptyWorker.cjs; its exitHandler logs an informational message.
31 const emptyPool = new FixedClusterPool(
33 './tests/worker-files/cluster/emptyWorker.cjs',
34 { exitHandler: () => console.info('empty pool worker exited') }
// Pool running echoWorker.cjs (used to check that input data round-trips).
36 const echoPool = new FixedClusterPool(
38 './tests/worker-files/cluster/echoWorker.cjs'
// Pool running errorWorker.cjs (used by the synchronous error handling test).
40 const errorPool = new FixedClusterPool(
42 './tests/worker-files/cluster/errorWorker.cjs',
44 errorHandler: e => console.error(e),
// Pool running asyncErrorWorker.cjs (used by the asynchronous error handling test).
47 const asyncErrorPool = new FixedClusterPool(
49 './tests/worker-files/cluster/asyncErrorWorker.cjs',
51 errorHandler: e => console.error(e),
// Pool running asyncWorker.cjs (used by the async task function test).
54 const asyncPool = new FixedClusterPool(
56 './tests/worker-files/cluster/asyncWorker.cjs'
// Suite teardown: destroys the shared pools one after another.
// The suite-level `pool` is not destroyed here; the 'Shutdown test' case
// asserts on its stopped state instead.
59 after('Destroy all pools', async () => {
60 // We need to clean up the resources after our test
61 await echoPool.destroy()
62 await asyncPool.destroy()
63 await errorPool.destroy()
64 await asyncErrorPool.destroy()
65 await emptyPool.destroy()
66 await queuePool.destroy()
// Runs two named task functions through the suite-level `pool` and compares
// the returned numbers exactly.
69 it('Verify that the function is executed in a worker cluster', async () => {
70 let result = await pool.execute({
71 function: TaskFunctions.fibonacci,
// NOTE(review): this literal equals fibonacci(100) rounded to a double, so the
// task input is presumably 100 — confirm against the task payload.
73 expect(result).toBe(354224848179262000000)
74 result = await pool.execute({
75 function: TaskFunctions.factorial,
// NOTE(review): this literal matches 100! as a double, so the task input is
// presumably 100 — confirm against the task payload.
77 expect(result).toBe(9.33262154439441e157)
// execute() called with no arguments on the testWorker.cjs pool is expected to
// resolve to { ok: 1 }.
80 it('Verify that is possible to invoke the execute() method without input', async () => {
81 const result = await pool.execute()
82 expect(result).toStrictEqual({ ok: 1 })
// Checks that a freshly created pool emits PoolEvents.ready once.
// The local `pool` shadows the suite-level fixture of the same name.
85 it("Verify that 'ready' event is emitted", async () => {
86 const pool = new FixedClusterPool(
88 './tests/worker-files/cluster/testWorker.cjs',
90 errorHandler: e => console.error(e),
// No listener is registered on the emitter before the test adds its own.
93 expect(pool.emitter.eventNames()).toStrictEqual([])
// Count each `ready` emission.
95 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
// Wait until one `ready` event has been observed on the pool.
96 await waitPoolEvents(pool, PoolEvents.ready, 1)
97 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
98 expect(poolReady).toBe(1)
// Submits numberOfWorkers * 2 tasks at once to the suite-level `pool` and
// counts how many times PoolEvents.busy is emitted.
// The busy listener added here stays attached to `pool`; the 'Shutdown test'
// case expects to find it.
102 it("Verify that 'busy' event is emitted", async () => {
103 const promises = new Set()
104 expect(pool.emitter.eventNames()).toStrictEqual([])
106 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
107 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
// Submit every task without awaiting so that they are in flight together.
108 for (let i = 0; i < numberOfWorkers * 2; i++) {
109 promises.add(pool.execute())
111 await Promise.all(promises)
112 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
113 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
114 expect(poolBusy).toBe(numberOfWorkers + 1)
// Submits numberOfWorkers * maxMultiplier tasks to queuePool and checks the
// per-worker and pool-wide task counters twice: right after submission
// (before the promises are awaited) and again once every task has settled.
117 it('Verify that tasks queuing is working', async () => {
118 const promises = new Set()
119 const maxMultiplier = 3 // Must be greater than tasksConcurrency
120 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
121 promises.add(queuePool.execute())
123 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
// Phase 1 (tasks still pending): each worker node is expected to execute at
// most `concurrency` tasks, to hold the remaining
// maxMultiplier - concurrency tasks in its queue, and to have stolen nothing.
124 for (const workerNode of queuePool.workerNodes) {
125 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
126 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
127 queuePool.opts.tasksQueueOptions.concurrency
129 expect(workerNode.usage.tasks.executed).toBe(0)
130 expect(workerNode.usage.tasks.queued).toBe(
131 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
133 expect(workerNode.usage.tasks.maxQueued).toBe(
134 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
136 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
137 expect(workerNode.usage.tasks.stolen).toBe(0)
// Pool-wide view of the same state, read from queuePool.info.
139 expect(queuePool.info.executedTasks).toBe(0)
140 expect(queuePool.info.executingTasks).toBe(
141 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
143 expect(queuePool.info.queuedTasks).toBe(
145 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
147 expect(queuePool.info.maxQueuedTasks).toBe(
149 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
151 expect(queuePool.info.backPressure).toBe(false)
152 expect(queuePool.info.stolenTasks).toBe(0)
// Phase 2: wait for every submitted task to settle, then re-check the counters.
153 await Promise.all(promises)
154 for (const workerNode of queuePool.workerNodes) {
155 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
156 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
157 numberOfWorkers * maxMultiplier
// Each worker node is expected to have executed exactly maxMultiplier tasks and
// to have an empty queue; maxQueued keeps the value seen in phase 1.
159 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
160 expect(workerNode.usage.tasks.queued).toBe(0)
161 expect(workerNode.usage.tasks.maxQueued).toBe(
162 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
// The stolen-task counters are only range-checked, not compared to exact values.
164 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
167 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
168 numberOfWorkers * maxMultiplier
170 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
171 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
172 numberOfWorkers * maxMultiplier
// Pool-wide totals after completion.
175 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
176 expect(queuePool.info.backPressure).toBe(false)
177 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
178 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
179 numberOfWorkers * maxMultiplier
// A task run on emptyPool (emptyWorker.cjs) is expected to resolve to undefined.
183 it('Verify that is possible to have a worker that return undefined', async () => {
184 const result = await emptyPool.execute()
185 expect(result).toBeUndefined()
// The value passed to echoPool.execute() is expected to come back as a
// structurally equal object.
188 it('Verify that data are sent to the worker correctly', async () => {
189 const data = { f: 10 }
190 const result = await echoPool.execute(data)
191 expect(result).toStrictEqual(data)
// Runs a task on errorPool (errorWorker.cjs) and checks both the error captured
// from execute() and the payload received through PoolEvents.taskError.
194 it('Verify that error handling is working properly:sync', async () => {
195 const data = { f: 10 }
196 expect(errorPool.emitter.eventNames()).toStrictEqual([])
// Listen for the taskError event emitted by the pool.
198 errorPool.emitter.on(PoolEvents.taskError, e => {
201 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
204 await errorPool.execute(data)
// The captured error is expected to be a plain string, not an Error object.
208 expect(inError).toBeDefined()
209 expect(typeof inError === 'string').toBe(true)
210 expect(inError).toBe('Error Message from ClusterWorker')
// The event payload is expected to carry the default task name and the same message.
211 expect(taskError).toStrictEqual({
212 name: DEFAULT_TASK_NAME,
213 message: 'Error Message from ClusterWorker',
// Looks for a worker node whose failed-task counter is exactly 1.
217 errorPool.workerNodes.some(
218 workerNode => workerNode.usage.tasks.failed === 1
// Same scenario as the synchronous error test, run against asyncErrorPool
// (asyncErrorWorker.cjs) and its ':async' error message.
223 it('Verify that error handling is working properly:async', async () => {
224 const data = { f: 10 }
225 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
// Listen for the taskError event emitted by the pool.
227 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
230 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
231 PoolEvents.taskError,
235 await asyncErrorPool.execute(data)
// The captured error is expected to be a plain string, not an Error object.
239 expect(inError).toBeDefined()
240 expect(typeof inError === 'string').toBe(true)
241 expect(inError).toBe('Error Message from ClusterWorker:async')
// The event payload is expected to carry the default task name and the same message.
242 expect(taskError).toStrictEqual({
243 name: DEFAULT_TASK_NAME,
244 message: 'Error Message from ClusterWorker:async',
// Looks for a worker node whose failed-task counter is exactly 1.
248 asyncErrorPool.workerNodes.some(
249 workerNode => workerNode.usage.tasks.failed === 1
// Times one task on asyncPool (asyncWorker.cjs) and checks both the echoed
// result and the elapsed wall-clock time.
254 it('Verify that async function is working properly', async () => {
255 const data = { f: 10 }
256 const startTime = performance.now()
257 const result = await asyncPool.execute(data)
258 const usedTime = performance.now() - startTime
259 expect(result).toStrictEqual(data)
// NOTE(review): presumably asyncWorker.cjs delays for about 2000 ms before
// resolving — confirm against the worker file.
260 expect(usedTime).toBeGreaterThanOrEqual(2000)
// Checks the state of the suite-level `pool` once it has been shut down: one
// 'exit' event per worker, one PoolEvents.destroy emission, no worker nodes left.
263 it('Shutdown test', async () => {
// Start collecting worker 'exit' events; the promise is awaited further down.
264 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
// Order-dependent: the only listener expected at this point is the busy
// listener registered by the earlier 'busy' event test.
265 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
267 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
268 expect(pool.emitter.eventNames()).toStrictEqual([
273 const numberOfExitEvents = await exitPromise
274 expect(pool.started).toBe(false)
275 expect(pool.emitter.eventNames()).toStrictEqual([
279 expect(pool.readyEventEmitted).toBe(false)
280 expect(pool.workerNodes.length).toBe(0)
281 expect(numberOfExitEvents).toBe(numberOfWorkers)
282 expect(poolDestroy).toBe(1)
// Checks how the `env` and `settings` options are exposed on pool.opts and
// reflected in node:cluster's `cluster.settings`.
285 it('Verify that cluster pool options are checked', async () => {
286 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
// Case 1: no options object. env and settings stay undefined on pool.opts,
// while cluster.settings is still expected to carry the worker file as `exec`.
287 let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
288 expect(pool.opts.env).toBeUndefined()
289 expect(pool.opts.settings).toBeUndefined()
290 expect(cluster.settings).toMatchObject({
291 exec: workerFilePath,
// Case 2: explicit env and settings are expected to appear both on pool.opts
// and in cluster.settings.
295 pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
296 env: { TEST: 'test' },
297 settings: { args: ['--use', 'http'], silent: true },
299 expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
300 expect(pool.opts.settings).toStrictEqual({
301 args: ['--use', 'http'],
304 expect(cluster.settings).toMatchObject({
305 args: ['--use', 'http'],
307 exec: workerFilePath,
// A pool built from only a worker count and a worker file path (no options
// object) is expected to execute a task and resolve to { ok: 1 }.
312 it('Should work even without opts in input', async () => {
313 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
314 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
315 const res = await pool.execute()
316 expect(res).toStrictEqual({ ok: 1 })
317 // We need to clean up the resources after our test
// Destroys worker node 0 of a fresh pool, then checks that its worker emitted
// one 'disconnect' and one 'exit' event and that the pool still reports
// numberOfWorkers worker nodes.
321 it('Verify destroyWorkerNode()', async () => {
322 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
323 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
324 const workerNodeKey = 0
325 let disconnectEvent = 0
// Listeners are attached to the worker of the node about to be destroyed.
326 pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
330 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
// destroyWorkerNode() is expected to resolve with no value.
333 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
334 expect(disconnectEvent).toBe(1)
335 expect(exitEvent).toBe(1)
336 // Simulates an illegitimate worker node destroy and the minimum number of worker nodes is guaranteed
337 expect(pool.workerNodes.length).toBe(numberOfWorkers)
// Constructing a fixed pool with 0 workers is expected to throw with the
// message asserted below.
341 it('Verify that a pool with zero worker fails', () => {
344 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.cjs')
345 ).toThrow('Cannot instantiate a fixed pool with zero worker')