37729cee58bfabc542629a070d5d7445a30f9bd5
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const sinon = require('sinon')
3 const {
4 DynamicClusterPool,
5 DynamicThreadPool,
6 FixedClusterPool,
7 FixedThreadPool,
8 PoolEvents,
9 PoolTypes,
10 WorkerChoiceStrategies,
11 WorkerTypes
12 } = require('../../../lib')
13 const { CircularArray } = require('../../../lib/circular-array')
14 const { Deque } = require('../../../lib/deque')
15 const { version } = require('../../../package.json')
16 const { waitPoolEvents } = require('../../test-utils')
17
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`.
 * Used by the suite to exercise pool construction when the pool believes
 * it is not running in the main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /** @returns {boolean} Always `false`. */
  isMain () { return false }
}
25
// Undo every sinon stub/spy installed by a test so that tests stay independent.
afterEach(function () {
  sinon.restore()
})
29
it('Simulate pool creation from a non main thread/process', () => {
  // The stub reports that it is not the main thread, so construction must be refused.
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: (e) => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPoolFromWorker).toThrowError(expectedError)
})
46
it('Verify that filePath is checked', () => {
  const missingFileError = new Error(
    'Please specify a file with a worker implementation'
  )
  // No file path at all.
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
    missingFileError
  )
  // Empty or non-string file paths are rejected with the same error.
  for (const invalidFilePath of ['', 0, true]) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, invalidFilePath)
    ).toThrowError(missingFileError)
  }
  // A well-formed path that does not exist on disk.
  const unknownFileError = new Error(
    "Cannot find the worker file './dummyWorker.ts'"
  )
  expect(
    () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  ).toThrowError(unknownFileError)
})
67
it('Verify that numberOfWorkers is checked', () => {
  // Constructing a pool without any argument must fail on the missing size.
  const createPoolWithoutSize = () => new FixedThreadPool()
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPoolWithoutSize).toThrowError(expectedError)
})
75
it('Verify that a negative number of workers is checked', () => {
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createNegativeSizedPool).toThrowError(expectedError)
})
86
it('Verify that a non integer number of workers is checked', () => {
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createFractionalSizedPool).toThrowError(expectedError)
})
97
it('Verify that dynamic pool sizing is checked', () => {
  const threadWorkerFile = './tests/worker-files/thread/testWorker.js'
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  // Each entry: pool class, (min, max) sizes, worker file and the error the constructor must throw.
  const invalidSizings = [
    {
      Pool: DynamicClusterPool,
      min: 1,
      max: undefined,
      file: clusterWorkerFile,
      error: new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    },
    {
      Pool: DynamicThreadPool,
      min: 0.5,
      max: 1,
      file: threadWorkerFile,
      error: new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    },
    {
      Pool: DynamicClusterPool,
      min: 0,
      max: 0.5,
      file: clusterWorkerFile,
      error: new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    },
    {
      Pool: DynamicThreadPool,
      min: 2,
      max: 1,
      file: threadWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    },
    {
      Pool: DynamicClusterPool,
      min: 1,
      max: 1,
      file: clusterWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    },
    {
      Pool: DynamicThreadPool,
      min: 0,
      max: 0,
      file: threadWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    }
  ]
  for (const { Pool, min, max, file, error } of invalidSizings) {
    expect(() => new Pool(min, max, file)).toThrowError(error)
  }
})
164
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const handlerNames = [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]

  // 1. Pool built without options: every option must carry its default value.
  const defaultStrategyOptions = {
    choiceRetries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.opts.restartWorkerOnError).toBe(true)
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual(
    defaultStrategyOptions
  )
  expect(defaultPool.workerChoiceStrategyContext.opts).toStrictEqual(
    defaultStrategyOptions
  )
  for (const handlerName of handlerNames) {
    expect(defaultPool.opts[handlerName]).toBeUndefined()
  }
  await defaultPool.destroy()

  // 2. Pool built with explicit options: they must be kept and completed with defaults.
  const testHandler = () => console.info('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  const customStrategyOptions = {
    choiceRetries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.opts.restartWorkerOnError).toBe(false)
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: 4
  })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual(
    customStrategyOptions
  )
  expect(customPool.workerChoiceStrategyContext.opts).toStrictEqual(
    customStrategyOptions
  )
  for (const handlerName of handlerNames) {
    expect(customPool.opts[handlerName]).toStrictEqual(testHandler)
  }
  await customPool.destroy()
})
246
it('Verify that pool options are validated', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Each entry pairs an invalid options object with the error the constructor must throw.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { choiceRetries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { choiceRetries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: choice retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { queueMaxSize: 2 } },
      new Error(
        'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, invalidOptions)
    ).toThrowError(expectedError)
  }
})
435
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Expected strategy options when runTime and elu share the given median flag.
  const strategyOptionsWith = (median) => ({
    choiceRetries: 6,
    runTime: { median },
    waitTime: { median: false },
    elu: { median }
  })

  // The options must be mirrored in the pool, the strategy context and every strategy.
  const expectStrategyOptions = (expectedOptions) => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(expectedOptions)
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    const { workerChoiceStrategies } = pool.workerChoiceStrategyContext
    for (const [, workerChoiceStrategy] of workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(expectedOptions)
    }
  }

  // With the median enabled the average is no longer required, and conversely.
  const expectStatisticsRequirements = (median) => {
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !median, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !median, median }
    })
  }

  // Defaults right after construction.
  expectStrategyOptions(strategyOptionsWith(false))
  expectStatisticsRequirements(false)

  // Switch runTime and elu to the median.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategyOptions(strategyOptionsWith(true))
  expectStatisticsRequirements(true)

  // And back to the average.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategyOptions(strategyOptionsWith(false))
  expectStatisticsRequirements(false)

  // Invalid options must be rejected by the setter.
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { choiceRetries: 'invalidChoiceRetries' },
      new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    ],
    [
      { choiceRetries: -1 },
      new RangeError(
        "Invalid worker choice strategy options: choice retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() =>
      pool.setWorkerChoiceStrategyOptions(invalidOptions)
    ).toThrowError(expectedError)
  }
  await pool.destroy()
})
609
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Checks the enabled flag and the resulting queue options (undefined when disabled).
  const expectTasksQueueState = (enabled, expectedOptions) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (expectedOptions === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    }
  }
  expectTasksQueueState(false, undefined)
  pool.enableTasksQueue(true)
  expectTasksQueueState(true, { concurrency: 1, size: 4 })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueState(true, { concurrency: 2, size: 4 })
  pool.enableTasksQueue(false)
  expectTasksQueueState(false, undefined)
  await pool.destroy()
})
634
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  // Defaults, then a valid update of the concurrency.
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: 4
  })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: 4
  })
  // Each entry pairs invalid tasks queue options with the error the setter must throw.
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { queueMaxSize: 2 },
      new Error(
        'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
      )
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(invalidOptions)).toThrowError(
      expectedError
    )
  }
  await pool.destroy()
})
688
it('Verify that pool info is set', async () => {
  // Counters shared by both freshly created pools: nothing has run yet.
  const idleCounters = {
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  // Fixed thread pool: min size, max size and worker nodes all equal the pool size.
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(fixedPool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    ...idleCounters
  })
  await fixedPool.destroy()

  // Dynamic cluster pool: only the minimum number of worker nodes exists at start.
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize,
    ...idleCounters
  })
  await dynamicPool.destroy()
})
732
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // A brand new worker node has zeroed task counters and CircularArray histories.
  const anyHistory = { history: expect.any(CircularArray) }
  const pristineUsage = {
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: anyHistory,
    waitTime: anyHistory,
    elu: { idle: anyHistory, active: anyHistory }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(pristineUsage)
  }
  await pool.destroy()
})
766
it('Verify that pool worker tasks queue are initialized', async () => {
  // Every worker node must start with an empty Deque as tasks queue.
  const expectEmptyTasksQueues = (workerNodes) => {
    for (const workerNode of workerNodes) {
      expect(workerNode.tasksQueue).toBeDefined()
      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
  }
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(pool.workerNodes)
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expectEmptyTasksQueues(pool.workerNodes)
  // Fix: the second pool was never destroyed, leaving its workers alive after the test.
  await pool.destroy()
})
791
it('Verify that pool worker info are initialized', async () => {
  // Worker nodes present right after construction must be ready and not flagged dynamic.
  const expectInitialWorkerInfo = (workerNodes, workerType) => {
    for (const workerNode of workerNodes) {
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectInitialWorkerInfo(pool.workerNodes, WorkerTypes.cluster)
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expectInitialWorkerInfo(pool.workerNodes, WorkerTypes.thread)
  // Fix: the second pool was never destroyed, leaving its workers alive after the test.
  await pool.destroy()
})
820
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Invalid task function name: wrong type, then empty string.
  const nameTypeError = new TypeError('name argument must be a string')
  await expect(pool.execute(undefined, 0)).rejects.toThrowError(nameTypeError)
  const emptyNameError = new TypeError(
    'name argument must not be an empty string'
  )
  await expect(pool.execute(undefined, '')).rejects.toThrowError(
    emptyNameError
  )
  // Invalid transfer list.
  const transferListError = new TypeError(
    'transferList argument must be an array'
  )
  await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
    transferListError
  )
  // Unknown task function name: the rejection value is a plain string, not an Error.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  // Once destroyed, the pool refuses any further task.
  const destroyedPoolError = new Error(
    'Cannot execute a task on destroyed pool'
  )
  await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
    destroyedPoolError
  )
})
843
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Expected usage of a worker node for the given executed/executing counters.
  const usageWith = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Submit maxMultiplier tasks per worker without awaiting them.
  const pendingTasks = []
  let submitted = 0
  while (submitted < numberOfWorkers * maxMultiplier) {
    pendingTasks.push(pool.execute())
    ++submitted
  }
  // While in flight, every worker node is executing maxMultiplier tasks.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(usageWith(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  // Once settled, the same tasks are accounted as executed.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(usageWith(maxMultiplier, 0))
  }
  await pool.destroy()
})
909
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const maxMultiplier = 2
  const totalTasks = numberOfWorkers * maxMultiplier
  // Expected idle usage of a worker node with the given executed counter (value or matcher).
  const idleUsageWith = (executed) => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // No measurement history is expected to have been recorded.
  const expectEmptyHistories = (usage) => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }
  const pendingTasks = []
  let submitted = 0
  while (submitted < totalTasks) {
    pendingTasks.push(pool.execute())
    ++submitted
  }
  await Promise.all(pendingTasks)
  // Every worker node took part in the work, in unspecified proportions.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(idleUsageWith(expect.any(Number)))
    expect(usage.tasks.executed).toBeGreaterThan(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(totalTasks)
    expectEmptyHistories(usage)
  }
  // Changing the strategy must reset the usage of every worker node.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(idleUsageWith(0))
    expectEmptyHistories(usage)
  }
  await pool.destroy()
})
989
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Collect every payload delivered with the 'ready' event.
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, (info) => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(readyInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(readyInfos[readyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1021
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Collect every payload delivered with the 'busy' event.
  const busyInfos = []
  pool.emitter.on(PoolEvents.busy, (info) => {
    busyInfos.push(info)
  })
  const pendingTasks = []
  let submitted = 0
  while (submitted < numberOfWorkers * 2) {
    pendingTasks.push(pool.execute())
    ++submitted
  }
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyInfos.length).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(busyInfos[busyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1058
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Collect every payload delivered with the 'full' event.
  const fullInfos = []
  pool.emitter.on(PoolEvents.full, (info) => {
    fullInfos.push(info)
  })
  const pendingTasks = []
  let submitted = 0
  while (submitted < numberOfWorkers * 2) {
    pendingTasks.push(pool.execute())
    ++submitted
  }
  await Promise.all(pendingTasks)
  expect(fullInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(fullInfos[fullInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1094
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  // Force the pool to report back pressure; the stub is restored by the afterEach hook.
  sinon.stub(pool, 'hasBackPressure').returns(true)
  // Collect every payload delivered with the 'backPressure' event.
  const backPressureInfos = []
  pool.emitter.on(PoolEvents.backPressure, (info) => {
    backPressureInfos.push(info)
  })
  const pendingTasks = []
  let submitted = 0
  while (submitted < numberOfWorkers + 1) {
    pendingTasks.push(pool.execute())
    ++submitted
  }
  await Promise.all(pendingTasks)
  expect(backPressureInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(backPressureInfos[backPressureInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  expect(pool.hasBackPressure.called).toBe(true)
  await pool.destroy()
})
1138
it('Verify that listTaskFunctions() is working', async () => {
  // Names expected from both testMultipleTaskFunctionsWorker.js worker files, in this order.
  const expectedTaskFunctions = [
    'default',
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
  )
  // Wait for the pool to be ready before listing its task functions.
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual(
    expectedTaskFunctions
  )
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctions()).toStrictEqual(
    expectedTaskFunctions
  )
  // Fix: neither pool was destroyed, leaving their workers alive after the test.
  await dynamicThreadPool.destroy()
  await fixedClusterPool.destroy()
})
1164
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // Run the default task function, then each named one, sequentially.
  const result0 = await pool.execute(data)
  expect(result0).toStrictEqual({ ok: 1 })
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toStrictEqual({ ok: 1 })
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctions).toStrictEqual([
      'default',
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const name of pool.listTaskFunctions()) {
      // Per task function usage: same layout as the worker usage, minus `maxQueued`.
      expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: expect.any(Number),
          failed: 0,
          queued: 0,
          stolen: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      expect(
        workerNode.getTaskFunctionWorkerUsage(name).tasks.executing
      ).toBeGreaterThanOrEqual(0)
    }
  }
  // Fix: the pool was never destroyed, leaving its workers alive after the test.
  await pool.destroy()
})
1220 })