1 import { expect } from 'expect'
3 import { FixedClusterPool, PoolEvents } from '../../../lib/index.cjs'
4 import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
5 import { TaskFunctions } from '../../test-types.cjs'
6 import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
// Test suite for FixedClusterPool. The fixtures below are created once and
// shared by the tests in this suite.
// NOTE(review): this listing omits some lines (e.g. the worker-count argument
// and the brackets around each options object) — confirm against the full file.
8 describe('Fixed cluster pool test suite', () => {
9 const numberOfWorkers = 8
10 const tasksConcurrency = 2
// General-purpose pool backed by testWorker.cjs; errors are logged to the console.
11 const pool = new FixedClusterPool(
13 './tests/worker-files/cluster/testWorker.cjs',
15 errorHandler: e => console.error(e)
// Same worker file, with the tasks queue enabled and a per-worker concurrency
// of `tasksConcurrency`.
18 const queuePool = new FixedClusterPool(
20 './tests/worker-files/cluster/testWorker.cjs',
22 enableTasksQueue: true,
24 concurrency: tasksConcurrency
26 errorHandler: e => console.error(e)
// Pool backed by emptyWorker.cjs; logs a message whenever a worker exits.
29 const emptyPool = new FixedClusterPool(
31 './tests/worker-files/cluster/emptyWorker.cjs',
32 { exitHandler: () => console.info('empty pool worker exited') }
// Pool backed by echoWorker.cjs.
34 const echoPool = new FixedClusterPool(
36 './tests/worker-files/cluster/echoWorker.cjs'
// Pool backed by errorWorker.cjs; errors are logged to the console.
38 const errorPool = new FixedClusterPool(
40 './tests/worker-files/cluster/errorWorker.cjs',
42 errorHandler: e => console.error(e)
// Pool backed by asyncErrorWorker.cjs; errors are logged to the console.
45 const asyncErrorPool = new FixedClusterPool(
47 './tests/worker-files/cluster/asyncErrorWorker.cjs',
49 errorHandler: e => console.error(e)
// Pool backed by asyncWorker.cjs.
52 const asyncPool = new FixedClusterPool(
54 './tests/worker-files/cluster/asyncWorker.cjs'
// Suite-level teardown: destroys the shared fixture pools one after another.
// NOTE(review): `pool` is not destroyed here; presumably the 'Shutdown test'
// takes care of it — confirm.
57 after('Destroy all pools', async () => {
58 // We need to clean up the resources after our test
59 await echoPool.destroy()
60 await asyncPool.destroy()
61 await errorPool.destroy()
62 await asyncErrorPool.destroy()
63 await emptyPool.destroy()
64 await queuePool.destroy()
// Runs the fibonacci and factorial task functions on the shared pool and
// checks the exact numeric results.
// NOTE(review): the numeric input passed with each `function` is not visible
// in this listing; the expected values are consistent with fibonacci(25) and
// factorial(100) — confirm.
67 it('Verify that the function is executed in a worker cluster', async () => {
68 let result = await pool.execute({
69 function: TaskFunctions.fibonacci
71 expect(result).toBe(75025)
72 result = await pool.execute({
73 function: TaskFunctions.factorial
75 expect(result).toBe(9.33262154439441e157)
// Calls execute() with no argument and expects the test worker to answer
// with { ok: 1 }.
78 it('Verify that is possible to invoke the execute() method without input', async () => {
79 const result = await pool.execute()
80 expect(result).toStrictEqual({ ok: 1 })
// Creates a dedicated pool (shadowing the suite-level `pool`), registers a
// listener for the `ready` event and expects it to have fired exactly once
// after waitPoolEvents() resolves.
// NOTE(review): the declaration of `poolReady` and any cleanup of this local
// pool are not visible in this listing — confirm.
83 it("Verify that 'ready' event is emitted", async () => {
84 const pool = new FixedClusterPool(
86 './tests/worker-files/cluster/testWorker.cjs',
88 errorHandler: e => console.error(e)
91 expect(pool.emitter.eventNames()).toStrictEqual([])
93 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
94 await waitPoolEvents(pool, PoolEvents.ready, 1)
95 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
96 expect(poolReady).toBe(1)
// Registers a `busy` listener on the shared pool, submits numberOfWorkers * 2
// tasks without awaiting between submissions, and checks how many times the
// event fired once all tasks have settled.
// The listener is left registered on the shared pool; the 'Shutdown test'
// expects to find it there.
// NOTE(review): the declaration of `poolBusy` is not visible in this listing.
100 it("Verify that 'busy' event is emitted", async () => {
101 const promises = new Set()
102 expect(pool.emitter.eventNames()).toStrictEqual([])
104 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
105 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
106 for (let i = 0; i < numberOfWorkers * 2; i++) {
107 promises.add(pool.execute())
109 await Promise.all(promises)
110 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
111 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
112 expect(poolBusy).toBe(numberOfWorkers + 1)
// Submits numberOfWorkers * maxMultiplier tasks to the queue-enabled pool and
// checks the usage counters twice:
//  1. right after submission (nothing awaited yet): no task executed, each
//     worker executing at most `concurrency` tasks and holding
//     maxMultiplier - concurrency tasks in its queue, no task stolen;
//  2. after all tasks have settled: every worker executed maxMultiplier tasks,
//     queues are empty, maxQueued keeps its earlier value, and the stolen
//     counters are only bounded (between 0 and the total number of tasks).
115 it('Verify that tasks queuing is working', async () => {
116 const promises = new Set()
117 const maxMultiplier = 3 // Must be greater than tasksConcurrency
118 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
119 promises.add(queuePool.execute())
121 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
// Phase 1: per-worker counters while the submitted tasks are still pending.
122 for (const workerNode of queuePool.workerNodes) {
123 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
124 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
125 queuePool.opts.tasksQueueOptions.concurrency
127 expect(workerNode.usage.tasks.executed).toBe(0)
128 expect(workerNode.usage.tasks.queued).toBe(
129 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
131 expect(workerNode.usage.tasks.maxQueued).toBe(
132 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
134 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
135 expect(workerNode.usage.tasks.stolen).toBe(0)
// Phase 1: pool-wide aggregates while the submitted tasks are still pending.
137 expect(queuePool.info.executedTasks).toBe(0)
138 expect(queuePool.info.executingTasks).toBe(
139 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
141 expect(queuePool.info.queuedTasks).toBe(
143 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
145 expect(queuePool.info.maxQueuedTasks).toBe(
147 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
149 expect(queuePool.info.backPressure).toBe(false)
150 expect(queuePool.info.stolenTasks).toBe(0)
151 await Promise.all(promises)
// Phase 2: per-worker counters once every task has settled.
152 for (const workerNode of queuePool.workerNodes) {
153 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
154 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
155 numberOfWorkers * maxMultiplier
157 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
158 expect(workerNode.usage.tasks.queued).toBe(0)
159 expect(workerNode.usage.tasks.maxQueued).toBe(
160 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
162 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
165 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
166 numberOfWorkers * maxMultiplier
168 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
169 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
170 numberOfWorkers * maxMultiplier
// Phase 2: pool-wide aggregates once every task has settled.
173 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
174 expect(queuePool.info.backPressure).toBe(false)
175 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
176 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
177 numberOfWorkers * maxMultiplier
// Executes a task on the pool backed by emptyWorker.cjs and expects the
// resolved value to be undefined.
181 it('Verify that is possible to have a worker that return undefined', async () => {
182 const result = await emptyPool.execute()
183 expect(result).toBeUndefined()
// Sends an object to the pool backed by echoWorker.cjs and expects a
// structurally equal object back.
186 it('Verify that data are sent to the worker correctly', async () => {
187 const data = { f: 10 }
188 const result = await echoPool.execute(data)
189 expect(result).toStrictEqual(data)
// Executes a task on the pool backed by errorWorker.cjs and checks three
// things: the error captured from execute() is the string
// 'Error Message from ClusterWorker', the `taskError` event payload carries
// the default task name and that same message, and some worker node reports
// exactly one failed task.
// NOTE(review): the lines that declare and assign `taskError` / `inError`
// (listener body and the handling around execute()) are not visible in this
// listing — confirm against the full file.
192 it('Verify that error handling is working properly:sync', async () => {
193 const data = { f: 10 }
194 expect(errorPool.emitter.eventNames()).toStrictEqual([])
196 errorPool.emitter.on(PoolEvents.taskError, e => {
199 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
202 await errorPool.execute(data)
206 expect(inError).toBeDefined()
207 expect(typeof inError === 'string').toBe(true)
208 expect(inError).toBe('Error Message from ClusterWorker')
209 expect(taskError).toStrictEqual({
210 name: DEFAULT_TASK_NAME,
211 message: 'Error Message from ClusterWorker',
215 errorPool.workerNodes.some(
216 workerNode => workerNode.usage.tasks.failed === 1
// Asynchronous counterpart of the sync error-handling test, run against the
// pool backed by asyncErrorWorker.cjs. Expects the captured error to be the
// string 'Error Message from ClusterWorker:async', the `taskError` payload to
// carry the default task name and that message, and some worker node to
// report exactly one failed task.
// NOTE(review): the lines that declare and assign `taskError` / `inError` are
// not visible in this listing — confirm against the full file.
221 it('Verify that error handling is working properly:async', async () => {
222 const data = { f: 10 }
223 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
225 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
228 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
233 await asyncErrorPool.execute(data)
237 expect(inError).toBeDefined()
238 expect(typeof inError === 'string').toBe(true)
239 expect(inError).toBe('Error Message from ClusterWorker:async')
240 expect(taskError).toStrictEqual({
241 name: DEFAULT_TASK_NAME,
242 message: 'Error Message from ClusterWorker:async',
246 asyncErrorPool.workerNodes.some(
247 workerNode => workerNode.usage.tasks.failed === 1
// Executes a task on the pool backed by asyncWorker.cjs, expects the input to
// be returned unchanged and the round trip to take at least 2000 ms.
// NOTE(review): presumably the async worker waits about 2 seconds before
// answering — confirm in asyncWorker.cjs.
252 it('Verify that async function is working properly', async () => {
253 const data = { f: 10 }
254 const startTime = performance.now()
255 const result = await asyncPool.execute(data)
256 const usedTime = performance.now() - startTime
257 expect(result).toStrictEqual(data)
258 expect(usedTime).toBeGreaterThanOrEqual(2000)
// Checks the state of the shared pool after shutdown: it is no longer
// started, has no worker nodes left, one `exit` event was seen per worker,
// and the `destroy` event fired exactly once.
// This test is order-dependent: it expects the `busy` listener registered by
// the earlier 'busy' event test to still be attached to the shared pool.
// NOTE(review): the `poolDestroy` declaration and the call that actually
// destroys the pool are not visible in this listing — confirm.
261 it('Shutdown test', async () => {
// Start waiting for the workers' `exit` events before the pool is torn down.
262 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
263 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
265 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
266 expect(pool.emitter.eventNames()).toStrictEqual([
271 const numberOfExitEvents = await exitPromise
272 expect(pool.started).toBe(false)
273 expect(pool.emitter.eventNames()).toStrictEqual(['busy', 'destroy'])
274 expect(pool.readyEventEmitted).toBe(false)
275 expect(pool.workerNodes.length).toBe(0)
276 expect(numberOfExitEvents).toBe(numberOfWorkers)
277 expect(poolDestroy).toBe(1)
// Checks the cluster-specific options: `env` and `settings` are undefined
// when no options are given, and are stored as provided otherwise.
// NOTE(review): the tail of the last two expectations and any cleanup of the
// local pools are not visible in this listing — confirm.
280 it('Verify that cluster pool options are checked', async () => {
281 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
282 let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
283 expect(pool.opts.env).toBeUndefined()
284 expect(pool.opts.settings).toBeUndefined()
286 pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
287 env: { TEST: 'test' },
288 settings: { args: ['--use', 'http'], silent: true }
290 expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
291 expect(pool.opts.settings).toStrictEqual({
292 args: ['--use', 'http'],
// Same settings with the worker file path added as `exec`.
295 expect({ ...pool.opts.settings, exec: workerFilePath }).toStrictEqual({
296 args: ['--use', 'http'],
// Builds a pool without an options argument and checks that a task still
// resolves to { ok: 1 }.
// NOTE(review): the cleanup statement announced by the trailing comment is
// not visible in this listing — confirm the local pool is destroyed.
303 it('Should work even without opts in input', async () => {
304 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
305 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
306 const res = await pool.execute()
307 expect(res).toStrictEqual({ ok: 1 })
308 // We need to clean up the resources after our test
// Destroys the worker node at index 0 of a dedicated pool. Expects the call
// to resolve to undefined, the worker's `disconnect` and `exit` events to
// have each been counted once, and the pool to hold one worker node fewer.
// NOTE(review): the listener bodies, the `exitEvent` declaration and any
// cleanup of the local pool are not visible in this listing — confirm.
312 it('Verify destroyWorkerNode()', async () => {
313 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
314 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
315 const workerNodeKey = 0
316 let disconnectEvent = 0
317 pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
321 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
324 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
325 expect(disconnectEvent).toBe(1)
326 expect(exitEvent).toBe(1)
327 expect(pool.workerNodes.length).toBe(numberOfWorkers - 1)
// Constructing a fixed pool with 0 workers must throw with the message
// 'Cannot instantiate a fixed pool with zero worker'.
// NOTE(review): the `expect(() => ...` wrapper around the constructor call is
// not visible in this listing.
331 it('Verify that a pool with zero worker fails', () => {
334 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.cjs')
335 ).toThrow('Cannot instantiate a fixed pool with zero worker')