1 import { expect } from 'expect'
2 import { FixedClusterPool, PoolEvents } from '../../../lib/index.js'
3 import { TaskFunctions } from '../../test-types.js'
4 import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.js'
5 import { DEFAULT_TASK_NAME } from '../../../lib/utils.js'
7 describe('Fixed cluster pool test suite', () => {
8 const numberOfWorkers = 8
9 const tasksConcurrency = 2
10 const pool = new FixedClusterPool(
12 './tests/worker-files/cluster/testWorker.js',
14 errorHandler: e => console.error(e)
17 const queuePool = new FixedClusterPool(
19 './tests/worker-files/cluster/testWorker.js',
21 enableTasksQueue: true,
23 concurrency: tasksConcurrency
25 errorHandler: e => console.error(e)
28 const emptyPool = new FixedClusterPool(
30 './tests/worker-files/cluster/emptyWorker.js',
31 { exitHandler: () => console.info('empty pool worker exited') }
33 const echoPool = new FixedClusterPool(
35 './tests/worker-files/cluster/echoWorker.js'
37 const errorPool = new FixedClusterPool(
39 './tests/worker-files/cluster/errorWorker.js',
41 errorHandler: e => console.error(e)
44 const asyncErrorPool = new FixedClusterPool(
46 './tests/worker-files/cluster/asyncErrorWorker.js',
48 errorHandler: e => console.error(e)
51 const asyncPool = new FixedClusterPool(
53 './tests/worker-files/cluster/asyncWorker.js'
56 after('Destroy all pools', async () => {
57 // We need to clean up the resources after our test
58 await echoPool.destroy()
59 await asyncPool.destroy()
60 await errorPool.destroy()
61 await asyncErrorPool.destroy()
62 await emptyPool.destroy()
63 await queuePool.destroy()
66 it('Verify that the function is executed in a worker cluster', async () => {
67 let result = await pool.execute({
68 function: TaskFunctions.fibonacci
70 expect(result).toBe(75025)
71 result = await pool.execute({
72 function: TaskFunctions.factorial
74 expect(result).toBe(9.33262154439441e157)
77 it('Verify that is possible to invoke the execute() method without input', async () => {
78 const result = await pool.execute()
79 expect(result).toStrictEqual({ ok: 1 })
82 it("Verify that 'ready' event is emitted", async () => {
83 const pool = new FixedClusterPool(
85 './tests/worker-files/cluster/testWorker.js',
87 errorHandler: e => console.error(e)
90 expect(pool.emitter.eventNames()).toStrictEqual([])
92 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
93 await waitPoolEvents(pool, PoolEvents.ready, 1)
94 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
95 expect(poolReady).toBe(1)
99 it("Verify that 'busy' event is emitted", async () => {
100 const promises = new Set()
101 expect(pool.emitter.eventNames()).toStrictEqual([])
103 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
104 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
105 for (let i = 0; i < numberOfWorkers * 2; i++) {
106 promises.add(pool.execute())
108 await Promise.all(promises)
109 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
110 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
111 expect(poolBusy).toBe(numberOfWorkers + 1)
114 it('Verify that tasks queuing is working', async () => {
115 const promises = new Set()
116 const maxMultiplier = 3 // Must be greater than tasksConcurrency
117 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
118 promises.add(queuePool.execute())
120 expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
121 for (const workerNode of queuePool.workerNodes) {
122 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
123 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
124 queuePool.opts.tasksQueueOptions.concurrency
126 expect(workerNode.usage.tasks.executed).toBe(0)
127 expect(workerNode.usage.tasks.queued).toBe(
128 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
130 expect(workerNode.usage.tasks.maxQueued).toBe(
131 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
133 expect(workerNode.usage.tasks.stolen).toBe(0)
135 expect(queuePool.info.executedTasks).toBe(0)
136 expect(queuePool.info.executingTasks).toBe(
137 numberOfWorkers * queuePool.opts.tasksQueueOptions.concurrency
139 expect(queuePool.info.queuedTasks).toBe(
141 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
143 expect(queuePool.info.maxQueuedTasks).toBe(
145 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
147 expect(queuePool.info.backPressure).toBe(false)
148 expect(queuePool.info.stolenTasks).toBe(0)
149 await Promise.all(promises)
150 for (const workerNode of queuePool.workerNodes) {
151 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
152 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
153 numberOfWorkers * maxMultiplier
155 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
156 expect(workerNode.usage.tasks.queued).toBe(0)
157 expect(workerNode.usage.tasks.maxQueued).toBe(
158 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
160 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
161 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
162 numberOfWorkers * maxMultiplier
165 expect(queuePool.info.executedTasks).toBe(numberOfWorkers * maxMultiplier)
166 expect(queuePool.info.backPressure).toBe(false)
167 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
168 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
169 numberOfWorkers * maxMultiplier
173 it('Verify that is possible to have a worker that return undefined', async () => {
174 const result = await emptyPool.execute()
175 expect(result).toBeUndefined()
178 it('Verify that data are sent to the worker correctly', async () => {
179 const data = { f: 10 }
180 const result = await echoPool.execute(data)
181 expect(result).toStrictEqual(data)
184 it('Verify that error handling is working properly:sync', async () => {
185 const data = { f: 10 }
186 expect(errorPool.emitter.eventNames()).toStrictEqual([])
188 errorPool.emitter.on(PoolEvents.taskError, e => {
191 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
194 await errorPool.execute(data)
198 expect(inError).toBeDefined()
199 expect(typeof inError === 'string').toBe(true)
200 expect(inError).toBe('Error Message from ClusterWorker')
201 expect(taskError).toStrictEqual({
202 name: DEFAULT_TASK_NAME,
203 message: 'Error Message from ClusterWorker',
207 errorPool.workerNodes.some(
208 workerNode => workerNode.usage.tasks.failed === 1
213 it('Verify that error handling is working properly:async', async () => {
214 const data = { f: 10 }
215 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
217 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
220 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
225 await asyncErrorPool.execute(data)
229 expect(inError).toBeDefined()
230 expect(typeof inError === 'string').toBe(true)
231 expect(inError).toBe('Error Message from ClusterWorker:async')
232 expect(taskError).toStrictEqual({
233 name: DEFAULT_TASK_NAME,
234 message: 'Error Message from ClusterWorker:async',
238 asyncErrorPool.workerNodes.some(
239 workerNode => workerNode.usage.tasks.failed === 1
244 it('Verify that async function is working properly', async () => {
245 const data = { f: 10 }
246 const startTime = performance.now()
247 const result = await asyncPool.execute(data)
248 const usedTime = performance.now() - startTime
249 expect(result).toStrictEqual(data)
250 expect(usedTime).toBeGreaterThanOrEqual(2000)
253 it('Shutdown test', async () => {
254 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
255 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
257 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
258 expect(pool.emitter.eventNames()).toStrictEqual([
263 const numberOfExitEvents = await exitPromise
264 expect(pool.started).toBe(false)
265 expect(pool.workerNodes.length).toBe(0)
266 expect(numberOfExitEvents).toBe(numberOfWorkers)
267 expect(poolDestroy).toBe(1)
270 it('Verify that cluster pool options are checked', async () => {
271 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
272 let pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
273 expect(pool.opts.env).toBeUndefined()
274 expect(pool.opts.settings).toBeUndefined()
276 pool = new FixedClusterPool(numberOfWorkers, workerFilePath, {
277 env: { TEST: 'test' },
278 settings: { args: ['--use', 'http'], silent: true }
280 expect(pool.opts.env).toStrictEqual({ TEST: 'test' })
281 expect(pool.opts.settings).toStrictEqual({
282 args: ['--use', 'http'],
285 expect({ ...pool.opts.settings, exec: workerFilePath }).toStrictEqual({
286 args: ['--use', 'http'],
293 it('Should work even without opts in input', async () => {
294 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
295 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
296 const res = await pool.execute()
297 expect(res).toStrictEqual({ ok: 1 })
298 // We need to clean up the resources after our test
302 it('Verify destroyWorkerNode()', async () => {
303 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
304 const pool = new FixedClusterPool(numberOfWorkers, workerFilePath)
305 const workerNodeKey = 0
306 let disconnectEvent = 0
307 pool.workerNodes[workerNodeKey].worker.on('disconnect', () => {
311 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
314 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
315 expect(disconnectEvent).toBe(1)
316 expect(exitEvent).toBe(1)
317 expect(pool.workerNodes.length).toBe(numberOfWorkers - 1)
321 it('Verify that a pool with zero worker fails', () => {
324 new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
325 ).toThrowError('Cannot instantiate a fixed pool with zero worker')