fix: fix dynamic pool with minimum # of workers set to zero
[poolifier.git] / tests / pools / thread / dynamic.test.mjs
1 import { expect } from 'expect'
2 import {
3 DynamicThreadPool,
4 PoolEvents,
5 WorkerChoiceStrategies
6 } from '../../../lib/index.cjs'
7 import { TaskFunctions } from '../../test-types.cjs'
8 import { sleep, waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
9
10 describe('Dynamic thread pool test suite', () => {
11 const min = 1
12 const max = 3
13 const pool = new DynamicThreadPool(
14 min,
15 max,
16 './tests/worker-files/thread/testWorker.mjs',
17 {
18 errorHandler: e => console.error(e)
19 }
20 )
21
22 it('Verify that the function is executed in a worker thread', async () => {
23 let result = await pool.execute({
24 function: TaskFunctions.fibonacci
25 })
26 expect(result).toBe(75025)
27 result = await pool.execute({
28 function: TaskFunctions.factorial
29 })
30 expect(result).toBe(9.33262154439441e157)
31 })
32
33 it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
34 let poolBusy = 0
35 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
36 for (let i = 0; i < max * 2; i++) {
37 pool.execute()
38 }
39 expect(pool.workerNodes.length).toBeLessThanOrEqual(max)
40 expect(pool.workerNodes.length).toBeGreaterThan(min)
41 expect(poolBusy).toBe(1)
42 const numberOfExitEvents = await waitWorkerEvents(pool, 'exit', max - min)
43 expect(numberOfExitEvents).toBe(max - min)
44 expect(pool.workerNodes.length).toBe(min)
45 })
46
47 it('Verify scale thread up and down is working', async () => {
48 for (let i = 0; i < max * 2; i++) {
49 pool.execute()
50 }
51 expect(pool.workerNodes.length).toBe(max)
52 await waitWorkerEvents(pool, 'exit', max - min)
53 expect(pool.workerNodes.length).toBe(min)
54 for (let i = 0; i < max * 2; i++) {
55 pool.execute()
56 }
57 expect(pool.workerNodes.length).toBe(max)
58 await waitWorkerEvents(pool, 'exit', max - min)
59 expect(pool.workerNodes.length).toBe(min)
60 })
61
62 it('Shutdown test', async () => {
63 const exitPromise = waitWorkerEvents(pool, 'exit', min)
64 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
65 let poolDestroy = 0
66 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
67 expect(pool.emitter.eventNames()).toStrictEqual([
68 PoolEvents.busy,
69 PoolEvents.destroy
70 ])
71 await pool.destroy()
72 const numberOfExitEvents = await exitPromise
73 expect(pool.started).toBe(false)
74 expect(pool.emitter.eventNames()).toStrictEqual([])
75 expect(pool.readyEventEmitted).toBe(false)
76 expect(pool.workerNodes.length).toBe(0)
77 expect(numberOfExitEvents).toBe(min)
78 expect(poolDestroy).toBe(1)
79 })
80
81 it('Validation of inputs test', () => {
82 expect(() => new DynamicThreadPool(min)).toThrow(
83 'The worker file path must be specified'
84 )
85 })
86
87 it('Should work even without opts in input', async () => {
88 const pool = new DynamicThreadPool(
89 min,
90 max,
91 './tests/worker-files/thread/testWorker.mjs'
92 )
93 const res = await pool.execute()
94 expect(res).toStrictEqual({ ok: 1 })
95 // We need to clean up the resources after our test
96 await pool.destroy()
97 })
98
99 it('Verify scale thread up and down is working when long executing task is used:hard', async () => {
100 const longRunningPool = new DynamicThreadPool(
101 min,
102 max,
103 './tests/worker-files/thread/longRunningWorkerHardBehavior.mjs',
104 {
105 errorHandler: e => console.error(e),
106 onlineHandler: () => console.info('long executing worker is online'),
107 exitHandler: () => console.info('long executing worker exited')
108 }
109 )
110 expect(longRunningPool.workerNodes.length).toBe(min)
111 for (let i = 0; i < max * 2; i++) {
112 longRunningPool.execute()
113 }
114 expect(longRunningPool.workerNodes.length).toBe(max)
115 await waitWorkerEvents(longRunningPool, 'exit', max - min)
116 expect(longRunningPool.workerNodes.length).toBe(min)
117 expect(
118 longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get(
119 longRunningPool.workerChoiceStrategyContext.workerChoiceStrategy
120 ).nextWorkerNodeKey
121 ).toBeLessThan(longRunningPool.workerNodes.length)
122 // We need to clean up the resources after our test
123 await longRunningPool.destroy()
124 })
125
126 it('Verify scale thread up and down is working when long executing task is used:soft', async () => {
127 const longRunningPool = new DynamicThreadPool(
128 min,
129 max,
130 './tests/worker-files/thread/longRunningWorkerSoftBehavior.mjs',
131 {
132 errorHandler: e => console.error(e),
133 onlineHandler: () => console.info('long executing worker is online'),
134 exitHandler: () => console.info('long executing worker exited')
135 }
136 )
137 expect(longRunningPool.workerNodes.length).toBe(min)
138 for (let i = 0; i < max * 2; i++) {
139 longRunningPool.execute()
140 }
141 expect(longRunningPool.workerNodes.length).toBe(max)
142 await sleep(1000)
143 // Here we expect the workerNodes to be at the max size since the task is still executing
144 expect(longRunningPool.workerNodes.length).toBe(max)
145 // We need to clean up the resources after our test
146 await longRunningPool.destroy()
147 })
148
149 it('Verify that a pool with zero worker can be instantiated', async () => {
150 const pool = new DynamicThreadPool(
151 0,
152 max,
153 './tests/worker-files/thread/testWorker.mjs'
154 )
155 expect(pool).toBeInstanceOf(DynamicThreadPool)
156 // We need to clean up the resources after our test
157 await pool.destroy()
158 })
159
160 it('Verify that a pool with zero worker works', async () => {
161 const pool = new DynamicThreadPool(
162 0,
163 max,
164 './tests/worker-files/thread/testWorker.mjs'
165 )
166 expect(pool.starting).toBe(false)
167 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
168 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
169 expect(pool.readyEventEmitted).toBe(false)
170 for (let run = 0; run < 2; run++) {
171 run % 2 !== 0 && pool.enableTasksQueue(true)
172 const maxMultiplier = 4
173 const promises = new Set()
174 expect(pool.workerNodes.length).toBe(pool.info.minSize)
175 for (let i = 0; i < max * maxMultiplier; i++) {
176 promises.add(pool.execute())
177 }
178 await Promise.all(promises)
179 expect(pool.readyEventEmitted).toBe(true)
180 expect(pool.workerNodes.length).toBeGreaterThan(pool.info.minSize)
181 expect(pool.workerNodes.length).toBeLessThanOrEqual(pool.info.maxSize)
182 await waitPoolEvents(pool, PoolEvents.empty, 1)
183 expect(pool.readyEventEmitted).toBe(false)
184 expect(pool.workerNodes.length).toBe(pool.info.minSize)
185 }
186 }
187 // We need to clean up the resources after our test
188 await pool.destroy()
189 })
190 })