feat: add pool runtime setters
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicThreadPool,
4 FixedClusterPool,
5 FixedThreadPool,
6 PoolEvents,
7 WorkerChoiceStrategies
8 } = require('../../../lib/index')
9 const { CircularArray } = require('../../../lib/circular-array')
10
11 describe('Abstract pool test suite', () => {
// Pool size used by the tests of this suite unless a test passes its own.
const numberOfWorkers = 1
// Error the pool is expected to throw when a worker lookup fails.
const workerNotFoundInPoolError = new Error(
  'Worker could not be found in the pool worker nodes'
)
/**
 * Fixed thread pool stub adding a helper that empties the pool, used to
 * simulate a worker that cannot be found.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Clears the promise response map and replaces the worker nodes with an
   * empty array.
   */
  removeAllWorker () {
    this.promiseResponseMap.clear()
    this.workerNodes = []
  }
}
/**
 * Fixed thread pool stub whose `isMain()` always answers false, used to
 * simulate a pool being created from a non main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
27
it('Simulate pool creation from a non main thread/process', () => {
  // The stub reports isMain() === false, so construction is expected to throw.
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: e => console.error(e) }
    )
  expect(createPoolFromWorker).toThrowError(
    'Cannot start a pool from a worker!'
  )
})
40
it('Verify that filePath is checked', () => {
  const missingFilePathError = new Error(
    'Please specify a file with a worker implementation'
  )
  // A missing file path and an empty one must be rejected the same way.
  const poolFactories = [
    () => new FixedThreadPool(numberOfWorkers),
    () => new FixedThreadPool(numberOfWorkers, '')
  ]
  for (const createPool of poolFactories) {
    expect(createPool).toThrowError(missingFilePathError)
  }
})
52
it('Verify that numberOfWorkers is checked', () => {
  // No argument at all: the number of workers is missing.
  const createPoolWithoutSize = () => new FixedThreadPool()
  expect(createPoolWithoutSize).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
58
it('Verify that a negative number of workers is checked', () => {
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  // Both the error type (RangeError) and its message are part of the expectation.
  const negativeSizeError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createNegativeSizedPool).toThrowError(negativeSizeError)
})
69
it('Verify that a non integer number of workers is checked', () => {
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  // Both the error type (TypeError) and its message are part of the expectation.
  const nonIntegerSizeError = new TypeError(
    'Cannot instantiate a pool with a non integer number of workers'
  )
  expect(createFractionalSizedPool).toThrowError(nonIntegerSizeError)
})
80
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const handlerNames = [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]

  // First pool: no options given, so only default values are expected.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: false
  })
  for (const handlerName of handlerNames) {
    expect(defaultPool.opts[handlerName]).toBeUndefined()
  }
  await defaultPool.destroy()

  // Second pool: every option is given and must be reflected as is.
  const testHandler = () => console.log('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED,
    workerChoiceStrategyOptions: { medRunTime: true },
    enableEvents: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LESS_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: true
  })
  for (const handlerName of handlerNames) {
    expect(customPool.opts[handlerName]).toStrictEqual(testHandler)
  }
  await customPool.destroy()
})
133
// The pool constructor is expected to throw synchronously on invalid options,
// and nothing here is awaited, so the callback is a plain (non async) function
// like the other constructor validation tests of this suite.
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Builds a thunk creating a pool with the given options, for expect() to call.
  const createPoolWith = poolOptions => () =>
    new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)

  // A tasks queue concurrency of 0 is invalid.
  expect(
    createPoolWith({
      enableTasksQueue: true,
      tasksQueueOptions: { concurrency: 0 }
    })
  ).toThrowError("Invalid worker tasks concurrency '0'")

  // An unknown worker choice strategy name is invalid.
  expect(
    createPoolWith({ workerChoiceStrategy: 'invalidStrategy' })
  ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
})
157
it('Verify that worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  const strategyContext = pool.workerChoiceStrategyContext

  // Checks the pool options, every registered strategy and the required
  // statistics against the given medRunTime flag: with FAIR_SHARE, avgRunTime
  // is required exactly when medRunTime is not.
  const expectMedRunTime = medRunTime => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
      medRunTime
    })
    for (const [, strategy] of strategyContext.workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual({ medRunTime })
    }
    expect(strategyContext.getRequiredStatistics().avgRunTime).toBe(
      !medRunTime
    )
    expect(strategyContext.getRequiredStatistics().medRunTime).toBe(
      medRunTime
    )
  }

  // Default, then switched on, then switched back off.
  expectMedRunTime(false)
  pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
  expectMedRunTime(true)
  pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
  expectMedRunTime(false)
  await pool.destroy()
})
207
it('Verify that tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )

  // Asserts the tasks queue flag and its options; `undefined` options mean
  // that no tasks queue options are expected to be set.
  const expectTasksQueueState = (enabled, queueOptions) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (queueOptions === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptions)
    }
  }

  expectTasksQueueState(false)
  // Enabling without options is expected to yield a concurrency of 1.
  pool.enableTasksQueue(true)
  expectTasksQueueState(true, { concurrency: 1 })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueState(true, { concurrency: 2 })
  // Disabling is expected to drop the tasks queue options again.
  pool.enableTasksQueue(false)
  expectTasksQueueState(false)
  await pool.destroy()
})
226
it('Verify that tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  // Enabled without explicit options: a concurrency of 1 is expected.
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })

  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })

  // A concurrency of 0 must be refused by the setter.
  const setZeroConcurrency = () => pool.setTasksQueueOptions({ concurrency: 0 })
  expect(setZeroConcurrency).toThrowError(
    "Invalid worker tasks concurrency '0'"
  )
  await pool.destroy()
})
241
it('Simulate worker not found at getWorkerTasksUsage()', async () => {
  // NOTE(review): StubPoolWithRemoveAllWorker extends FixedThreadPool but is
  // given a cluster worker file here - confirm this is intentional.
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { errorHandler: e => console.error(e) }
  )
  expect(pool.workerNodes.length).toBe(numberOfWorkers)

  // Empty the pool so that the worker lookup fails.
  pool.removeAllWorker()
  expect(pool.workerNodes.length).toBe(0)

  const readWorkerTasksUsage = () => pool.getWorkerTasksUsage()
  expect(readWorkerTasksUsage).toThrowError(workerNotFoundInPoolError)
  await pool.destroy()
})
259
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // No task was submitted: every counter is at zero and the history is empty.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
278
it('Verify that worker pool tasks queue are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Every worker node starts with an empty array as tasks queue.
  for (const { tasksQueue } of pool.workerNodes) {
    expect(tasksQueue).toBeDefined()
    expect(tasksQueue).toBeInstanceOf(Array)
    expect(tasksQueue.length).toBe(0)
  }
  await pool.destroy()
})
291
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const submittedTasks = numberOfWorkers * 2
  const pendingTasks = Array.from({ length: submittedTasks }, () =>
    pool.execute()
  )

  // Right after submission: all tasks are running, none has completed yet.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(submittedTasks)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }

  await Promise.all(pendingTasks)

  // Once settled: all tasks are accounted as run and nothing is running.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(submittedTasks)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
326
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const submittedTasks = numberOfWorkers * 2
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )

  // All tasks completed: the usage reflects the executed work.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(submittedTasks)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }

  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)

  // After the strategy change every usage figure must be back to zero.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
363
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let fullEventCount = 0
  // The listener is registered before any task is submitted.
  pool.emitter.on(PoolEvents.full, () => ++fullEventCount)
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
  // Submitting numberOfWorkers * 2 tasks in a row therefore triggers it numberOfWorkers + 1 times in total.
  expect(fullEventCount).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
382
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let busyEventCount = 0
  // The listener is registered before any task is submitted.
  pool.emitter.on(PoolEvents.busy, () => ++busyEventCount)
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // Submitting numberOfWorkers * 2 tasks in a row therefore triggers it numberOfWorkers + 1 times in total.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
400 })