refactor: rename worker choice strategies to sensible names
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 PoolEvents,
8 WorkerChoiceStrategies
9 } = require('../../../lib')
10 const { CircularArray } = require('../../../lib/circular-array')
11 const { Queue } = require('../../../lib/queue')
12
13 describe('Abstract pool test suite', () => {
// Worker count passed to most of the pools created in this suite.
const numberOfWorkers = 1

/**
 * FixedThreadPool stub exposing a helper that drops every worker node.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Replaces `workerNodes` with an empty array and clears `promiseResponseMap`.
   */
  removeAllWorker () {
    this.workerNodes = []
    this.promiseResponseMap.clear()
  }
}

/**
 * FixedThreadPool stub whose `isMain()` always reports `false`.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
26
it('Simulate pool creation from a non main thread/process', () => {
  // StubPoolWithIsMain#isMain() returns false, and the constructor is expected to reject that.
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: e => console.error(e) }
    )
  expect(createPoolFromWorker).toThrowError(
    'Cannot start a pool from a worker!'
  )
})
39
it('Verify that filePath is checked', () => {
  const expectedError = new Error(
    'Please specify a file with a worker implementation'
  )
  // Both a missing and an empty file path must be rejected with the same error.
  const invalidFilePathFactories = [
    () => new FixedThreadPool(numberOfWorkers),
    () => new FixedThreadPool(numberOfWorkers, '')
  ]
  for (const createPool of invalidFilePathFactories) {
    expect(createPool).toThrowError(expectedError)
  }
})
51
it('Verify that numberOfWorkers is checked', () => {
  // Constructing a pool with no arguments at all must throw.
  const createPoolWithoutArguments = () => new FixedThreadPool()
  expect(createPoolWithoutArguments).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
57
it('Verify that a negative number of workers is checked', () => {
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createPoolWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createPoolWithNegativeSize).toThrowError(expectedError)
})
68
it('Verify that a non integer number of workers is checked', () => {
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createPoolWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  expect(createPoolWithFractionalSize).toThrowError(expectedError)
})
79
it('Verify that pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'
  const handlerNames = [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]

  // Pool built without an options argument: inspect the resulting option values.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFilePath)
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: false
  })
  for (const handlerName of handlerNames) {
    expect(defaultPool.opts[handlerName]).toBeUndefined()
  }
  await defaultPool.destroy()

  // Pool built with every option set explicitly: each one must be reflected in `opts`.
  const testHandler = () => console.log('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFilePath, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      medRunTime: true,
      weights: { 0: 300 }
    },
    enableEvents: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: true,
    weights: { 0: 300 }
  })
  for (const handlerName of handlerNames) {
    expect(customPool.opts[handlerName]).toStrictEqual(testHandler)
  }
  await customPool.destroy()
})
136
it('Verify that pool options are validated', async () => {
  // Each entry pairs an invalid options object with the error message it must trigger.
  const invalidOptionsCases = [
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      "Invalid worker tasks concurrency '0'"
    ],
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      "Invalid worker choice strategy 'invalidStrategy'"
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      'Invalid worker choice strategy options: must have a weight for each worker node'
    ]
  ]
  for (const [poolOptions, expectedMessage] of invalidOptionsCases) {
    const createPool = () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.js',
        poolOptions
      )
    expect(createPool).toThrowError(expectedMessage)
  }
})
172
it('Verify that worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Asserts that the pool, every registered strategy and the required
  // statistics all agree on the given `medRunTime` flag. The assertions below
  // expect `avgRunTime` to be required exactly when `medRunTime` is not.
  const expectMedRunTimeOption = medRunTime => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
      medRunTime
    })
    const { workerChoiceStrategies } = pool.workerChoiceStrategyContext
    for (const [, strategy] of workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual({ medRunTime })
    }
    expect(
      pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
    ).toBe(!medRunTime)
    expect(
      pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
    ).toBe(medRunTime)
  }

  expectMedRunTimeOption(false)
  pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
  expectMedRunTimeOption(true)
  pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
  expectMedRunTimeOption(false)
  await pool.destroy()
})
222
it('Verify that tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )

  // Checks the enable flag and the queue options exposed by `pool.opts`;
  // pass `undefined` as options when none are expected to be set.
  const expectQueueState = (enabled, expectedOptions) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (expectedOptions === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    }
  }

  expectQueueState(false, undefined)
  pool.enableTasksQueue(true)
  expectQueueState(true, { concurrency: 1 })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectQueueState(true, { concurrency: 2 })
  pool.enableTasksQueue(false)
  expectQueueState(false, undefined)
  await pool.destroy()
})
241
it('Verify that tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  // Queue enabled without explicit options: a concurrency of 1 is expected.
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })

  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })

  // A concurrency of 0 must be rejected.
  const setZeroConcurrency = () => pool.setTasksQueueOptions({ concurrency: 0 })
  expect(setZeroConcurrency).toThrowError(
    "Invalid worker tasks concurrency '0'"
  )
  await pool.destroy()
})
256
it('Simulate worker not found', async () => {
  // NOTE(review): StubPoolWithRemoveAllWorker extends FixedThreadPool, yet the
  // worker file below lives under `cluster/` — confirm this is intentional.
  const poolOptions = { errorHandler: e => console.error(e) }
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    poolOptions
  )
  expect(pool.workerNodes.length).toBe(numberOfWorkers)

  // Drop every worker node to simulate the "worker not found" situation.
  pool.removeAllWorker()
  expect(pool.workerNodes.length).toBe(0)
  await pool.destroy()
})
271
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // No task has been submitted yet, so every counter must still be zero.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
290
it('Verify that worker pool tasks queue are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Every worker node must own an empty Queue instance.
  for (const { tasksQueue } of pool.workerNodes) {
    expect(tasksQueue).toBeDefined()
    expect(tasksQueue).toBeInstanceOf(Queue)
    expect(tasksQueue.size).toBe(0)
  }
  await pool.destroy()
})
303
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const submittedTasks = numberOfWorkers * 2
  const pendingTasks = Array.from({ length: submittedTasks }, () =>
    pool.execute()
  )

  // Tasks are submitted but not awaited yet: they count as running, not as run.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(submittedTasks)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }

  await Promise.all(pendingTasks)

  // All tasks settled: they moved from running to run.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(submittedTasks)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
338
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const submittedTasks = numberOfWorkers * 2
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )

  // After the tasks settled, the usage reflects the executed tasks.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(submittedTasks)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }

  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)

  // Switching the strategy must bring every usage counter back to zero.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
375
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let fullEventCount = 0
  // Register the listener before submitting anything so no event is missed.
  pool.emitter.on(PoolEvents.full, () => ++fullEventCount)
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
  expect(fullEventCount).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
394
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let busyEventCount = 0
  // Register the listener before submitting anything so no event is missed.
  pool.emitter.on(PoolEvents.busy, () => ++busyEventCount)
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
412
it('Verify that multiple tasks worker is working', async () => {
  const pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testMultiTasksWorker.js'
  )
  const data = { n: 10 }
  // No task name passed as second argument.
  const result0 = await pool.execute(data)
  expect(result0).toBe(false)
  // The remaining calls pass a task name as second argument.
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toBe(false)
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(89)
  // Destroy the pool so its cluster workers do not outlive the test,
  // as the other pool-creating tests in this suite do.
  await pool.destroy()
})
429 })