Merge branch 'master' into elu-strategy
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 PoolEvents,
8 WorkerChoiceStrategies,
9 PoolTypes,
10 WorkerTypes
11 } = require('../../../lib')
12 const { CircularArray } = require('../../../lib/circular-array')
13 const { Queue } = require('../../../lib/queue')
14
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers = 2
/**
 * Fixed thread pool stub adding a helper that empties the pool bookkeeping,
 * used by the suite to simulate workers that cannot be found.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Clears `promiseResponseMap` and replaces `workerNodes` with an empty array.
   */
  removeAllWorker () {
    this.promiseResponseMap.clear()
    this.workerNodes = []
  }
}
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`, letting the
 * suite exercise pool creation as if it happened outside the main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    const runsInMain = false
    return runsInMain
  }
}
28
// Building a pool whose `isMain()` reports `false` must throw.
it('Simulate pool creation from a non main thread/process', () => {
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: e => console.error(e) }
    )
  expect(createPoolFromWorker).toThrowError(
    'Cannot start a pool from a worker!'
  )
})
41
// A missing or empty worker file path must be rejected by the constructor.
it('Verify that filePath is checked', () => {
  const missingFileError = new Error(
    'Please specify a file with a worker implementation'
  )
  const createWithoutFilePath = () => new FixedThreadPool(numberOfWorkers)
  const createWithEmptyFilePath = () => new FixedThreadPool(numberOfWorkers, '')
  expect(createWithoutFilePath).toThrowError(missingFileError)
  expect(createWithEmptyFilePath).toThrowError(missingFileError)
})
53
// Constructing a pool with no argument at all must throw.
it('Verify that numberOfWorkers is checked', () => {
  const createWithoutArguments = () => new FixedThreadPool()
  expect(createWithoutArguments).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
59
// A negative worker count must be rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  const createWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const negativeSizeError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createWithNegativeSize).toThrowError(negativeSizeError)
})
70
// A fractional worker count must be rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  const createWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  const nonIntegerSizeError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createWithFractionalSize).toThrowError(nonIntegerSizeError)
})
81
// First checks the options exposed by a pool built without any, then checks
// that explicitly supplied options are exposed unchanged.
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'

  // Pool created without options.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.opts.restartWorkerOnError).toBe(true)
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: false,
    medWaitTime: false
  })
  expect(defaultPool.opts.messageHandler).toBeUndefined()
  expect(defaultPool.opts.errorHandler).toBeUndefined()
  expect(defaultPool.opts.onlineHandler).toBeUndefined()
  expect(defaultPool.opts.exitHandler).toBeUndefined()
  await defaultPool.destroy()

  // Pool created with every option set explicitly.
  const testHandler = () => console.log('test handler executed')
  const customOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      medRunTime: true,
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  }
  const customPool = new FixedThreadPool(
    numberOfWorkers,
    workerFile,
    customOptions
  )
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.opts.restartWorkerOnError).toBe(false)
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  // Compared against a fresh literal rather than `customOptions`, so the
  // check does not pass merely because the same object is shared.
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: true,
    weights: { 0: 300, 1: 200 }
  })
  expect(customPool.opts.messageHandler).toStrictEqual(testHandler)
  expect(customPool.opts.errorHandler).toStrictEqual(testHandler)
  expect(customPool.opts.onlineHandler).toStrictEqual(testHandler)
  expect(customPool.opts.exitHandler).toStrictEqual(testHandler)
  await customPool.destroy()
})
142
// Each invalid option set must make the pool constructor throw.
it('Verify that pool options are validated', async () => {
  // Returns a thunk that builds a fixed thread pool with the given options.
  const poolFactory = poolOptions => () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      poolOptions
    )

  expect(
    poolFactory({
      enableTasksQueue: true,
      tasksQueueOptions: { concurrency: 0 }
    })
  ).toThrowError("Invalid worker tasks concurrency '0'")

  expect(
    poolFactory({ workerChoiceStrategy: 'invalidStrategy' })
  ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")

  expect(
    poolFactory({ workerChoiceStrategyOptions: { weights: {} } })
  ).toThrowError(
    'Invalid worker choice strategy options: must have a weight for each worker node'
  )
})
178
// Changing the worker choice strategy options at runtime must be reflected in
// the pool options, in every registered strategy and in the task statistics
// requirements reported by the strategy context.
it('Verify that worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Asserts the options held by the pool and by each strategy instance.
  const expectStrategyOptions = expectedOptions => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(expectedOptions)
    const { workerChoiceStrategies } = pool.workerChoiceStrategyContext
    for (const [, strategy] of workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
  }

  // Asserts the statistics requirements; only the average/median run time
  // flags vary across the steps of this test.
  const expectRunTimeRequirements = (avgRunTime, medRunTime) => {
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: true,
      avgRunTime,
      medRunTime,
      waitTime: false,
      avgWaitTime: false,
      medWaitTime: false,
      elu: false
    })
  }

  // Initial state.
  expectStrategyOptions({ medRunTime: false, medWaitTime: false })
  expectRunTimeRequirements(true, false)

  // Switch to the median run time.
  pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
  expectStrategyOptions({ medRunTime: true })
  expectRunTimeRequirements(false, true)

  // Switch back to the average run time.
  pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
  expectStrategyOptions({ medRunTime: false })
  expectRunTimeRequirements(true, false)

  await pool.destroy()
})
247
// Toggling the tasks queue must update both the flag and the queue options.
it('Verify that tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )

  // Asserts the tasks queue flag and options currently exposed by the pool.
  const expectTasksQueueState = (enabled, queueOptions) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (queueOptions === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptions)
    }
  }

  expectTasksQueueState(false, undefined)
  pool.enableTasksQueue(true)
  expectTasksQueueState(true, { concurrency: 1 })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueState(true, { concurrency: 2 })
  pool.enableTasksQueue(false)
  expectTasksQueueState(false, undefined)

  await pool.destroy()
})
266
// Tasks queue options can be changed at runtime; a zero concurrency is refused.
it('Verify that tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })

  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })

  const setZeroConcurrency = () => pool.setTasksQueueOptions({ concurrency: 0 })
  expect(setZeroConcurrency).toThrowError(
    "Invalid worker tasks concurrency '0'"
  )

  await pool.destroy()
})
281
// Checks the `info` snapshot of a freshly created fixed thread pool and of a
// freshly created dynamic cluster pool.
it('Verify that pool info is set', async () => {
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    queuedTasks: 0,
    maxQueuedTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
  pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    // A cluster pool needs the cluster worker implementation; the thread
    // worker file was previously passed here by mistake.
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers * 2,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    queuedTasks: 0,
    maxQueuedTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
})
323
// Empties the pool's worker nodes through the stub helper and checks that the
// pool can still be destroyed afterwards.
it('Simulate worker not found', async () => {
  // NOTE(review): the stub extends FixedThreadPool yet is handed a cluster
  // worker file; confirm whether this mismatch is intentional.
  const workerFile = './tests/worker-files/cluster/testWorker.js'
  const poolOptions = { errorHandler: e => console.error(e) }
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    workerFile,
    poolOptions
  )
  expect(pool.workerNodes.length).toBe(numberOfWorkers)

  // Simulate worker not found.
  pool.removeAllWorker()
  expect(pool.workerNodes.length).toBe(0)

  await pool.destroy()
})
338
// Every worker node of a new pool must start with zeroed usage statistics.
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )

  // Builds the expected zeroed time statistics (fresh object on each call).
  const zeroedTimeStatistics = () => ({
    aggregation: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })

  for (const { workerUsage } of pool.workerNodes) {
    expect(workerUsage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        failed: 0
      },
      runTime: zeroedTimeStatistics(),
      waitTime: zeroedTimeStatistics(),
      elu: undefined
    })
  }

  await pool.destroy()
})
369
// Every worker node of a new pool must own an empty `Queue` of tasks.
it('Verify that worker pool tasks queue are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  for (const node of pool.workerNodes) {
    expect(node.tasksQueue).toBeDefined()
    expect(node.tasksQueue).toBeInstanceOf(Queue)
    expect(node.tasksQueue.size).toBe(0)
  }
  await pool.destroy()
})
382
// Submits `tasksPerWorker` tasks per worker and checks the per worker task
// counters while the tasks are in flight, then once they have all completed.
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const tasksPerWorker = 2

  // Builds the expected usage for the given executed/executing counters.
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      failed: 0
    },
    runTime: {
      aggregation: 0,
      average: 0,
      median: 0,
      history: expect.any(CircularArray)
    },
    waitTime: {
      aggregation: 0,
      average: 0,
      median: 0,
      history: expect.any(CircularArray)
    },
    elu: undefined
  })

  const pendingTasks = Array.from(
    { length: numberOfWorkers * tasksPerWorker },
    () => pool.execute()
  )

  // Nothing has been awaited yet: all the tasks are still executing.
  for (const { workerUsage } of pool.workerNodes) {
    expect(workerUsage).toStrictEqual(expectedUsage(0, tasksPerWorker))
  }

  await Promise.all(pendingTasks)

  // All the tasks are done.
  for (const { workerUsage } of pool.workerNodes) {
    expect(workerUsage).toStrictEqual(expectedUsage(tasksPerWorker, 0))
  }

  await pool.destroy()
})
442
// Runs some tasks, then switches the worker choice strategy and checks that
// the usage statistics of every worker node are back to their initial values.
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const tasksPerWorker = 2

  // Builds the expected idle usage with the given `executed` expectation.
  const expectedIdleUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      failed: 0
    },
    runTime: {
      aggregation: 0,
      average: 0,
      median: 0,
      history: expect.any(CircularArray)
    },
    waitTime: {
      aggregation: 0,
      average: 0,
      median: 0,
      history: expect.any(CircularArray)
    },
    elu: undefined
  })

  const pendingTasks = Array.from(
    { length: numberOfWorkers * tasksPerWorker },
    () => pool.execute()
  )
  await Promise.all(pendingTasks)

  // Before the strategy change: each worker has executed at least one task
  // and at most `tasksPerWorker` tasks.
  for (const { workerUsage } of pool.workerNodes) {
    expect(workerUsage).toStrictEqual(expectedIdleUsage(expect.any(Number)))
    expect(workerUsage.tasks.executed).toBeGreaterThan(0)
    expect(workerUsage.tasks.executed).toBeLessThanOrEqual(tasksPerWorker)
  }

  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)

  // After the strategy change: counters and histories are reset.
  for (const { workerUsage } of pool.workerNodes) {
    expect(workerUsage).toStrictEqual(expectedIdleUsage(0))
    expect(workerUsage.runTime.history.length).toBe(0)
    expect(workerUsage.waitTime.history.length).toBe(0)
  }

  await pool.destroy()
})
510
// Registers a listener on the pool emitter `full` event and checks how many
// times it fires and the shape of the pool info it receives.
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let fullEventCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.full, info => {
    fullEventCount += 1
    lastPoolInfo = info
  })

  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)

  // The `full` event is triggered when the number of tasks submitted at once
  // reaches the max number of workers in the dynamic pool. So in total
  // numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2
  // tasks to the dynamic pool with min = max = numberOfWorkers.
  expect(fullEventCount).toBe(numberOfWorkers * 2)
  expect(lastPoolInfo).toStrictEqual({
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })

  await pool.destroy()
})
547
// Registers a listener on the pool emitter `busy` event and checks how many
// times it fires and the shape of the pool info it receives.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let busyEventCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.busy, info => {
    busyEventCount += 1
    lastPoolInfo = info
  })

  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)

  // The `busy` event is triggered when the number of tasks submitted at once
  // reaches the number of fixed pool workers. So in total numberOfWorkers + 1
  // times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed
  // pool.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
  expect(lastPoolInfo).toStrictEqual({
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })

  await pool.destroy()
})
583
// Executes the same input through a multi tasks worker, first without a task
// function name and then with three explicit ones, checking each result.
it('Verify that multiple tasks worker is working', async () => {
  const pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testMultiTasksWorker.js'
  )
  const data = { n: 10 }
  const result0 = await pool.execute(data)
  expect(result0).toBe(false)
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toBe(false)
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(89)
  // Destroy the pool like the other tests of this suite do, so that the
  // cluster workers do not outlive the test.
  await pool.destroy()
})
600 })