0b33e36b85005dd843f8577d4bb0c6f34bee9c47
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 FixedClusterPool,
4 FixedThreadPool,
5 WorkerChoiceStrategies
6 } = require('../../../lib/index')
7
8 describe('Abstract pool test suite', () => {
9 const numberOfWorkers = 1
10 const workerNotFoundInTasksUsageMapError = new Error(
11 'Worker could not be found in worker tasks usage map'
12 )
13 class StubPoolWithWorkerTasksUsageMapClear extends FixedThreadPool {
14 removeAllWorker () {
15 this.workersTasksUsage.clear()
16 }
17 }
18 class StubPoolWithIsMainMethod extends FixedThreadPool {
19 isMain () {
20 return false
21 }
22 }
23
24 it('Simulate pool creation from a non main thread/process', () => {
25 expect(
26 () =>
27 new StubPoolWithIsMainMethod(
28 numberOfWorkers,
29 './tests/worker-files/thread/testWorker.js',
30 {
31 errorHandler: e => console.error(e)
32 }
33 )
34 ).toThrowError(new Error('Cannot start a pool from a worker!'))
35 })
36
37 it('Verify that filePath is checked', () => {
38 const expectedError = new Error(
39 'Please specify a file with a worker implementation'
40 )
41 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
42 expectedError
43 )
44 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
45 expectedError
46 )
47 })
48
49 it('Verify that numberOfWorkers is checked', () => {
50 expect(() => new FixedThreadPool()).toThrowError(
51 new Error(
52 'Cannot instantiate a pool without specifying the number of workers'
53 )
54 )
55 })
56
57 it('Verify that a negative number of workers is checked', () => {
58 expect(
59 () =>
60 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
61 ).toThrowError(
62 new Error('Cannot instantiate a pool with a negative number of workers')
63 )
64 })
65
66 it('Verify that a non integer number of workers is checked', () => {
67 expect(
68 () =>
69 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
70 ).toThrowError(
71 new Error(
72 'Cannot instantiate a pool with a non integer number of workers'
73 )
74 )
75 })
76
77 it('Verify that pool options are checked', async () => {
78 let pool = new FixedThreadPool(
79 numberOfWorkers,
80 './tests/worker-files/thread/testWorker.js'
81 )
82 expect(pool.opts.enableEvents).toBe(true)
83 expect(pool.emitter).toBeDefined()
84 expect(pool.opts.workerChoiceStrategy).toBe(
85 WorkerChoiceStrategies.ROUND_ROBIN
86 )
87 expect(pool.opts.messageHandler).toBeUndefined()
88 expect(pool.opts.errorHandler).toBeUndefined()
89 expect(pool.opts.onlineHandler).toBeUndefined()
90 expect(pool.opts.exitHandler).toBeUndefined()
91 await pool.destroy()
92 const testHandler = () => console.log('test handler executed')
93 pool = new FixedThreadPool(
94 numberOfWorkers,
95 './tests/worker-files/thread/testWorker.js',
96 {
97 workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED,
98 enableEvents: false,
99 messageHandler: testHandler,
100 errorHandler: testHandler,
101 onlineHandler: testHandler,
102 exitHandler: testHandler
103 }
104 )
105 expect(pool.opts.enableEvents).toBe(false)
106 expect(pool.emitter).toBeUndefined()
107 expect(pool.opts.workerChoiceStrategy).toBe(
108 WorkerChoiceStrategies.LESS_RECENTLY_USED
109 )
110 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
111 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
112 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
113 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
114 await pool.destroy()
115 })
116
117 it('Simulate worker not found during increaseWorkerRunningTasks', async () => {
118 const pool = new StubPoolWithWorkerTasksUsageMapClear(
119 numberOfWorkers,
120 './tests/worker-files/cluster/testWorker.js'
121 )
122 // Simulate worker not found.
123 pool.removeAllWorker()
124 expect(() => pool.increaseWorkerRunningTasks()).toThrowError(
125 workerNotFoundInTasksUsageMapError
126 )
127 await pool.destroy()
128 })
129
130 it('Simulate worker not found during decreaseWorkerRunningTasks', async () => {
131 const pool = new StubPoolWithWorkerTasksUsageMapClear(
132 numberOfWorkers,
133 './tests/worker-files/cluster/testWorker.js',
134 {
135 errorHandler: e => console.error(e)
136 }
137 )
138 // Simulate worker not found.
139 pool.removeAllWorker()
140 expect(() => pool.decreaseWorkerRunningTasks()).toThrowError(
141 workerNotFoundInTasksUsageMapError
142 )
143 await pool.destroy()
144 })
145
146 it('Simulate worker not found during stepWorkerRunTasks', async () => {
147 const pool = new StubPoolWithWorkerTasksUsageMapClear(
148 numberOfWorkers,
149 './tests/worker-files/cluster/testWorker.js',
150 {
151 errorHandler: e => console.error(e)
152 }
153 )
154 // Simulate worker not found.
155 pool.removeAllWorker()
156 expect(() => pool.stepWorkerRunTasks()).toThrowError(
157 workerNotFoundInTasksUsageMapError
158 )
159 await pool.destroy()
160 })
161
162 it('Simulate worker not found during updateWorkerTasksRunTime with strategy not requiring it', async () => {
163 const pool = new StubPoolWithWorkerTasksUsageMapClear(
164 numberOfWorkers,
165 './tests/worker-files/cluster/testWorker.js',
166 {
167 errorHandler: e => console.error(e)
168 }
169 )
170 // Simulate worker not found.
171 pool.removeAllWorker()
172 expect(() => pool.updateWorkerTasksRunTime()).not.toThrowError()
173 await pool.destroy()
174 })
175
176 it('Simulate worker not found during updateWorkerTasksRunTime with strategy requiring it', async () => {
177 const pool = new StubPoolWithWorkerTasksUsageMapClear(
178 numberOfWorkers,
179 './tests/worker-files/cluster/testWorker.js',
180 {
181 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
182 errorHandler: e => console.error(e)
183 }
184 )
185 // Simulate worker not found.
186 pool.removeAllWorker()
187 expect(() => pool.updateWorkerTasksRunTime()).toThrowError(
188 workerNotFoundInTasksUsageMapError
189 )
190 await pool.destroy()
191 })
192
193 it('Verify that worker pool tasks usage are initialized', async () => {
194 const pool = new FixedClusterPool(
195 numberOfWorkers,
196 './tests/worker-files/cluster/testWorker.js'
197 )
198 for (const tasksUsage of pool.workersTasksUsage.values()) {
199 expect(tasksUsage).toBeDefined()
200 expect(tasksUsage.run).toBe(0)
201 expect(tasksUsage.running).toBe(0)
202 expect(tasksUsage.runTime).toBe(0)
203 expect(tasksUsage.avgRunTime).toBe(0)
204 }
205 await pool.destroy()
206 })
207
208 it('Verify that worker pool tasks usage are computed', async () => {
209 const pool = new FixedClusterPool(
210 numberOfWorkers,
211 './tests/worker-files/cluster/testWorker.js'
212 )
213 const promises = []
214 for (let i = 0; i < numberOfWorkers * 2; i++) {
215 promises.push(pool.execute())
216 }
217 for (const tasksUsage of pool.workersTasksUsage.values()) {
218 expect(tasksUsage).toBeDefined()
219 expect(tasksUsage.run).toBe(0)
220 expect(tasksUsage.running).toBe(numberOfWorkers * 2)
221 expect(tasksUsage.runTime).toBe(0)
222 expect(tasksUsage.avgRunTime).toBe(0)
223 }
224 await Promise.all(promises)
225 for (const tasksUsage of pool.workersTasksUsage.values()) {
226 expect(tasksUsage).toBeDefined()
227 expect(tasksUsage.run).toBe(numberOfWorkers * 2)
228 expect(tasksUsage.running).toBe(0)
229 expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
230 expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
231 }
232 await pool.destroy()
233 })
234
235 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
236 const pool = new FixedThreadPool(
237 numberOfWorkers,
238 './tests/worker-files/thread/testWorker.js'
239 )
240 const promises = []
241 for (let i = 0; i < numberOfWorkers * 2; i++) {
242 promises.push(pool.execute())
243 }
244 await Promise.all(promises)
245 for (const tasksUsage of pool.workersTasksUsage.values()) {
246 expect(tasksUsage).toBeDefined()
247 expect(tasksUsage.run).toBe(numberOfWorkers * 2)
248 expect(tasksUsage.running).toBe(0)
249 expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
250 expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
251 }
252 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
253 for (const tasksUsage of pool.workersTasksUsage.values()) {
254 expect(tasksUsage).toBeDefined()
255 expect(tasksUsage.run).toBe(0)
256 expect(tasksUsage.running).toBe(0)
257 expect(tasksUsage.runTime).toBe(0)
258 expect(tasksUsage.avgRunTime).toBe(0)
259 }
260 await pool.destroy()
261 })
262
263 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
264 const pool = new FixedThreadPool(
265 numberOfWorkers,
266 './tests/worker-files/thread/testWorker.js'
267 )
268 const promises = []
269 let poolBusy = 0
270 pool.emitter.on('busy', () => poolBusy++)
271 for (let i = 0; i < numberOfWorkers * 2; i++) {
272 promises.push(pool.execute())
273 }
274 await Promise.all(promises)
275 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
276 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
277 expect(poolBusy).toBe(numberOfWorkers + 1)
278 await pool.destroy()
279 })
280 })