refactor: use PoolEvents enum
[poolifier.git] / tests / pools / thread / dynamic.test.mjs
1 import { expect } from 'expect'
2
3 import {
4 DynamicThreadPool,
5 PoolEvents,
6 WorkerChoiceStrategies
7 } from '../../../lib/index.cjs'
8 import { TaskFunctions } from '../../test-types.cjs'
9 import { sleep, waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
10
// Test suite for DynamicThreadPool: task execution, scaling between the
// minimum and maximum number of workers, shutdown and input validation.
describe('Dynamic thread pool test suite', () => {
  const min = 1
  const max = 3
  // Shared pool used by every test up to and including 'Shutdown test', which
  // destroys it. Those tests rely on running in declaration order.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      errorHandler: e => console.error(e)
    }
  )

  it('Verify that the function is executed in a worker thread', async () => {
    let result = await pool.execute({
      function: TaskFunctions.fibonacci
    })
    expect(result).toBe(75025)
    result = await pool.execute({
      function: TaskFunctions.factorial
    })
    expect(result).toBe(9.33262154439441e157)
  })

  it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
    let poolBusy = 0
    // This listener is never removed: 'Shutdown test' asserts that the
    // PoolEvents.busy event name is still registered on the emitter.
    pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
    // The returned promises are not awaited; the assertions below inspect the
    // pool right after the tasks were submitted.
    for (let i = 0; i < max * 2; i++) {
      pool.execute()
    }
    expect(pool.workerNodes.length).toBeLessThanOrEqual(max)
    expect(pool.workerNodes.length).toBeGreaterThan(min)
    expect(poolBusy).toBe(1)
    // Wait for the workers created above the minimum to exit.
    const numberOfExitEvents = await waitWorkerEvents(pool, 'exit', max - min)
    expect(numberOfExitEvents).toBe(max - min)
    expect(pool.workerNodes.length).toBe(min)
  })

  it('Verify scale thread up and down is working', async () => {
    // Two identical rounds: fill the pool up to max, then wait for it to
    // shrink back to min.
    for (let i = 0; i < max * 2; i++) {
      pool.execute()
    }
    expect(pool.workerNodes.length).toBe(max)
    await waitWorkerEvents(pool, 'exit', max - min)
    expect(pool.workerNodes.length).toBe(min)
    for (let i = 0; i < max * 2; i++) {
      pool.execute()
    }
    expect(pool.workerNodes.length).toBe(max)
    await waitWorkerEvents(pool, 'exit', max - min)
    expect(pool.workerNodes.length).toBe(min)
  })

  it('Shutdown test', async () => {
    // Start waiting for the 'exit' events before destroying the pool so that
    // none of them can be missed.
    const exitPromise = waitWorkerEvents(pool, 'exit', min)
    // The busy listener was registered by an earlier test of this suite.
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
    let poolDestroy = 0
    pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
    expect(pool.emitter.eventNames()).toStrictEqual([
      PoolEvents.busy,
      PoolEvents.destroy
    ])
    await pool.destroy()
    const numberOfExitEvents = await exitPromise
    expect(pool.started).toBe(false)
    // Destroying the pool is expected to leave the registered listeners alone.
    expect(pool.emitter.eventNames()).toStrictEqual([
      PoolEvents.busy,
      PoolEvents.destroy
    ])
    expect(pool.readyEventEmitted).toBe(false)
    expect(pool.workerNodes.length).toBe(0)
    expect(numberOfExitEvents).toBe(min)
    expect(poolDestroy).toBe(1)
  })

  it('Validation of inputs test', () => {
    expect(() => new DynamicThreadPool(min)).toThrow(
      'The worker file path must be specified'
    )
  })

  it('Should work even without opts in input', async () => {
    const pool = new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.mjs'
    )
    try {
      const res = await pool.execute()
      expect(res).toStrictEqual({ ok: 1 })
    } finally {
      // We need to clean up the resources after our test, even when an
      // assertion above fails
      await pool.destroy()
    }
  })

  it('Verify scale thread up and down is working when long executing task is used:hard', async () => {
    const longRunningPool = new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/longRunningWorkerHardBehavior.mjs',
      {
        errorHandler: e => console.error(e),
        onlineHandler: () => console.info('long executing worker is online'),
        exitHandler: () => console.info('long executing worker exited')
      }
    )
    try {
      expect(longRunningPool.workerNodes.length).toBe(min)
      for (let i = 0; i < max * 2; i++) {
        longRunningPool.execute()
      }
      expect(longRunningPool.workerNodes.length).toBe(max)
      await waitWorkerEvents(longRunningPool, 'exit', max - min)
      expect(longRunningPool.workerNodes.length).toBe(min)
      // The next worker node key of the strategy in use must still point
      // inside the shrunken worker nodes array.
      expect(
        longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          longRunningPool.workerChoiceStrategyContext.workerChoiceStrategy
        ).nextWorkerNodeKey
      ).toBeLessThan(longRunningPool.workerNodes.length)
    } finally {
      // We need to clean up the resources after our test, even when an
      // assertion above fails
      await longRunningPool.destroy()
    }
  })

  it('Verify scale thread up and down is working when long executing task is used:soft', async () => {
    const longRunningPool = new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/longRunningWorkerSoftBehavior.mjs',
      {
        errorHandler: e => console.error(e),
        onlineHandler: () => console.info('long executing worker is online'),
        exitHandler: () => console.info('long executing worker exited')
      }
    )
    try {
      expect(longRunningPool.workerNodes.length).toBe(min)
      for (let i = 0; i < max * 2; i++) {
        longRunningPool.execute()
      }
      expect(longRunningPool.workerNodes.length).toBe(max)
      await sleep(1000)
      // Here we expect the workerNodes to be at the max size since the task is still executing
      expect(longRunningPool.workerNodes.length).toBe(max)
    } finally {
      // We need to clean up the resources after our test, even when an
      // assertion above fails
      await longRunningPool.destroy()
    }
  })

  it('Verify that a pool with zero worker can be instantiated', async () => {
    const pool = new DynamicThreadPool(
      0,
      max,
      './tests/worker-files/thread/testWorker.mjs'
    )
    try {
      expect(pool).toBeInstanceOf(DynamicThreadPool)
    } finally {
      // We need to clean up the resources after our test, even when an
      // assertion above fails
      await pool.destroy()
    }
  })

  it('Verify that a pool with zero worker works', async () => {
    // Exercise every available worker choice strategy with its own pool.
    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
      const pool = new DynamicThreadPool(
        0,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        {
          workerChoiceStrategy
        }
      )
      try {
        expect(pool.starting).toBe(false)
        expect(pool.readyEventEmitted).toBe(false)
        // First run without the tasks queue, second run with it enabled.
        for (let run = 0; run < 2; run++) {
          if (run % 2 !== 0) {
            pool.enableTasksQueue(true)
          }
          const maxMultiplier = 4
          const promises = new Set()
          expect(pool.workerNodes.length).toBe(pool.info.minSize)
          for (let i = 0; i < max * maxMultiplier; i++) {
            promises.add(pool.execute())
          }
          await Promise.all(promises)
          expect(pool.readyEventEmitted).toBe(true)
          expect(pool.workerNodes.length).toBeGreaterThan(pool.info.minSize)
          expect(pool.workerNodes.length).toBeLessThanOrEqual(
            pool.info.maxSize
          )
          // Wait until the pool reports being empty again before checking
          // that it went back to its minimum size.
          await waitPoolEvents(pool, PoolEvents.empty, 1)
          expect(pool.readyEventEmitted).toBe(false)
          expect(pool.workerNodes.length).toBe(pool.info.minSize)
        }
      } finally {
        // We need to clean up the resources after our test, even when an
        // assertion above fails
        await pool.destroy()
      }
    }
  })
})