Merge branch 'master' of github.com:poolifier/poolifier into feature/task-functions
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { EventEmitter } = require('node:events')
2 const { expect } = require('expect')
3 const sinon = require('sinon')
4 const {
5 DynamicClusterPool,
6 DynamicThreadPool,
7 FixedClusterPool,
8 FixedThreadPool,
9 PoolEvents,
10 PoolTypes,
11 WorkerChoiceStrategies,
12 WorkerTypes
13 } = require('../../../lib')
14 const { CircularArray } = require('../../../lib/circular-array')
15 const { Deque } = require('../../../lib/deque')
16 const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
17 const { version } = require('../../../package.json')
18 const { waitPoolEvents } = require('../../test-utils')
19 const { WorkerNode } = require('../../../lib/pools/worker-node')
20
21 describe('Abstract pool test suite', () => {
22 const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`, so the
 * suite can simulate a pool being instantiated outside the main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
28
// Undo every sinon stub created by a test before the next test runs.
afterEach(function restoreSinonDefaults () {
  sinon.restore()
})
32
it('Simulate pool creation from a non main thread/process', () => {
  // The stub reports it is not running in the main thread/process, so the constructor must refuse to build the pool.
  const createPoolFromWorker = () => {
    return new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      {
        errorHandler: e => console.error(e)
      }
    )
  }
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPoolFromWorker).toThrowError(expectedError)
})
49
it('Verify that pool statuses properties are set', async () => {
  // A freshly constructed pool must report itself as started and no longer starting.
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    expect(pool.starting).toBe(false)
    expect(pool.started).toBe(true)
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await pool.destroy()
  }
})
59
it('Verify that filePath is checked', () => {
  const missingFileError = new Error(
    'Please specify a file with a worker implementation'
  )
  // File path argument omitted
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
    missingFileError
  )
  // Empty or non string file path values
  for (const invalidFilePath of ['', 0, true]) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, invalidFilePath)
    ).toThrowError(missingFileError)
  }
  // File path pointing to a file that does not exist
  const unknownFileError = new Error(
    "Cannot find the worker file './dummyWorker.ts'"
  )
  expect(
    () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  ).toThrowError(unknownFileError)
})
80
it('Verify that numberOfWorkers is checked', () => {
  // Constructing a pool without any argument must be rejected.
  const createPoolWithoutSize = () => new FixedThreadPool()
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPoolWithoutSize).toThrowError(expectedError)
})
88
it('Verify that a negative number of workers is checked', () => {
  const negativeSize = -1
  const createPool = () => {
    return new FixedClusterPool(
      negativeSize,
      './tests/worker-files/cluster/testWorker.js'
    )
  }
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createPool).toThrowError(expectedError)
})
99
it('Verify that a non integer number of workers is checked', () => {
  const fractionalSize = 0.25
  const createPool = () => {
    return new FixedThreadPool(
      fractionalSize,
      './tests/worker-files/thread/testWorker.js'
    )
  }
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createPool).toThrowError(expectedError)
})
110
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.js'
  // Each case pairs an invalid (min, max) sizing with the error the constructor must throw.
  const invalidSizingCases = [
    {
      createPool: () => new DynamicClusterPool(1, undefined, clusterWorkerFile),
      expectedError: new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    },
    {
      createPool: () => new DynamicThreadPool(0.5, 1, threadWorkerFile),
      expectedError: new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    },
    {
      createPool: () => new DynamicClusterPool(0, 0.5, clusterWorkerFile),
      expectedError: new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    },
    {
      createPool: () => new DynamicThreadPool(2, 1, threadWorkerFile),
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    },
    {
      createPool: () => new DynamicThreadPool(0, 0, threadWorkerFile),
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    },
    {
      createPool: () => new DynamicClusterPool(1, 1, clusterWorkerFile),
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    }
  ]
  for (const { createPool, expectedError } of invalidSizingCases) {
    expect(createPool).toThrowError(expectedError)
  }
})
177
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Asserts that the given strategy options reached the strategy context and every registered strategy.
  const expectStrategyOptionsPropagated = (pool, expectedOptions) => {
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(expectedOptions)
    }
  }

  // Pool created without options: the defaults must be applied
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  try {
    const defaultStrategyOptions = {
      retries: 6,
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false }
    }
    expect(defaultPool.emitter).toBeInstanceOf(EventEmitter)
    expect(defaultPool.opts).toStrictEqual({
      startWorkers: true,
      enableEvents: true,
      restartWorkerOnError: true,
      enableTasksQueue: false,
      workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
      workerChoiceStrategyOptions: defaultStrategyOptions
    })
    expectStrategyOptionsPropagated(defaultPool, defaultStrategyOptions)
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await defaultPool.destroy()
  }

  // Pool created with explicit options: they must be merged with the defaults
  const testHandler = () => console.info('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  try {
    const mergedStrategyOptions = {
      retries: 6,
      runTime: { median: true },
      waitTime: { median: false },
      elu: { median: false },
      weights: { 0: 300, 1: 200 }
    }
    expect(customPool.emitter).toBeUndefined()
    expect(customPool.opts).toStrictEqual({
      startWorkers: true,
      enableEvents: false,
      restartWorkerOnError: false,
      enableTasksQueue: true,
      tasksQueueOptions: {
        concurrency: 2,
        size: 4,
        taskStealing: true,
        tasksStealingOnBackPressure: true
      },
      workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
      workerChoiceStrategyOptions: mergedStrategyOptions,
      onlineHandler: testHandler,
      messageHandler: testHandler,
      errorHandler: testHandler,
      exitHandler: testHandler
    })
    expectStrategyOptionsPropagated(customPool, mergedStrategyOptions)
  } finally {
    await customPool.destroy()
  }
})
277
it('Verify that pool options are validated', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Each entry pairs invalid pool options with the error the constructor must throw.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, invalidOptions)
    ).toThrowError(expectedError)
  }
})
451
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Expected strategy options for a given runTime/elu median flag.
  const buildExpectedOptions = useMedian => ({
    retries: 6,
    runTime: { median: useMedian },
    waitTime: { median: false },
    elu: { median: useMedian }
  })
  // Expected task statistics requirements: average and median are mutually exclusive for runTime and elu.
  const buildExpectedRequirements = useMedian => ({
    runTime: {
      aggregate: true,
      average: !useMedian,
      median: useMedian
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: !useMedian,
      median: useMedian
    }
  })
  // Checks the pool options, the strategy context, every strategy and the statistics requirements.
  const expectPoolState = useMedian => {
    const expectedOptions = buildExpectedOptions(useMedian)
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(expectedOptions)
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(expectedOptions)
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(buildExpectedRequirements(useMedian))
  }
  try {
    expectPoolState(false)
    pool.setWorkerChoiceStrategyOptions({
      runTime: { median: true },
      elu: { median: true }
    })
    expectPoolState(true)
    pool.setWorkerChoiceStrategyOptions({
      runTime: { median: false },
      elu: { median: false }
    })
    expectPoolState(false)
    // Each entry pairs invalid strategy options with the error the setter must throw.
    const invalidOptionsCases = [
      [
        'invalidWorkerChoiceStrategyOptions',
        new TypeError(
          'Invalid worker choice strategy options: must be a plain object'
        )
      ],
      [
        { retries: 'invalidChoiceRetries' },
        new TypeError(
          'Invalid worker choice strategy options: retries must be an integer'
        )
      ],
      [
        { retries: -1 },
        new RangeError(
          "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
        )
      ],
      [
        { weights: {} },
        new Error(
          'Invalid worker choice strategy options: must have a weight for each worker node'
        )
      ],
      [
        { measurement: 'invalidMeasurement' },
        new Error(
          "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
        )
      ]
    ]
    for (const [invalidOptions, expectedError] of invalidOptionsCases) {
      expect(() =>
        pool.setWorkerChoiceStrategyOptions(invalidOptions)
      ).toThrowError(expectedError)
    }
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await pool.destroy()
  }
})
625
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    // Disabled by default, without any queue options
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
    // Enabling without options applies the default queue options
    pool.enableTasksQueue(true)
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency: 1,
      size: 4,
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    // Enabling with options overrides only the given ones
    pool.enableTasksQueue(true, { concurrency: 2 })
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency: 2,
      size: 4,
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    // Disabling removes the queue options
    pool.enableTasksQueue(false)
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await pool.destroy()
  }
})
654
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  try {
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency: 1,
      size: 4,
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    pool.setTasksQueueOptions({ concurrency: 2 })
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency: 2,
      size: 4,
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    // Each entry pairs invalid tasks queue options with the error the setter must throw.
    const invalidOptionsCases = [
      [
        'invalidTasksQueueOptions',
        new TypeError('Invalid tasks queue options: must be a plain object')
      ],
      [
        { concurrency: 0 },
        new RangeError(
          'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
        )
      ],
      [
        { concurrency: -1 },
        new RangeError(
          'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
        )
      ],
      [
        { concurrency: 0.2 },
        new TypeError('Invalid worker node tasks concurrency: must be an integer')
      ],
      [
        { size: 0 },
        new RangeError(
          'Invalid worker node tasks queue size: 0 is a negative integer or zero'
        )
      ],
      [
        { size: -1 },
        new RangeError(
          'Invalid worker node tasks queue size: -1 is a negative integer or zero'
        )
      ],
      [
        { size: 0.2 },
        new TypeError('Invalid worker node tasks queue size: must be an integer')
      ]
    ]
    for (const [invalidOptions, expectedError] of invalidOptionsCases) {
      expect(() => pool.setTasksQueueOptions(invalidOptions)).toThrowError(
        expectedError
      )
    }
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await pool.destroy()
  }
})
707
it('Verify that pool info is set', async () => {
  // Fixed thread pool: min and max sizes both equal the number of workers
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    expect(fixedPool.info).toStrictEqual({
      version,
      type: PoolTypes.fixed,
      worker: WorkerTypes.thread,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: numberOfWorkers,
      maxSize: numberOfWorkers,
      workerNodes: numberOfWorkers,
      idleWorkerNodes: numberOfWorkers,
      busyWorkerNodes: 0,
      executedTasks: 0,
      executingTasks: 0,
      failedTasks: 0
    })
  } finally {
    // Always terminate the workers, even when the assertion above throws.
    await fixedPool.destroy()
  }
  // Dynamic cluster pool: only the minimum number of worker nodes is expected after creation
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expect(dynamicPool.info).toStrictEqual({
      version,
      type: PoolTypes.dynamic,
      worker: WorkerTypes.cluster,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: minimumSize,
      maxSize: numberOfWorkers,
      workerNodes: minimumSize,
      idleWorkerNodes: minimumSize,
      busyWorkerNodes: 0,
      executedTasks: 0,
      executingTasks: 0,
      failedTasks: 0
    })
  } finally {
    await dynamicPool.destroy()
  }
})
753
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    // Before any task is submitted every counter is zero and every history is an empty CircularArray.
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: 0,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
    }
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await pool.destroy()
  }
})
788
it('Verify that pool worker tasks queue are initialized', async () => {
  // Every worker node must own an empty Deque that has never held a task.
  const expectEmptyTasksQueues = pool => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
  }
  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expectEmptyTasksQueues(fixedPool)
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await fixedPool.destroy()
  }
  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    expectEmptyTasksQueues(dynamicPool)
  } finally {
    await dynamicPool.destroy()
  }
})
814
it('Verify that pool worker info are initialized', async () => {
  // Every worker node present after creation must be a ready, non dynamic worker of the expected type.
  const expectInitialWorkerInfo = (pool, expectedWorkerType) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: expectedWorkerType,
        dynamic: false,
        ready: true
      })
    }
  }
  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expectInitialWorkerInfo(fixedPool, WorkerTypes.cluster)
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await fixedPool.destroy()
  }
  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    expectInitialWorkerInfo(dynamicPool, WorkerTypes.thread)
  } finally {
    await dynamicPool.destroy()
  }
})
846
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      startWorkers: false
    }
  )
  // Before start() the pool is expected to hold no worker node, so these checks run outside the cleanup guard.
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  await expect(pool.execute()).rejects.toThrowError(
    new Error('Cannot execute a task on not started pool')
  )
  pool.start()
  try {
    expect(pool.info.started).toBe(true)
    expect(pool.info.ready).toBe(true)
    expect(pool.workerNodes.length).toBe(numberOfWorkers)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
    }
  } finally {
    // Always terminate the started workers, even when an assertion above throws.
    await pool.destroy()
  }
})
870
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    await expect(pool.execute(undefined, 0)).rejects.toThrowError(
      new TypeError('name argument must be a string')
    )
    await expect(pool.execute(undefined, '')).rejects.toThrowError(
      new TypeError('name argument must not be an empty string')
    )
    await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
      new TypeError('transferList argument must be an array')
    )
    await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
      "Task function 'unknown' not found"
    )
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await pool.destroy()
  }
  // Once destroyed, the pool must refuse to execute tasks.
  await expect(pool.execute()).rejects.toThrowError(
    new Error('Cannot execute a task on not started pool')
  )
})
893
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Expected usage of every worker node for the given executed/executing task counters.
  const buildExpectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  })
  try {
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    // While the tasks are in flight, each worker node is executing maxMultiplier of them.
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(
        buildExpectedUsage(0, maxMultiplier)
      )
    }
    await Promise.all(promises)
    // Once all tasks are done, they are all accounted as executed.
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(
        buildExpectedUsage(maxMultiplier, 0)
      )
    }
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await pool.destroy()
  }
})
959
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Expected usage of a worker node with no task in flight, for the given executed tasks matcher or value.
  const buildExpectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  })
  // All the measurement histories of a worker node must be empty.
  const expectEmptyHistories = workerNode => {
    expect(workerNode.usage.runTime.history.length).toBe(0)
    expect(workerNode.usage.waitTime.history.length).toBe(0)
    expect(workerNode.usage.elu.idle.history.length).toBe(0)
    expect(workerNode.usage.elu.active.history.length).toBe(0)
  }
  try {
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(
        buildExpectedUsage(expect.any(Number))
      )
      expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        numberOfWorkers * maxMultiplier
      )
      expectEmptyHistories(workerNode)
    }
    // Changing the worker choice strategy must reset the executed tasks counters.
    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(buildExpectedUsage(0))
      expectEmptyHistories(workerNode)
    }
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await pool.destroy()
  }
})
1039
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    let poolInfo
    let poolReady = 0
    // Count the emissions and keep the last pool info passed to the listener.
    pool.emitter.on(PoolEvents.ready, info => {
      ++poolReady
      poolInfo = info
    })
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(poolReady).toBe(1)
    expect(poolInfo).toStrictEqual({
      version,
      type: PoolTypes.dynamic,
      worker: WorkerTypes.cluster,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: expect.any(Number),
      maxSize: expect.any(Number),
      workerNodes: expect.any(Number),
      idleWorkerNodes: expect.any(Number),
      busyWorkerNodes: expect.any(Number),
      executedTasks: expect.any(Number),
      executingTasks: expect.any(Number),
      failedTasks: expect.any(Number)
    })
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await pool.destroy()
  }
})
1072
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    const promises = new Set()
    let poolBusy = 0
    let poolInfo
    // Count the emissions and keep the last pool info passed to the listener.
    pool.emitter.on(PoolEvents.busy, info => {
      ++poolBusy
      poolInfo = info
    })
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
    expect(poolBusy).toBe(numberOfWorkers + 1)
    expect(poolInfo).toStrictEqual({
      version,
      type: PoolTypes.fixed,
      worker: WorkerTypes.thread,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: expect.any(Number),
      maxSize: expect.any(Number),
      workerNodes: expect.any(Number),
      idleWorkerNodes: expect.any(Number),
      busyWorkerNodes: expect.any(Number),
      executedTasks: expect.any(Number),
      executingTasks: expect.any(Number),
      failedTasks: expect.any(Number)
    })
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await pool.destroy()
  }
})
1110
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    const promises = new Set()
    let poolFull = 0
    let poolInfo
    // Count the emissions and keep the last pool info passed to the listener.
    pool.emitter.on(PoolEvents.full, info => {
      ++poolFull
      poolInfo = info
    })
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    expect(poolFull).toBe(1)
    expect(poolInfo).toStrictEqual({
      version,
      type: PoolTypes.dynamic,
      worker: WorkerTypes.thread,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: expect.any(Number),
      maxSize: expect.any(Number),
      workerNodes: expect.any(Number),
      idleWorkerNodes: expect.any(Number),
      busyWorkerNodes: expect.any(Number),
      executedTasks: expect.any(Number),
      executingTasks: expect.any(Number),
      failedTasks: expect.any(Number)
    })
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await pool.destroy()
  }
})
1147
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    {
      enableTasksQueue: true
    }
  )
  try {
    // Force the pool to report back pressure; the stub is undone by the suite's afterEach hook.
    sinon.stub(pool, 'hasBackPressure').returns(true)
    const promises = new Set()
    let poolBackPressure = 0
    let poolInfo
    // Count the emissions and keep the last pool info passed to the listener.
    pool.emitter.on(PoolEvents.backPressure, info => {
      ++poolBackPressure
      poolInfo = info
    })
    for (let i = 0; i < numberOfWorkers + 1; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    expect(poolBackPressure).toBe(1)
    expect(poolInfo).toStrictEqual({
      version,
      type: PoolTypes.fixed,
      worker: WorkerTypes.thread,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: expect.any(Number),
      maxSize: expect.any(Number),
      workerNodes: expect.any(Number),
      idleWorkerNodes: expect.any(Number),
      busyWorkerNodes: expect.any(Number),
      executedTasks: expect.any(Number),
      executingTasks: expect.any(Number),
      maxQueuedTasks: expect.any(Number),
      queuedTasks: expect.any(Number),
      backPressure: true,
      stolenTasks: expect.any(Number),
      failedTasks: expect.any(Number)
    })
    expect(pool.hasBackPressure.called).toBe(true)
  } finally {
    // Always terminate the workers, even when an assertion above throws.
    await pool.destroy()
  }
})
1192
// The title names the method actually exercised below, listTaskFunctionNames().
it('Verify that listTaskFunctionNames() is working', async () => {
  // Both pools below are asserted against this same list of names, in this
  // order, with the default task name first.
  const expectedTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
  )
  // Wait for one 'ready' event before querying the task function names.
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    expectedTaskFunctionNames
  )
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual(
    expectedTaskFunctionNames
  )
  await dynamicThreadPool.destroy()
  await fixedClusterPool.destroy()
})
1220
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const input = { n: 10 }
  // Tasks are awaited one at a time: first without a task function name, then
  // once per named task function.
  expect(await pool.execute(input)).toStrictEqual({ ok: 1 })
  expect(await pool.execute(input, 'jsonIntegerSerialization')).toStrictEqual({
    ok: 1
  })
  expect(await pool.execute(input, 'factorial')).toBe(3628800)
  expect(await pool.execute(input, 'fibonacci')).toBe(55)
  // All four tasks must be accounted for and none left in flight.
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)

  const expectedTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const anyHistory = expect.any(CircularArray)
  // Expected shape of the usage statistics of any task function once idle.
  const expectedTaskFunctionUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      stolen: 0
    },
    runTime: {
      history: anyHistory
    },
    waitTime: {
      history: anyHistory
    },
    elu: {
      idle: {
        history: anyHistory
      },
      active: {
        history: anyHistory
      }
    }
  }

  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual(
      expectedTaskFunctionNames
    )
    // Checked before any getTaskFunctionWorkerUsage() call made by this test
    // on the node.
    expect(node.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      expect(node.getTaskFunctionWorkerUsage(taskFunctionName)).toStrictEqual(
        expectedTaskFunctionUsage
      )
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name must report the same usage as the first named
    // task function (index 1 of the names list).
    const defaultUsage = node.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    const firstNamedUsage = node.getTaskFunctionWorkerUsage(
      node.info.taskFunctionNames[1]
    )
    expect(defaultUsage).toStrictEqual(firstNamedUsage)
  }
  await pool.destroy()
})
1284
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the worker node stored at index 0 of the pool's worker nodes.
  const workerNodeKey = 0
  const workerId = pool.workerNodes[workerNodeKey].info.id
  const killMessageSent = pool.sendKillMessageToWorker(workerNodeKey, workerId)
  // The returned promise must resolve without a value.
  await expect(killMessageSent).resolves.toBeUndefined()
  await pool.destroy()
})
1300 })