1 import { expect } from 'expect'
7 } from '../../../lib/index.cjs'
8 import { TaskFunctions } from '../../test-types.cjs'
9 import { sleep, waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
11 describe('Dynamic cluster pool test suite', () => {
14 const pool = new DynamicClusterPool(
17 './tests/worker-files/cluster/testWorker.cjs',
19 errorHandler: e => console.error(e)
23 it('Verify that the function is executed in a worker cluster', async () => {
24 let result = await pool.execute({
25 function: TaskFunctions.fibonacci
27 expect(result).toBe(354224848179262000000)
28 result = await pool.execute({
29 function: TaskFunctions.factorial
31 expect(result).toBe(9.33262154439441e157)
34 it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
36 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
37 for (let i = 0; i < max * 2; i++) {
40 expect(pool.workerNodes.length).toBeLessThanOrEqual(max)
41 expect(pool.workerNodes.length).toBeGreaterThan(min)
42 expect(poolBusy).toBe(1)
43 const numberOfExitEvents = await waitWorkerEvents(pool, 'exit', max - min)
44 expect(numberOfExitEvents).toBe(max - min)
45 expect(pool.workerNodes.length).toBe(min)
48 it('Verify scale worker up and down is working', async () => {
49 for (let i = 0; i < max * 2; i++) {
52 expect(pool.workerNodes.length).toBeGreaterThan(min)
53 await waitWorkerEvents(pool, 'exit', max - min)
54 expect(pool.workerNodes.length).toBe(min)
55 for (let i = 0; i < max * 2; i++) {
58 expect(pool.workerNodes.length).toBeGreaterThan(min)
59 await waitWorkerEvents(pool, 'exit', max - min)
60 expect(pool.workerNodes.length).toBe(min)
63 it('Shutdown test', async () => {
64 const exitPromise = waitWorkerEvents(pool, 'exit', min)
65 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
67 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
68 expect(pool.emitter.eventNames()).toStrictEqual([
73 const numberOfExitEvents = await exitPromise
74 expect(pool.started).toBe(false)
75 expect(pool.emitter.eventNames()).toStrictEqual([
79 expect(pool.readyEventEmitted).toBe(false)
80 expect(pool.workerNodes.length).toBe(0)
81 expect(numberOfExitEvents).toBe(min)
82 expect(poolDestroy).toBe(1)
85 it('Validation of inputs test', () => {
86 expect(() => new DynamicClusterPool(min)).toThrow(
87 'The worker file path must be specified'
91 it('Should work even without opts in input', async () => {
92 const pool = new DynamicClusterPool(
95 './tests/worker-files/cluster/testWorker.cjs'
97 const result = await pool.execute()
98 expect(result).toStrictEqual({ ok: 1 })
99 // We need to clean up the resources after our test
103 it('Verify scale processes up and down is working when long executing task is used:hard', async () => {
104 const longRunningPool = new DynamicClusterPool(
107 './tests/worker-files/cluster/longRunningWorkerHardBehavior.cjs',
109 errorHandler: e => console.error(e),
110 onlineHandler: () => console.info('long executing worker is online'),
111 exitHandler: () => console.info('long executing worker exited')
114 expect(longRunningPool.workerNodes.length).toBe(min)
115 for (let i = 0; i < max * 2; i++) {
116 longRunningPool.execute()
118 expect(longRunningPool.workerNodes.length).toBe(max)
119 await waitWorkerEvents(longRunningPool, 'exit', max - min)
120 expect(longRunningPool.workerNodes.length).toBe(min)
122 longRunningPool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
123 longRunningPool.workerChoiceStrategiesContext
124 .defaultWorkerChoiceStrategy
126 ).toBeLessThan(longRunningPool.workerNodes.length)
127 // We need to clean up the resources after our test
128 await longRunningPool.destroy()
131 it('Verify scale processes up and down is working when long executing task is used:soft', async () => {
132 const longRunningPool = new DynamicClusterPool(
135 './tests/worker-files/cluster/longRunningWorkerSoftBehavior.cjs',
137 errorHandler: e => console.error(e),
138 onlineHandler: () => console.info('long executing worker is online'),
139 exitHandler: () => console.info('long executing worker exited')
142 expect(longRunningPool.workerNodes.length).toBe(min)
143 for (let i = 0; i < max * 2; i++) {
144 longRunningPool.execute()
146 expect(longRunningPool.workerNodes.length).toBe(max)
148 // Here we expect the workerNodes to be at the max size since the task is still executing
149 expect(longRunningPool.workerNodes.length).toBe(max)
150 // We need to clean up the resources after our test
151 await longRunningPool.destroy()
154 it('Verify that a pool with zero worker can be instantiated', async () => {
155 const pool = new DynamicClusterPool(
158 './tests/worker-files/cluster/testWorker.cjs'
160 expect(pool).toBeInstanceOf(DynamicClusterPool)
161 // We need to clean up the resources after our test
165 it('Verify that a pool with zero worker works', async () => {
166 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
167 const pool = new DynamicClusterPool(
170 './tests/worker-files/cluster/testWorker.cjs',
175 expect(pool.starting).toBe(false)
176 expect(pool.readyEventEmitted).toBe(false)
177 for (let run = 0; run < 2; run++) {
178 run % 2 !== 0 && pool.enableTasksQueue(true)
179 const maxMultiplier = 4
180 const promises = new Set()
181 expect(pool.workerNodes.length).toBe(pool.info.minSize)
182 for (let i = 0; i < max * maxMultiplier; i++) {
183 promises.add(pool.execute())
185 await Promise.all(promises)
186 expect(pool.readyEventEmitted).toBe(true)
187 expect(pool.workerNodes.length).toBeGreaterThan(pool.info.minSize)
188 expect(pool.workerNodes.length).toBeLessThanOrEqual(pool.info.maxSize)
189 await waitPoolEvents(pool, PoolEvents.empty, 1)
190 expect(pool.readyEventEmitted).toBe(false)
191 expect(pool.workerNodes.length).toBe(pool.info.minSize)
193 // We need to clean up the resources after our test