1 import { expect } from 'expect'
3 import { FixedClusterPool, PoolEvents } from '../../../lib/index.cjs'
4 import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
5 import { TaskFunctions } from '../../test-types.cjs'
6 import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
8 describe('Fixed cluster pool test suite', () => {
// Suite-wide fixtures: the worker count, the per-worker queue concurrency and one
// FixedClusterPool per worker file exercised by the tests below.
// NOTE(review): several constructor calls look truncated in this listing (no visible
// worker-count argument, option braces or closing parentheses) - confirm against the full file.
9 const numberOfWorkers = 8
10 const tasksConcurrency = 2
// Default pool running testWorker.cjs; errors are logged through console.error.
11 const pool = new FixedClusterPool(
13 './tests/worker-files/cluster/testWorker.cjs',
15 errorHandler: e => console.error(e)
// Same worker file, with the tasks queue enabled and concurrency set to tasksConcurrency.
18 const queuePool = new FixedClusterPool(
20 './tests/worker-files/cluster/testWorker.cjs',
22 enableTasksQueue: true,
24 concurrency: tasksConcurrency
26 errorHandler: e => console.error(e)
// Pool running emptyWorker.cjs; its exitHandler logs an info message.
29 const emptyPool = new FixedClusterPool(
31 './tests/worker-files/cluster/emptyWorker.cjs',
32 { exitHandler: () => console.info('empty pool worker exited') }
// Pool running echoWorker.cjs.
34 const echoPool = new FixedClusterPool(
36 './tests/worker-files/cluster/echoWorker.cjs'
// Pool running errorWorker.cjs; errors are logged through console.error.
38 const errorPool = new FixedClusterPool(
40 './tests/worker-files/cluster/errorWorker.cjs',
42 errorHandler: e => console.error(e)
// Pool running asyncErrorWorker.cjs; errors are logged through console.error.
45 const asyncErrorPool = new FixedClusterPool(
47 './tests/worker-files/cluster/asyncErrorWorker.cjs',
49 errorHandler: e => console.error(e)
// Pool running asyncWorker.cjs.
52 const asyncPool = new FixedClusterPool(
54 './tests/worker-files/cluster/asyncWorker.cjs'
// Teardown hook: destroys the shared pools sequentially, awaiting each destroy() call.
// NOTE(review): the default pool is not destroyed in the visible lines of this hook;
// presumably the Shutdown test takes care of it - confirm.
57 after('Destroy all pools', async () => {
58 // We need to clean up the resources after our test
59 await echoPool.destroy()
60 await asyncPool.destroy()
61 await errorPool.destroy()
62 await asyncErrorPool.destroy()
63 await emptyPool.destroy()
64 await queuePool.destroy()
// Executes the fibonacci task function and then the factorial task function on the
// shared pool, checking the value returned by each call.
67 it('Verify that the function is executed in a worker cluster', async () => {
68 let result = await pool.execute({
69 function: TaskFunctions.fibonacci
71 expect(result).toBe(75025)
// The result variable is reused for the second task.
72 result = await pool.execute({
73 function: TaskFunctions.factorial
75 expect(result).toBe(9.33262154439441e157)
// Calls execute() with no argument on the shared pool and expects the result { ok: 1 }.
78 it('Verify that is possible to invoke the execute() method without input', async () => {
79 const result = await pool.execute()
80 expect(result).toStrictEqual({ ok: 1 })
// Builds a dedicated pool (shadowing the suite-level pool), registers a ready listener
// and expects the listener to have fired exactly once after waitPoolEvents resolves.
83 it("Verify that 'ready' event is emitted", async () => {
84 const pool = new FixedClusterPool(
86 './tests/worker-files/cluster/testWorker.cjs',
88 errorHandler: e => console.error(e)
// No listener is registered on a freshly built pool.
91 expect(pool.emitter.eventNames()).toStrictEqual([])
// NOTE(review): the declaration of poolReady is not visible in this listing -
// presumably a counter initialised to 0; confirm.
93 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
94 await waitPoolEvents(pool, PoolEvents.ready, 1)
95 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
96 expect(poolReady).toBe(1)
// Registers a busy listener on the shared pool, submits numberOfWorkers * 2 tasks and
// checks how many times the listener fired once all tasks have settled.
// The listener is left attached to the shared pool after this test.
100 it("Verify that 'busy' event is emitted", async () => {
101 const promises = new Set()
102 expect(pool.emitter.eventNames()).toStrictEqual([])
// NOTE(review): the declaration of poolBusy is not visible in this listing -
// presumably a counter initialised to 0; confirm.
104 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
105 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
106 for (let i = 0; i < numberOfWorkers * 2; i++) {
107 promises.add(pool.execute())
109 await Promise.all(promises)
110 // The `busy` event is triggered once the number of tasks submitted at once reaches the number of fixed pool workers.
111 // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
112 expect(poolBusy).toBe(numberOfWorkers + 1)
// Submits numberOfWorkers * maxMultiplier tasks to queuePool and inspects the task
// counters twice: right after submission (nothing awaited yet) and after all tasks settle.
115 it('Verify that tasks queuing is working', async () => {
116 const promises = new Set()
117 const maxMultiplier = 3 // Must be greater than tasksConcurrency
118 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
119 promises.add(queuePool.execute())
121 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
// Phase 1, per worker node: nothing executed yet, executing count bounded by the
// configured concurrency, the remainder queued, and no task stolen.
122 for (const workerNode of queuePool.workerNodes) {
123 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
124 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
125 queuePool.opts.tasksQueueOptions.concurrency
127 expect(workerNode.usage.tasks.executed).toBe(0)
128 expect(workerNode.usage.tasks.queued).toBe(
129 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
131 expect(workerNode.usage.tasks.maxQueued).toBe(
132 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
134 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
135 expect(workerNode.usage.tasks.stolen).toBe(0)
// Phase 1, pool-wide totals reported by queuePool.info.
137 expect(queuePool.info.executedTasks).toBe(0)
138 expect(queuePool.info.executingTasks).toBe(
139 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
141 expect(queuePool.info.queuedTasks).toBe(
143 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
145 expect(queuePool.info.maxQueuedTasks).toBe(
147 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
149 expect(queuePool.info.backPressure).toBe(false)
150 expect(queuePool.info.stolenTasks).toBe(0)
// Phase 2: wait for every submitted task, then re-check the counters.
151 await Promise.all(promises)
// Per worker node: maxMultiplier tasks executed, queue drained, maxQueued unchanged;
// the stolen counters are only range-checked.
152 for (const workerNode of queuePool.workerNodes) {
153 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
154 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
155 numberOfWorkers * maxMultiplier
157 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
158 expect(workerNode.usage.tasks.queued).toBe(0)
159 expect(workerNode.usage.tasks.maxQueued).toBe(
160 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
162 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
165 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
166 numberOfWorkers * maxMultiplier
168 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
169 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
170 numberOfWorkers * maxMultiplier
// Pool-wide totals after completion.
173 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
174 expect(queuePool.info.backPressure).toBe(false)
175 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
176 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
177 numberOfWorkers * maxMultiplier
// Executes a task on emptyPool and expects the resolved value to be undefined.
181 it('Verify that is possible to have a worker that return undefined', async () => {
182 const result = await emptyPool.execute()
183 expect(result).toBeUndefined()
// Sends an object to echoPool and expects a structurally equal object back.
186 it('Verify that data are sent to the worker correctly', async () => {
187 const data = { f: 10 }
188 const result = await echoPool.execute(data)
189 expect(result).toStrictEqual(data)
// Submits a task to errorPool and checks the error string captured in inError, the
// payload captured in taskError and the failed-task counter of the worker nodes.
// NOTE(review): the declarations and assignments of inError and taskError are not
// visible in this listing - presumably a catch clause around execute() and the body of
// the taskError listener; confirm against the full file.
192 it('Verify that error handling is working properly:sync', async () => {
193 const data = { f: 10 }
194 expect(errorPool.emitter.eventNames()).toStrictEqual([])
196 errorPool.emitter.on(PoolEvents.taskError, e => {
199 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
202 await errorPool.execute(data)
// The captured error is expected to be a plain string, not an Error object.
206 expect(inError).toBeDefined()
207 expect(typeof inError === 'string').toBe(true)
208 expect(inError).toBe('Error Message from ClusterWorker')
209 expect(taskError).toStrictEqual({
210 name: DEFAULT_TASK_NAME,
211 message: 'Error Message from ClusterWorker',
// Looks for a worker node whose failed-task counter equals 1.
215 errorPool.workerNodes.some(
216 workerNode => workerNode.usage.tasks.failed === 1
// Same checks as the sync error test, run against asyncErrorPool and its
// ClusterWorker:async error message.
// NOTE(review): the declarations and assignments of inError and taskError are not
// visible in this listing - presumably a catch clause around execute() and the body of
// the taskError listener; confirm against the full file.
221 it('Verify that error handling is working properly:async', async () => {
222 const data = { f: 10 }
223 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
225 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
228 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
233 await asyncErrorPool.execute(data)
// The captured error is expected to be a plain string, not an Error object.
237 expect(inError).toBeDefined()
238 expect(typeof inError === 'string').toBe(true)
239 expect(inError).toBe('Error Message from ClusterWorker:async')
240 expect(taskError).toStrictEqual({
241 name: DEFAULT_TASK_NAME,
242 message: 'Error Message from ClusterWorker:async',
// Looks for a worker node whose failed-task counter equals 1.
246 asyncErrorPool.workerNodes.some(
247 workerNode => workerNode.usage.tasks.failed === 1
// Times a single task on asyncPool: the input must come back structurally equal and
// the round trip must take at least 2000 ms as measured with performance.now().
// NOTE(review): presumably asyncWorker.cjs delays its reply by about two seconds -
// confirm against the worker file.
252 it('Verify that async function is working properly', async () => {
253 const data = { f: 10 }
254 const startTime = performance.now()
255 const result = await asyncPool.execute(data)
256 const usedTime = performance.now() - startTime
257 expect(result).toStrictEqual(data)
258 expect(usedTime).toBeGreaterThanOrEqual(2000)
// Verifies the state of the shared pool after shutdown: one exit event per worker,
// a single destroy event, no worker nodes left, started and readyEventEmitted both false.
// NOTE(review): the pool.destroy() call and the poolDestroy declaration are not visible
// in this listing - confirm against the full file.
261 it('Shutdown test', async () => {
// Start waiting for worker exit events before the pool is torn down.
262 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
// Depends on the busy-event test above, which leaves its busy listener on the shared pool.
263 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
265 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
266 expect(pool.emitter.eventNames()).toStrictEqual([
271 const numberOfExitEvents = await exitPromise
272 expect(pool.started).toBe(false)
273 expect(pool.emitter.eventNames()).toStrictEqual([
277 expect(pool.readyEventEmitted).toBe(false)
278 expect(pool.workerNodes.length).toBe(0)
279 expect(numberOfExitEvents).toBe(numberOfWorkers)
280 expect(poolDestroy).toBe(1)
// Checks the env and settings options: both are undefined when no options are passed,
// and both are kept on pool.opts when they are provided.
// NOTE(review): no destroy() call for either locally built pool is visible in this
// listing - confirm against the full file.
283 it('Verify that cluster pool options are checked', async () => {
284 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
285 let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
286 expect(pool.opts.env).toBeUndefined()
287 expect(pool.opts.settings).toBeUndefined()
// Rebuild the pool with explicit env and settings.
289 pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
290 env: { TEST: 'test' },
291 settings: { args: ['--use', 'http'], silent: true }
293 expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
294 expect(pool.opts.settings).toStrictEqual({
295 args: ['--use', 'http'],
// Same settings merged with an exec entry pointing at the worker file.
298 expect({ ...pool.opts.settings, exec: workerFilePath }).toStrictEqual({
299 args: ['--use', 'http'],
// Builds a pool without an options argument and expects execute() to resolve to { ok: 1 }.
306 it('Should work even without opts in input', async () => {
307 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
308 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
309 const res = await pool.execute()
310 expect(res).toStrictEqual({ ok: 1 })
311 // We need to clean up the resources after our test
// NOTE(review): the cleanup statement announced above is not visible in this listing -
// presumably a pool.destroy() call; confirm.
// Destroys the worker node at key 0 of a dedicated pool and expects one disconnect
// event, one exit event and a worker node list shortened by one.
// NOTE(review): the listener bodies and the exitEvent declaration are not visible in
// this listing - presumably each listener increments its counter; confirm.
315 it('Verify destroyWorkerNode()', async () => {
316 const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
317 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
318 const workerNodeKey = 0
319 let disconnectEvent = 0
320 pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
324 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
// destroyWorkerNode() is expected to resolve with undefined.
327 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
328 expect(disconnectEvent).toBe(1)
329 expect(exitEvent).toBe(1)
330 expect(pool.workerNodes.length).toBe(numberOfWorkers - 1)
// Constructing a FixedClusterPool with 0 workers is expected to throw the message below.
// NOTE(review): the expect wrapper around the constructor call is not visible in this
// listing - confirm against the full file.
334 it('Verify that a pool with zero worker fails', () => {
337 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.cjs')
338 ).toThrow('Cannot instantiate a fixed pool with zero worker')