refactor: cleanup task usage properties namespace
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 PoolEvents,
8 WorkerChoiceStrategies,
9 PoolTypes,
10 WorkerTypes
11 } = require('../../../lib')
12 const { CircularArray } = require('../../../lib/circular-array')
13 const { Queue } = require('../../../lib/queue')
14
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers = 2
/**
 * Fixed thread pool stub exposing a way to drop every worker node at once.
 * Used by the suite to simulate a pool whose workers cannot be found.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Forgets every pending task promise and empties the worker nodes list.
   */
  removeAllWorker () {
    this.promiseResponseMap.clear()
    this.workerNodes = []
  }
}
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`, so the
 * suite can exercise pool creation as if it happened outside the main thread.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
28
it('Simulate pool creation from a non main thread/process', () => {
  // The stub reports `isMain() === false`, so the constructor is expected to throw.
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      {
        errorHandler: error => console.error(error)
      }
    )
  expect(createPoolFromWorker).toThrowError(
    'Cannot start a pool from a worker!'
  )
})
41
it('Verify that filePath is checked', () => {
  const missingFilePathError = new Error(
    'Please specify a file with a worker implementation'
  )
  // Both an omitted and an empty file path must be rejected with the same error.
  const invalidPoolFactories = [
    () => new FixedThreadPool(numberOfWorkers),
    () => new FixedThreadPool(numberOfWorkers, '')
  ]
  for (const createPool of invalidPoolFactories) {
    expect(createPool).toThrowError(missingFilePathError)
  }
})
53
it('Verify that numberOfWorkers is checked', () => {
  // No argument at all: the number of workers is missing.
  const createPoolWithoutSize = () => new FixedThreadPool()
  expect(createPoolWithoutSize).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
59
it('Verify that a negative number of workers is checked', () => {
  const negativeSizeError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createNegativeSizedPool).toThrowError(negativeSizeError)
})
70
it('Verify that a non integer number of workers is checked', () => {
  const nonIntegerSizeError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  expect(createFractionalSizedPool).toThrowError(nonIntegerSizeError)
})
81
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const handlerOptionNames = [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]

  // First pass: a pool built without options exposes the default values.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.opts.restartWorkerOnError).toBe(true)
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: false,
    medWaitTime: false
  })
  for (const handlerName of handlerOptionNames) {
    expect(defaultPool.opts[handlerName]).toBeUndefined()
  }
  await defaultPool.destroy()

  // Second pass: every option given explicitly must be reflected in `opts`.
  const testHandler = () => console.log('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      medRunTime: true,
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.opts.restartWorkerOnError).toBe(false)
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: true,
    weights: { 0: 300, 1: 200 }
  })
  for (const handlerName of handlerOptionNames) {
    expect(customPool.opts[handlerName]).toStrictEqual(testHandler)
  }
  await customPool.destroy()
})
142
it('Verify that pool options are validated', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Each entry pairs an invalid options object with the error message it must trigger.
  const invalidOptionsCases = [
    [
      {
        enableTasksQueue: true,
        tasksQueueOptions: { concurrency: 0 }
      },
      "Invalid worker tasks concurrency '0'"
    ],
    [
      {
        workerChoiceStrategy: 'invalidStrategy'
      },
      "Invalid worker choice strategy 'invalidStrategy'"
    ],
    [
      {
        workerChoiceStrategyOptions: { weights: {} }
      },
      'Invalid worker choice strategy options: must have a weight for each worker node'
    ]
  ]
  for (const [invalidOptions, expectedMessage] of invalidOptionsCases) {
    const createPool = () =>
      new FixedThreadPool(numberOfWorkers, workerFile, invalidOptions)
    expect(createPool).toThrowError(expectedMessage)
  }
})
178
it('Verify that worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Asserts the options seen by the pool, then by every registered strategy.
  const expectStrategyOptions = buildExpectedOptions => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      buildExpectedOptions()
    )
    const { workerChoiceStrategies } = pool.workerChoiceStrategyContext
    for (const [, strategy] of workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(buildExpectedOptions())
    }
  }
  // Only the average/median run time flags vary across the steps of this test.
  const expectRequiredStatistics = (avgRunTime, medRunTime) => {
    const requiredStatistics =
      pool.workerChoiceStrategyContext.getRequiredStatistics()
    expect(requiredStatistics).toStrictEqual({
      runTime: true,
      avgRunTime,
      medRunTime,
      waitTime: false,
      avgWaitTime: false,
      medWaitTime: false
    })
  }

  // Initial state.
  expectStrategyOptions(() => ({ medRunTime: false, medWaitTime: false }))
  expectRequiredStatistics(true, false)

  // Switch to the median run time.
  pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
  expectStrategyOptions(() => ({ medRunTime: true }))
  expectRequiredStatistics(false, true)

  // Switch back to the average run time.
  pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
  expectStrategyOptions(() => ({ medRunTime: false }))
  expectRequiredStatistics(true, false)

  await pool.destroy()
})
244
it('Verify that tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )

  // Checks the enabled flag, then the queue options (`undefined` means "no options").
  const expectTasksQueueState = (enabled, expectedOptions) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (expectedOptions === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    }
  }

  expectTasksQueueState(false, undefined)
  pool.enableTasksQueue(true)
  expectTasksQueueState(true, { concurrency: 1 })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueState(true, { concurrency: 2 })
  pool.enableTasksQueue(false)
  expectTasksQueueState(false, undefined)
  await pool.destroy()
})
263
it('Verify that tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  const currentQueueOptions = () => pool.opts.tasksQueueOptions

  expect(currentQueueOptions()).toStrictEqual({ concurrency: 1 })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(currentQueueOptions()).toStrictEqual({ concurrency: 2 })

  // A concurrency of zero is expected to be refused.
  const setZeroConcurrency = () => pool.setTasksQueueOptions({ concurrency: 0 })
  expect(setZeroConcurrency).toThrowError(
    "Invalid worker tasks concurrency '0'"
  )
  await pool.destroy()
})
278
it('Verify that pool info is set', async () => {
  // Fixed thread pool: min and max sizes both equal the number of workers.
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    runningTasks: 0,
    queuedTasks: 0,
    maxQueuedTasks: 0
  })
  await pool.destroy()
  // Dynamic cluster pool: it must be started with the cluster worker file,
  // not the thread one, so that the worker matches the pool's worker type.
  pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers * 2,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    runningTasks: 0,
    queuedTasks: 0,
    maxQueuedTasks: 0
  })
  await pool.destroy()
})
316
it('Simulate worker not found', async () => {
  // The stub extends FixedThreadPool, so it is given the thread worker file
  // (the cluster worker file does not match a thread based pool).
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  // Simulate worker not found.
  pool.removeAllWorker()
  expect(pool.workerNodes.length).toBe(0)
  await pool.destroy()
})
331
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // A freshly created worker node has every counter at zero and both histories present.
  const pristineTasksUsage = {
    ran: 0,
    running: 0,
    runTime: 0,
    runTimeHistory: expect.any(CircularArray),
    avgRunTime: 0,
    medRunTime: 0,
    waitTime: 0,
    waitTimeHistory: expect.any(CircularArray),
    avgWaitTime: 0,
    medWaitTime: 0,
    error: 0
  }
  pool.workerNodes.forEach(({ tasksUsage }) => {
    expect(tasksUsage).toStrictEqual(pristineTasksUsage)
  })
  await pool.destroy()
})
354
it('Verify that worker pool tasks queue are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Every worker node must own an empty Queue instance.
  pool.workerNodes.forEach(({ tasksQueue }) => {
    expect(tasksQueue).toBeDefined()
    expect(tasksQueue).toBeInstanceOf(Queue)
    expect(tasksQueue.size).toBe(0)
  })
  await pool.destroy()
})
367
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Expected usage of a worker node; only `ran` and `running` change in this test.
  const expectedTasksUsage = (ran, running) => ({
    ran,
    running,
    runTime: 0,
    runTimeHistory: expect.any(CircularArray),
    avgRunTime: 0,
    medRunTime: 0,
    waitTime: 0,
    waitTimeHistory: expect.any(CircularArray),
    avgWaitTime: 0,
    medWaitTime: 0,
    error: 0
  })
  const expectEveryWorkerUsage = (ran, running) => {
    pool.workerNodes.forEach(({ tasksUsage }) => {
      expect(tasksUsage).toStrictEqual(expectedTasksUsage(ran, running))
    })
  }

  // Submit all the tasks at once, without awaiting them.
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  // While in flight, each worker is expected to run `maxMultiplier` tasks.
  expectEveryWorkerUsage(0, maxMultiplier)
  await Promise.all(pendingTasks)
  // Once settled, the running tasks have all moved to the `ran` counter.
  expectEveryWorkerUsage(maxMultiplier, 0)
  await pool.destroy()
})
411
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const maxMultiplier = 2
  // Expected usage of a worker node for a given `ran` value (or matcher).
  const expectedTasksUsage = ran => ({
    ran,
    running: 0,
    runTime: 0,
    runTimeHistory: expect.any(CircularArray),
    avgRunTime: 0,
    medRunTime: 0,
    waitTime: 0,
    waitTimeHistory: expect.any(CircularArray),
    avgWaitTime: 0,
    medWaitTime: 0,
    error: 0
  })

  const submittedTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  await Promise.all(submittedTasks)

  // After execution each worker ran at least one and at most `maxMultiplier` tasks.
  pool.workerNodes.forEach(({ tasksUsage }) => {
    expect(tasksUsage).toStrictEqual(expectedTasksUsage(expect.any(Number)))
    expect(tasksUsage.ran).toBeGreaterThan(0)
    expect(tasksUsage.ran).toBeLessThanOrEqual(maxMultiplier)
  })

  // Changing the strategy is expected to wipe counters and histories.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  pool.workerNodes.forEach(({ tasksUsage }) => {
    expect(tasksUsage).toStrictEqual(expectedTasksUsage(0))
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.waitTimeHistory.length).toBe(0)
  })
  await pool.destroy()
})
461
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const submittedTasksCount = numberOfWorkers * 2
  let fullEventsCount = 0
  pool.emitter.on(PoolEvents.full, () => {
    fullEventsCount += 1
  })
  const submittedTasks = Array.from({ length: submittedTasksCount }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
  // With min = max = numberOfWorkers, submitting numberOfWorkers * 2 tasks is expected to emit it numberOfWorkers * 2 times in total.
  expect(fullEventsCount).toBe(numberOfWorkers * 2)
  await pool.destroy()
})
480
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const submittedTasksCount = numberOfWorkers * 2
  let busyEventsCount = 0
  pool.emitter.on(PoolEvents.busy, () => {
    busyEventsCount += 1
  })
  const submittedTasks = Array.from({ length: submittedTasksCount }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // Submitting numberOfWorkers * 2 tasks to the fixed pool is expected to emit it numberOfWorkers + 1 times in total.
  expect(busyEventsCount).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
498
it('Verify that multiple tasks worker is working', async () => {
  const pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testMultiTasksWorker.js'
  )
  const data = { n: 10 }
  // No task name given: the result of the worker's default task is checked.
  const result0 = await pool.execute(data)
  expect(result0).toBe(false)
  // Explicitly named tasks.
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toBe(false)
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(89)
  // Destroy the pool like every other test of the suite does, so that its
  // cluster workers do not outlive the test.
  await pool.destroy()
})
515 })