fix: add maximum tasks queue size to worker usage data
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
1 const { expect } = require('expect')
2 const {
3 WorkerChoiceStrategies,
4 DynamicThreadPool,
5 FixedThreadPool,
6 FixedClusterPool
7 } = require('../../../lib')
8 const { CircularArray } = require('../../../lib/circular-array')
9
10 describe('Selection strategies test suite', () => {
11 const min = 0
12 const max = 3
13
14 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
15 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
16 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
17 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
18 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
19 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
20 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
21 'WEIGHTED_ROUND_ROBIN'
22 )
23 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
24 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
25 )
26 })
27
28 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
29 const pool = new DynamicThreadPool(
30 min,
31 max,
32 './tests/worker-files/thread/testWorker.js'
33 )
34 expect(pool.opts.workerChoiceStrategy).toBe(
35 WorkerChoiceStrategies.ROUND_ROBIN
36 )
37 // We need to clean up the resources after our test
38 await pool.destroy()
39 })
40
41 it('Verify available strategies are taken at pool creation', async () => {
42 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
43 const pool = new FixedThreadPool(
44 max,
45 './tests/worker-files/thread/testWorker.js',
46 { workerChoiceStrategy }
47 )
48 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
49 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
50 workerChoiceStrategy
51 )
52 await pool.destroy()
53 }
54 })
55
56 it('Verify available strategies can be set after pool creation', async () => {
57 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
58 const pool = new DynamicThreadPool(
59 min,
60 max,
61 './tests/worker-files/thread/testWorker.js'
62 )
63 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
64 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
65 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
66 workerChoiceStrategy
67 )
68 await pool.destroy()
69 }
70 })
71
72 it('Verify available strategies default internals at pool creation', async () => {
73 const pool = new FixedThreadPool(
74 max,
75 './tests/worker-files/thread/testWorker.js'
76 )
77 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
78 if (workerChoiceStrategy === WorkerChoiceStrategies.ROUND_ROBIN) {
79 expect(
80 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
81 workerChoiceStrategy
82 ).nextWorkerNodeId
83 ).toBe(0)
84 } else if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
85 expect(
86 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
87 workerChoiceStrategy
88 ).workersVirtualTaskEndTimestamp
89 ).toBeInstanceOf(Array)
90 expect(
91 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
92 workerChoiceStrategy
93 ).workersVirtualTaskEndTimestamp.length
94 ).toBe(0)
95 } else if (
96 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
97 ) {
98 expect(
99 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
100 workerChoiceStrategy
101 ).nextWorkerNodeId
102 ).toBe(0)
103 expect(
104 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
105 workerChoiceStrategy
106 ).defaultWorkerWeight
107 ).toBeGreaterThan(0)
108 expect(
109 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
110 workerChoiceStrategy
111 ).workerVirtualTaskRunTime
112 ).toBe(0)
113 }
114 }
115 await pool.destroy()
116 })
117
118 it('Verify ROUND_ROBIN strategy default policy', async () => {
119 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
120 let pool = new FixedThreadPool(
121 max,
122 './tests/worker-files/thread/testWorker.js',
123 { workerChoiceStrategy }
124 )
125 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
126 useDynamicWorker: true
127 })
128 await pool.destroy()
129 pool = new DynamicThreadPool(
130 min,
131 max,
132 './tests/worker-files/thread/testWorker.js',
133 { workerChoiceStrategy }
134 )
135 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
136 useDynamicWorker: true
137 })
138 // We need to clean up the resources after our test
139 await pool.destroy()
140 })
141
142 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
143 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
144 let pool = new FixedThreadPool(
145 max,
146 './tests/worker-files/thread/testWorker.js',
147 { workerChoiceStrategy }
148 )
149 expect(
150 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
151 ).toStrictEqual({
152 runTime: {
153 aggregate: false,
154 average: false,
155 median: false
156 },
157 waitTime: {
158 aggregate: false,
159 average: false,
160 median: false
161 },
162 elu: {
163 aggregate: false,
164 average: false,
165 median: false
166 }
167 })
168 await pool.destroy()
169 pool = new DynamicThreadPool(
170 min,
171 max,
172 './tests/worker-files/thread/testWorker.js',
173 { workerChoiceStrategy }
174 )
175 expect(
176 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
177 ).toStrictEqual({
178 runTime: {
179 aggregate: false,
180 average: false,
181 median: false
182 },
183 waitTime: {
184 aggregate: false,
185 average: false,
186 median: false
187 },
188 elu: {
189 aggregate: false,
190 average: false,
191 median: false
192 }
193 })
194 // We need to clean up the resources after our test
195 await pool.destroy()
196 })
197
198 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
199 const pool = new FixedThreadPool(
200 max,
201 './tests/worker-files/thread/testWorker.js',
202 { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
203 )
204 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
205 const promises = new Set()
206 const maxMultiplier = 2
207 for (let i = 0; i < max * maxMultiplier; i++) {
208 promises.add(pool.execute())
209 }
210 await Promise.all(promises)
211 for (const workerNode of pool.workerNodes) {
212 expect(workerNode.workerUsage).toStrictEqual({
213 tasks: {
214 executed: maxMultiplier,
215 executing: 0,
216 queued: 0,
217 maxQueued: 0,
218 failed: 0
219 },
220 runTime: {
221 aggregate: 0,
222 average: 0,
223 median: 0,
224 history: expect.any(CircularArray)
225 },
226 waitTime: {
227 aggregate: 0,
228 average: 0,
229 median: 0,
230 history: expect.any(CircularArray)
231 },
232 elu: {
233 idle: {
234 aggregate: 0,
235 average: 0,
236 median: 0,
237 history: expect.any(CircularArray)
238 },
239 active: {
240 aggregate: 0,
241 average: 0,
242 median: 0,
243 history: expect.any(CircularArray)
244 },
245 utilization: 0
246 }
247 })
248 }
249 expect(
250 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
251 WorkerChoiceStrategies.ROUND_ROBIN
252 ).nextWorkerNodeId
253 ).toBe(0)
254 // We need to clean up the resources after our test
255 await pool.destroy()
256 })
257
258 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
259 const pool = new DynamicThreadPool(
260 min,
261 max,
262 './tests/worker-files/thread/testWorker.js',
263 { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
264 )
265 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
266 const promises = new Set()
267 const maxMultiplier = 2
268 for (let i = 0; i < max * maxMultiplier; i++) {
269 promises.add(pool.execute())
270 }
271 await Promise.all(promises)
272 for (const workerNode of pool.workerNodes) {
273 expect(workerNode.workerUsage).toStrictEqual({
274 tasks: {
275 executed: maxMultiplier,
276 executing: 0,
277 queued: 0,
278 maxQueued: 0,
279 failed: 0
280 },
281 runTime: {
282 aggregate: 0,
283 average: 0,
284 median: 0,
285 history: expect.any(CircularArray)
286 },
287 waitTime: {
288 aggregate: 0,
289 average: 0,
290 median: 0,
291 history: expect.any(CircularArray)
292 },
293 elu: {
294 idle: {
295 aggregate: 0,
296 average: 0,
297 median: 0,
298 history: expect.any(CircularArray)
299 },
300 active: {
301 aggregate: 0,
302 average: 0,
303 median: 0,
304 history: expect.any(CircularArray)
305 },
306 utilization: 0
307 }
308 })
309 }
310 expect(
311 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
312 WorkerChoiceStrategies.ROUND_ROBIN
313 ).nextWorkerNodeId
314 ).toBe(0)
315 // We need to clean up the resources after our test
316 await pool.destroy()
317 })
318
319 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
320 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
321 let pool = new FixedClusterPool(
322 max,
323 './tests/worker-files/cluster/testWorker.js',
324 { workerChoiceStrategy }
325 )
326 let results = new Set()
327 for (let i = 0; i < max; i++) {
328 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.id)
329 }
330 expect(results.size).toBe(max)
331 await pool.destroy()
332 pool = new FixedThreadPool(
333 max,
334 './tests/worker-files/thread/testWorker.js',
335 { workerChoiceStrategy }
336 )
337 results = new Set()
338 for (let i = 0; i < max; i++) {
339 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.threadId)
340 }
341 expect(results.size).toBe(max)
342 await pool.destroy()
343 })
344
345 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
346 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
347 let pool = new FixedThreadPool(
348 max,
349 './tests/worker-files/thread/testWorker.js',
350 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
351 )
352 expect(
353 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
354 workerChoiceStrategy
355 ).nextWorkerNodeId
356 ).toBeDefined()
357 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
358 expect(
359 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
360 pool.workerChoiceStrategyContext.workerChoiceStrategy
361 ).nextWorkerNodeId
362 ).toBe(0)
363 await pool.destroy()
364 pool = new DynamicThreadPool(
365 min,
366 max,
367 './tests/worker-files/thread/testWorker.js',
368 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
369 )
370 expect(
371 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
372 workerChoiceStrategy
373 ).nextWorkerNodeId
374 ).toBeDefined()
375 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
376 expect(
377 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
378 pool.workerChoiceStrategyContext.workerChoiceStrategy
379 ).nextWorkerNodeId
380 ).toBe(0)
381 // We need to clean up the resources after our test
382 await pool.destroy()
383 })
384
385 it('Verify LEAST_USED strategy default policy', async () => {
386 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
387 let pool = new FixedThreadPool(
388 max,
389 './tests/worker-files/thread/testWorker.js',
390 { workerChoiceStrategy }
391 )
392 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
393 useDynamicWorker: false
394 })
395 await pool.destroy()
396 pool = new DynamicThreadPool(
397 min,
398 max,
399 './tests/worker-files/thread/testWorker.js',
400 { workerChoiceStrategy }
401 )
402 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
403 useDynamicWorker: false
404 })
405 // We need to clean up the resources after our test
406 await pool.destroy()
407 })
408
409 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
410 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
411 let pool = new FixedThreadPool(
412 max,
413 './tests/worker-files/thread/testWorker.js',
414 { workerChoiceStrategy }
415 )
416 expect(
417 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
418 ).toStrictEqual({
419 runTime: {
420 aggregate: false,
421 average: false,
422 median: false
423 },
424 waitTime: {
425 aggregate: false,
426 average: false,
427 median: false
428 },
429 elu: {
430 aggregate: false,
431 average: false,
432 median: false
433 }
434 })
435 await pool.destroy()
436 pool = new DynamicThreadPool(
437 min,
438 max,
439 './tests/worker-files/thread/testWorker.js',
440 { workerChoiceStrategy }
441 )
442 expect(
443 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
444 ).toStrictEqual({
445 runTime: {
446 aggregate: false,
447 average: false,
448 median: false
449 },
450 waitTime: {
451 aggregate: false,
452 average: false,
453 median: false
454 },
455 elu: {
456 aggregate: false,
457 average: false,
458 median: false
459 }
460 })
461 // We need to clean up the resources after our test
462 await pool.destroy()
463 })
464
465 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
466 const pool = new FixedThreadPool(
467 max,
468 './tests/worker-files/thread/testWorker.js',
469 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
470 )
471 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
472 const promises = new Set()
473 const maxMultiplier = 2
474 for (let i = 0; i < max * maxMultiplier; i++) {
475 promises.add(pool.execute())
476 }
477 await Promise.all(promises)
478 for (const workerNode of pool.workerNodes) {
479 expect(workerNode.workerUsage).toStrictEqual({
480 tasks: {
481 executed: expect.any(Number),
482 executing: 0,
483 queued: 0,
484 maxQueued: 0,
485 failed: 0
486 },
487 runTime: {
488 aggregate: 0,
489 average: 0,
490 median: 0,
491 history: expect.any(CircularArray)
492 },
493 waitTime: {
494 aggregate: 0,
495 average: 0,
496 median: 0,
497 history: expect.any(CircularArray)
498 },
499 elu: {
500 idle: {
501 aggregate: 0,
502 average: 0,
503 median: 0,
504 history: expect.any(CircularArray)
505 },
506 active: {
507 aggregate: 0,
508 average: 0,
509 median: 0,
510 history: expect.any(CircularArray)
511 },
512 utilization: 0
513 }
514 })
515 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
516 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
517 max * maxMultiplier
518 )
519 }
520 // We need to clean up the resources after our test
521 await pool.destroy()
522 })
523
524 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
525 const pool = new DynamicThreadPool(
526 min,
527 max,
528 './tests/worker-files/thread/testWorker.js',
529 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
530 )
531 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
532 const promises = new Set()
533 const maxMultiplier = 2
534 for (let i = 0; i < max * maxMultiplier; i++) {
535 promises.add(pool.execute())
536 }
537 await Promise.all(promises)
538 for (const workerNode of pool.workerNodes) {
539 expect(workerNode.workerUsage).toStrictEqual({
540 tasks: {
541 executed: expect.any(Number),
542 executing: 0,
543 queued: 0,
544 maxQueued: 0,
545 failed: 0
546 },
547 runTime: {
548 aggregate: 0,
549 average: 0,
550 median: 0,
551 history: expect.any(CircularArray)
552 },
553 waitTime: {
554 aggregate: 0,
555 average: 0,
556 median: 0,
557 history: expect.any(CircularArray)
558 },
559 elu: {
560 idle: {
561 aggregate: 0,
562 average: 0,
563 median: 0,
564 history: expect.any(CircularArray)
565 },
566 active: {
567 aggregate: 0,
568 average: 0,
569 median: 0,
570 history: expect.any(CircularArray)
571 },
572 utilization: 0
573 }
574 })
575 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
576 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
577 max * maxMultiplier
578 )
579 }
580 // We need to clean up the resources after our test
581 await pool.destroy()
582 })
583
584 it('Verify LEAST_BUSY strategy default policy', async () => {
585 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
586 let pool = new FixedThreadPool(
587 max,
588 './tests/worker-files/thread/testWorker.js',
589 { workerChoiceStrategy }
590 )
591 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
592 useDynamicWorker: false
593 })
594 await pool.destroy()
595 pool = new DynamicThreadPool(
596 min,
597 max,
598 './tests/worker-files/thread/testWorker.js',
599 { workerChoiceStrategy }
600 )
601 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
602 useDynamicWorker: false
603 })
604 // We need to clean up the resources after our test
605 await pool.destroy()
606 })
607
608 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
609 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
610 let pool = new FixedThreadPool(
611 max,
612 './tests/worker-files/thread/testWorker.js',
613 { workerChoiceStrategy }
614 )
615 expect(
616 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
617 ).toStrictEqual({
618 runTime: {
619 aggregate: true,
620 average: false,
621 median: false
622 },
623 waitTime: {
624 aggregate: true,
625 average: false,
626 median: false
627 },
628 elu: {
629 aggregate: false,
630 average: false,
631 median: false
632 }
633 })
634 await pool.destroy()
635 pool = new DynamicThreadPool(
636 min,
637 max,
638 './tests/worker-files/thread/testWorker.js',
639 { workerChoiceStrategy }
640 )
641 expect(
642 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
643 ).toStrictEqual({
644 runTime: {
645 aggregate: true,
646 average: false,
647 median: false
648 },
649 waitTime: {
650 aggregate: true,
651 average: false,
652 median: false
653 },
654 elu: {
655 aggregate: false,
656 average: false,
657 median: false
658 }
659 })
660 // We need to clean up the resources after our test
661 await pool.destroy()
662 })
663
664 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
665 const pool = new FixedThreadPool(
666 max,
667 './tests/worker-files/thread/testWorker.js',
668 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
669 )
670 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
671 const promises = new Set()
672 const maxMultiplier = 2
673 for (let i = 0; i < max * maxMultiplier; i++) {
674 promises.add(pool.execute())
675 }
676 await Promise.all(promises)
677 for (const workerNode of pool.workerNodes) {
678 expect(workerNode.workerUsage).toStrictEqual({
679 tasks: {
680 executed: expect.any(Number),
681 executing: 0,
682 queued: 0,
683 maxQueued: 0,
684 failed: 0
685 },
686 runTime: {
687 aggregate: expect.any(Number),
688 average: 0,
689 median: 0,
690 history: expect.any(CircularArray)
691 },
692 waitTime: {
693 aggregate: expect.any(Number),
694 average: 0,
695 median: 0,
696 history: expect.any(CircularArray)
697 },
698 elu: {
699 idle: {
700 aggregate: 0,
701 average: 0,
702 median: 0,
703 history: expect.any(CircularArray)
704 },
705 active: {
706 aggregate: 0,
707 average: 0,
708 median: 0,
709 history: expect.any(CircularArray)
710 },
711 utilization: 0
712 }
713 })
714 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
715 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
716 max * maxMultiplier
717 )
718 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
719 expect(workerNode.workerUsage.waitTime.aggregate).toBeGreaterThanOrEqual(
720 0
721 )
722 }
723 // We need to clean up the resources after our test
724 await pool.destroy()
725 })
726
727 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
728 const pool = new DynamicThreadPool(
729 min,
730 max,
731 './tests/worker-files/thread/testWorker.js',
732 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
733 )
734 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
735 const promises = new Set()
736 const maxMultiplier = 2
737 for (let i = 0; i < max * maxMultiplier; i++) {
738 promises.add(pool.execute())
739 }
740 await Promise.all(promises)
741 for (const workerNode of pool.workerNodes) {
742 expect(workerNode.workerUsage).toStrictEqual({
743 tasks: {
744 executed: expect.any(Number),
745 executing: 0,
746 queued: 0,
747 maxQueued: 0,
748 failed: 0
749 },
750 runTime: {
751 aggregate: expect.any(Number),
752 average: 0,
753 median: 0,
754 history: expect.any(CircularArray)
755 },
756 waitTime: {
757 aggregate: expect.any(Number),
758 average: 0,
759 median: 0,
760 history: expect.any(CircularArray)
761 },
762 elu: {
763 idle: {
764 aggregate: 0,
765 average: 0,
766 median: 0,
767 history: expect.any(CircularArray)
768 },
769 active: {
770 aggregate: 0,
771 average: 0,
772 median: 0,
773 history: expect.any(CircularArray)
774 },
775 utilization: 0
776 }
777 })
778 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
779 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
780 max * maxMultiplier
781 )
782 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
783 expect(workerNode.workerUsage.waitTime.aggregate).toBeGreaterThanOrEqual(
784 0
785 )
786 }
787 // We need to clean up the resources after our test
788 await pool.destroy()
789 })
790
791 it('Verify LEAST_ELU strategy default policy', async () => {
792 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
793 let pool = new FixedThreadPool(
794 max,
795 './tests/worker-files/thread/testWorker.js',
796 { workerChoiceStrategy }
797 )
798 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
799 useDynamicWorker: false
800 })
801 await pool.destroy()
802 pool = new DynamicThreadPool(
803 min,
804 max,
805 './tests/worker-files/thread/testWorker.js',
806 { workerChoiceStrategy }
807 )
808 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
809 useDynamicWorker: false
810 })
811 // We need to clean up the resources after our test
812 await pool.destroy()
813 })
814
815 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
816 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
817 let pool = new FixedThreadPool(
818 max,
819 './tests/worker-files/thread/testWorker.js',
820 { workerChoiceStrategy }
821 )
822 expect(
823 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
824 ).toStrictEqual({
825 runTime: {
826 aggregate: false,
827 average: false,
828 median: false
829 },
830 waitTime: {
831 aggregate: false,
832 average: false,
833 median: false
834 },
835 elu: {
836 aggregate: true,
837 average: false,
838 median: false
839 }
840 })
841 await pool.destroy()
842 pool = new DynamicThreadPool(
843 min,
844 max,
845 './tests/worker-files/thread/testWorker.js',
846 { workerChoiceStrategy }
847 )
848 expect(
849 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
850 ).toStrictEqual({
851 runTime: {
852 aggregate: false,
853 average: false,
854 median: false
855 },
856 waitTime: {
857 aggregate: false,
858 average: false,
859 median: false
860 },
861 elu: {
862 aggregate: true,
863 average: false,
864 median: false
865 }
866 })
867 // We need to clean up the resources after our test
868 await pool.destroy()
869 })
870
871 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
872 const pool = new FixedThreadPool(
873 max,
874 './tests/worker-files/thread/testWorker.js',
875 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
876 )
877 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
878 const promises = new Set()
879 const maxMultiplier = 2
880 for (let i = 0; i < max * maxMultiplier; i++) {
881 promises.add(pool.execute())
882 }
883 await Promise.all(promises)
884 for (const workerNode of pool.workerNodes) {
885 expect(workerNode.workerUsage).toStrictEqual({
886 tasks: {
887 executed: expect.any(Number),
888 executing: 0,
889 queued: 0,
890 maxQueued: 0,
891 failed: 0
892 },
893 runTime: {
894 aggregate: 0,
895 average: 0,
896 median: 0,
897 history: expect.any(CircularArray)
898 },
899 waitTime: {
900 aggregate: 0,
901 average: 0,
902 median: 0,
903 history: expect.any(CircularArray)
904 },
905 elu: {
906 idle: {
907 aggregate: 0,
908 average: 0,
909 median: 0,
910 history: expect.any(CircularArray)
911 },
912 active: {
913 aggregate: expect.any(Number),
914 average: 0,
915 median: 0,
916 history: expect.any(CircularArray)
917 },
918 utilization: expect.any(Number)
919 }
920 })
921 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
922 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
923 max * maxMultiplier
924 )
925 expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
926 expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
927 }
928 // We need to clean up the resources after our test
929 await pool.destroy()
930 })
931
932 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
933 const pool = new DynamicThreadPool(
934 min,
935 max,
936 './tests/worker-files/thread/testWorker.js',
937 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
938 )
939 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
940 const promises = new Set()
941 const maxMultiplier = 2
942 for (let i = 0; i < max * maxMultiplier; i++) {
943 promises.add(pool.execute())
944 }
945 await Promise.all(promises)
946 for (const workerNode of pool.workerNodes) {
947 expect(workerNode.workerUsage).toStrictEqual({
948 tasks: {
949 executed: expect.any(Number),
950 executing: 0,
951 queued: 0,
952 maxQueued: 0,
953 failed: 0
954 },
955 runTime: {
956 aggregate: 0,
957 average: 0,
958 median: 0,
959 history: expect.any(CircularArray)
960 },
961 waitTime: {
962 aggregate: 0,
963 average: 0,
964 median: 0,
965 history: expect.any(CircularArray)
966 },
967 elu: {
968 idle: {
969 aggregate: 0,
970 average: 0,
971 median: 0,
972 history: expect.any(CircularArray)
973 },
974 active: {
975 aggregate: expect.any(Number),
976 average: 0,
977 median: 0,
978 history: expect.any(CircularArray)
979 },
980 utilization: expect.any(Number)
981 }
982 })
983 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
984 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
985 max * maxMultiplier
986 )
987 expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
988 expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
989 }
990 // We need to clean up the resources after our test
991 await pool.destroy()
992 })
993
994 it('Verify FAIR_SHARE strategy default policy', async () => {
995 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
996 let pool = new FixedThreadPool(
997 max,
998 './tests/worker-files/thread/testWorker.js',
999 { workerChoiceStrategy }
1000 )
1001 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1002 useDynamicWorker: false
1003 })
1004 await pool.destroy()
1005 pool = new DynamicThreadPool(
1006 min,
1007 max,
1008 './tests/worker-files/thread/testWorker.js',
1009 { workerChoiceStrategy }
1010 )
1011 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1012 useDynamicWorker: false
1013 })
1014 // We need to clean up the resources after our test
1015 await pool.destroy()
1016 })
1017
1018 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1019 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1020 let pool = new FixedThreadPool(
1021 max,
1022 './tests/worker-files/thread/testWorker.js',
1023 { workerChoiceStrategy }
1024 )
1025 expect(
1026 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1027 ).toStrictEqual({
1028 runTime: {
1029 aggregate: true,
1030 average: true,
1031 median: false
1032 },
1033 waitTime: {
1034 aggregate: false,
1035 average: false,
1036 median: false
1037 },
1038 elu: {
1039 aggregate: true,
1040 average: true,
1041 median: false
1042 }
1043 })
1044 await pool.destroy()
1045 pool = new DynamicThreadPool(
1046 min,
1047 max,
1048 './tests/worker-files/thread/testWorker.js',
1049 { workerChoiceStrategy }
1050 )
1051 expect(
1052 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1053 ).toStrictEqual({
1054 runTime: {
1055 aggregate: true,
1056 average: true,
1057 median: false
1058 },
1059 waitTime: {
1060 aggregate: false,
1061 average: false,
1062 median: false
1063 },
1064 elu: {
1065 aggregate: true,
1066 average: true,
1067 median: false
1068 }
1069 })
1070 // We need to clean up the resources after our test
1071 await pool.destroy()
1072 })
1073
1074 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1075 const pool = new FixedThreadPool(
1076 max,
1077 './tests/worker-files/thread/testWorker.js',
1078 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1079 )
1080 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1081 const promises = new Set()
1082 const maxMultiplier = 2
1083 for (let i = 0; i < max * maxMultiplier; i++) {
1084 promises.add(pool.execute())
1085 }
1086 await Promise.all(promises)
1087 for (const workerNode of pool.workerNodes) {
1088 expect(workerNode.workerUsage).toStrictEqual({
1089 tasks: {
1090 executed: expect.any(Number),
1091 executing: 0,
1092 queued: 0,
1093 maxQueued: 0,
1094 failed: 0
1095 },
1096 runTime: {
1097 aggregate: expect.any(Number),
1098 average: expect.any(Number),
1099 median: 0,
1100 history: expect.any(CircularArray)
1101 },
1102 waitTime: {
1103 aggregate: 0,
1104 average: 0,
1105 median: 0,
1106 history: expect.any(CircularArray)
1107 },
1108 elu: {
1109 idle: {
1110 aggregate: 0,
1111 average: 0,
1112 median: 0,
1113 history: expect.any(CircularArray)
1114 },
1115 active: {
1116 aggregate: expect.any(Number),
1117 average: expect.any(Number),
1118 median: 0,
1119 history: expect.any(CircularArray)
1120 },
1121 utilization: expect.any(Number)
1122 }
1123 })
1124 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
1125 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
1126 max * maxMultiplier
1127 )
1128 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
1129 expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0)
1130 expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
1131 expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
1132 }
1133 expect(
1134 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1135 pool.workerChoiceStrategyContext.workerChoiceStrategy
1136 ).workersVirtualTaskEndTimestamp.length
1137 ).toBe(pool.workerNodes.length)
1138 // We need to clean up the resources after our test
1139 await pool.destroy()
1140 })
1141
1142 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1143 const pool = new DynamicThreadPool(
1144 min,
1145 max,
1146 './tests/worker-files/thread/testWorker.js',
1147 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1148 )
1149 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1150 const promises = new Set()
1151 const maxMultiplier = 2
1152 for (let i = 0; i < max * maxMultiplier; i++) {
1153 promises.add(pool.execute())
1154 }
1155 await Promise.all(promises)
1156 for (const workerNode of pool.workerNodes) {
1157 expect(workerNode.workerUsage).toStrictEqual({
1158 tasks: {
1159 executed: expect.any(Number),
1160 executing: 0,
1161 queued: 0,
1162 maxQueued: 0,
1163 failed: 0
1164 },
1165 runTime: {
1166 aggregate: expect.any(Number),
1167 average: expect.any(Number),
1168 median: 0,
1169 history: expect.any(CircularArray)
1170 },
1171 waitTime: {
1172 aggregate: 0,
1173 average: 0,
1174 median: 0,
1175 history: expect.any(CircularArray)
1176 },
1177 elu: {
1178 idle: {
1179 aggregate: 0,
1180 average: 0,
1181 median: 0,
1182 history: expect.any(CircularArray)
1183 },
1184 active: {
1185 aggregate: expect.any(Number),
1186 average: expect.any(Number),
1187 median: 0,
1188 history: expect.any(CircularArray)
1189 },
1190 utilization: expect.any(Number)
1191 }
1192 })
1193 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
1194 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
1195 max * maxMultiplier
1196 )
1197 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
1198 expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0)
1199 expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
1200 expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
1201 }
1202 expect(
1203 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1204 pool.workerChoiceStrategyContext.workerChoiceStrategy
1205 ).workersVirtualTaskEndTimestamp.length
1206 ).toBe(pool.workerNodes.length)
1207 // We need to clean up the resources after our test
1208 await pool.destroy()
1209 })
1210
1211 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1212 const pool = new DynamicThreadPool(
1213 min,
1214 max,
1215 './tests/worker-files/thread/testWorker.js',
1216 {
1217 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1218 workerChoiceStrategyOptions: {
1219 runTime: { median: true }
1220 }
1221 }
1222 )
1223 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1224 const promises = new Set()
1225 const maxMultiplier = 2
1226 for (let i = 0; i < max * maxMultiplier; i++) {
1227 promises.add(pool.execute())
1228 }
1229 await Promise.all(promises)
1230 for (const workerNode of pool.workerNodes) {
1231 expect(workerNode.workerUsage).toStrictEqual({
1232 tasks: {
1233 executed: expect.any(Number),
1234 executing: 0,
1235 queued: 0,
1236 maxQueued: 0,
1237 failed: 0
1238 },
1239 runTime: {
1240 aggregate: expect.any(Number),
1241 average: 0,
1242 median: expect.any(Number),
1243 history: expect.any(CircularArray)
1244 },
1245 waitTime: {
1246 aggregate: 0,
1247 average: 0,
1248 median: 0,
1249 history: expect.any(CircularArray)
1250 },
1251 elu: {
1252 idle: {
1253 aggregate: 0,
1254 average: 0,
1255 median: 0,
1256 history: expect.any(CircularArray)
1257 },
1258 active: {
1259 aggregate: expect.any(Number),
1260 average: expect.any(Number),
1261 median: 0,
1262 history: expect.any(CircularArray)
1263 },
1264 utilization: expect.any(Number)
1265 }
1266 })
1267 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
1268 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
1269 max * maxMultiplier
1270 )
1271 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
1272 expect(workerNode.workerUsage.runTime.median).toBeGreaterThanOrEqual(0)
1273 expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
1274 expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
1275 }
1276 expect(
1277 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1278 pool.workerChoiceStrategyContext.workerChoiceStrategy
1279 ).workersVirtualTaskEndTimestamp.length
1280 ).toBe(pool.workerNodes.length)
1281 // We need to clean up the resources after our test
1282 await pool.destroy()
1283 })
1284
1285 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1286 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1287 let pool = new FixedThreadPool(
1288 max,
1289 './tests/worker-files/thread/testWorker.js'
1290 )
1291 expect(
1292 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1293 workerChoiceStrategy
1294 ).workersVirtualTaskEndTimestamp
1295 ).toBeInstanceOf(Array)
1296 expect(
1297 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1298 workerChoiceStrategy
1299 ).workersVirtualTaskEndTimestamp.length
1300 ).toBe(0)
1301 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1302 workerChoiceStrategy
1303 ).workersVirtualTaskEndTimestamp[0] = performance.now()
1304 expect(
1305 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1306 workerChoiceStrategy
1307 ).workersVirtualTaskEndTimestamp.length
1308 ).toBe(1)
1309 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1310 expect(
1311 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1312 workerChoiceStrategy
1313 ).workersVirtualTaskEndTimestamp
1314 ).toBeInstanceOf(Array)
1315 expect(
1316 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1317 workerChoiceStrategy
1318 ).workersVirtualTaskEndTimestamp.length
1319 ).toBe(0)
1320 await pool.destroy()
1321 pool = new DynamicThreadPool(
1322 min,
1323 max,
1324 './tests/worker-files/thread/testWorker.js'
1325 )
1326 expect(
1327 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1328 workerChoiceStrategy
1329 ).workersVirtualTaskEndTimestamp
1330 ).toBeInstanceOf(Array)
1331 expect(
1332 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1333 workerChoiceStrategy
1334 ).workersVirtualTaskEndTimestamp.length
1335 ).toBe(0)
1336 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1337 workerChoiceStrategy
1338 ).workersVirtualTaskEndTimestamp[0] = performance.now()
1339 expect(
1340 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1341 workerChoiceStrategy
1342 ).workersVirtualTaskEndTimestamp.length
1343 ).toBe(1)
1344 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1345 expect(
1346 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1347 workerChoiceStrategy
1348 ).workersVirtualTaskEndTimestamp
1349 ).toBeInstanceOf(Array)
1350 expect(
1351 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1352 workerChoiceStrategy
1353 ).workersVirtualTaskEndTimestamp.length
1354 ).toBe(0)
1355 // We need to clean up the resources after our test
1356 await pool.destroy()
1357 })
1358
1359 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1360 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1361 let pool = new FixedThreadPool(
1362 max,
1363 './tests/worker-files/thread/testWorker.js',
1364 { workerChoiceStrategy }
1365 )
1366 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1367 useDynamicWorker: true
1368 })
1369 await pool.destroy()
1370 pool = new DynamicThreadPool(
1371 min,
1372 max,
1373 './tests/worker-files/thread/testWorker.js',
1374 { workerChoiceStrategy }
1375 )
1376 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1377 useDynamicWorker: true
1378 })
1379 // We need to clean up the resources after our test
1380 await pool.destroy()
1381 })
1382
1383 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1384 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1385 let pool = new FixedThreadPool(
1386 max,
1387 './tests/worker-files/thread/testWorker.js',
1388 { workerChoiceStrategy }
1389 )
1390 expect(
1391 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1392 ).toStrictEqual({
1393 runTime: {
1394 aggregate: true,
1395 average: true,
1396 median: false
1397 },
1398 waitTime: {
1399 aggregate: false,
1400 average: false,
1401 median: false
1402 },
1403 elu: {
1404 aggregate: false,
1405 average: false,
1406 median: false
1407 }
1408 })
1409 await pool.destroy()
1410 pool = new DynamicThreadPool(
1411 min,
1412 max,
1413 './tests/worker-files/thread/testWorker.js',
1414 { workerChoiceStrategy }
1415 )
1416 expect(
1417 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1418 ).toStrictEqual({
1419 runTime: {
1420 aggregate: true,
1421 average: true,
1422 median: false
1423 },
1424 waitTime: {
1425 aggregate: false,
1426 average: false,
1427 median: false
1428 },
1429 elu: {
1430 aggregate: false,
1431 average: false,
1432 median: false
1433 }
1434 })
1435 // We need to clean up the resources after our test
1436 await pool.destroy()
1437 })
1438
1439 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1440 const pool = new FixedThreadPool(
1441 max,
1442 './tests/worker-files/thread/testWorker.js',
1443 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1444 )
1445 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1446 const promises = new Set()
1447 const maxMultiplier = 2
1448 for (let i = 0; i < max * maxMultiplier; i++) {
1449 promises.add(pool.execute())
1450 }
1451 await Promise.all(promises)
1452 for (const workerNode of pool.workerNodes) {
1453 expect(workerNode.workerUsage).toStrictEqual({
1454 tasks: {
1455 executed: expect.any(Number),
1456 executing: 0,
1457 queued: 0,
1458 maxQueued: 0,
1459 failed: 0
1460 },
1461 runTime: {
1462 aggregate: expect.any(Number),
1463 average: expect.any(Number),
1464 median: 0,
1465 history: expect.any(CircularArray)
1466 },
1467 waitTime: {
1468 aggregate: 0,
1469 average: 0,
1470 median: 0,
1471 history: expect.any(CircularArray)
1472 },
1473 elu: {
1474 idle: {
1475 aggregate: 0,
1476 average: 0,
1477 median: 0,
1478 history: expect.any(CircularArray)
1479 },
1480 active: {
1481 aggregate: 0,
1482 average: 0,
1483 median: 0,
1484 history: expect.any(CircularArray)
1485 },
1486 utilization: 0
1487 }
1488 })
1489 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
1490 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
1491 max * maxMultiplier
1492 )
1493 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
1494 expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0)
1495 }
1496 expect(
1497 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1498 pool.workerChoiceStrategyContext.workerChoiceStrategy
1499 ).defaultWorkerWeight
1500 ).toBeGreaterThan(0)
1501 expect(
1502 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1503 pool.workerChoiceStrategyContext.workerChoiceStrategy
1504 ).workerVirtualTaskRunTime
1505 ).toBeGreaterThanOrEqual(0)
1506 // We need to clean up the resources after our test
1507 await pool.destroy()
1508 })
1509
1510 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1511 const pool = new DynamicThreadPool(
1512 min,
1513 max,
1514 './tests/worker-files/thread/testWorker.js',
1515 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1516 )
1517 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1518 const promises = new Set()
1519 const maxMultiplier = 2
1520 for (let i = 0; i < max * maxMultiplier; i++) {
1521 promises.add(pool.execute())
1522 }
1523 await Promise.all(promises)
1524 for (const workerNode of pool.workerNodes) {
1525 expect(workerNode.workerUsage).toStrictEqual({
1526 tasks: {
1527 executed: expect.any(Number),
1528 executing: 0,
1529 queued: 0,
1530 maxQueued: 0,
1531 failed: 0
1532 },
1533 runTime: {
1534 aggregate: expect.any(Number),
1535 average: expect.any(Number),
1536 median: 0,
1537 history: expect.any(CircularArray)
1538 },
1539 waitTime: {
1540 aggregate: 0,
1541 average: 0,
1542 median: 0,
1543 history: expect.any(CircularArray)
1544 },
1545 elu: {
1546 idle: {
1547 aggregate: 0,
1548 average: 0,
1549 median: 0,
1550 history: expect.any(CircularArray)
1551 },
1552 active: {
1553 aggregate: 0,
1554 average: 0,
1555 median: 0,
1556 history: expect.any(CircularArray)
1557 },
1558 utilization: 0
1559 }
1560 })
1561 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
1562 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
1563 max * maxMultiplier
1564 )
1565 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
1566 expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
1567 }
1568 expect(
1569 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1570 pool.workerChoiceStrategyContext.workerChoiceStrategy
1571 ).defaultWorkerWeight
1572 ).toBeGreaterThan(0)
1573 expect(
1574 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1575 pool.workerChoiceStrategyContext.workerChoiceStrategy
1576 ).workerVirtualTaskRunTime
1577 ).toBeGreaterThanOrEqual(0)
1578 // We need to clean up the resources after our test
1579 await pool.destroy()
1580 })
1581
1582 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1583 const pool = new DynamicThreadPool(
1584 min,
1585 max,
1586 './tests/worker-files/thread/testWorker.js',
1587 {
1588 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1589 workerChoiceStrategyOptions: {
1590 runTime: { median: true }
1591 }
1592 }
1593 )
1594 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1595 const promises = new Set()
1596 const maxMultiplier = 2
1597 for (let i = 0; i < max * maxMultiplier; i++) {
1598 promises.add(pool.execute())
1599 }
1600 await Promise.all(promises)
1601 for (const workerNode of pool.workerNodes) {
1602 expect(workerNode.workerUsage).toStrictEqual({
1603 tasks: {
1604 executed: expect.any(Number),
1605 executing: 0,
1606 queued: 0,
1607 maxQueued: 0,
1608 failed: 0
1609 },
1610 runTime: {
1611 aggregate: expect.any(Number),
1612 average: 0,
1613 median: expect.any(Number),
1614 history: expect.any(CircularArray)
1615 },
1616 waitTime: {
1617 aggregate: 0,
1618 average: 0,
1619 median: 0,
1620 history: expect.any(CircularArray)
1621 },
1622 elu: {
1623 idle: {
1624 aggregate: 0,
1625 average: 0,
1626 median: 0,
1627 history: expect.any(CircularArray)
1628 },
1629 active: {
1630 aggregate: 0,
1631 average: 0,
1632 median: 0,
1633 history: expect.any(CircularArray)
1634 },
1635 utilization: 0
1636 }
1637 })
1638 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
1639 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
1640 max * maxMultiplier
1641 )
1642 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
1643 expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0)
1644 }
1645 expect(
1646 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1647 pool.workerChoiceStrategyContext.workerChoiceStrategy
1648 ).defaultWorkerWeight
1649 ).toBeGreaterThan(0)
1650 expect(
1651 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1652 pool.workerChoiceStrategyContext.workerChoiceStrategy
1653 ).workerVirtualTaskRunTime
1654 ).toBeGreaterThanOrEqual(0)
1655 // We need to clean up the resources after our test
1656 await pool.destroy()
1657 })
1658
1659 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1660 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1661 let pool = new FixedThreadPool(
1662 max,
1663 './tests/worker-files/thread/testWorker.js'
1664 )
1665 expect(
1666 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1667 workerChoiceStrategy
1668 ).nextWorkerNodeId
1669 ).toBeDefined()
1670 expect(
1671 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1672 workerChoiceStrategy
1673 ).defaultWorkerWeight
1674 ).toBeDefined()
1675 expect(
1676 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1677 workerChoiceStrategy
1678 ).workerVirtualTaskRunTime
1679 ).toBeDefined()
1680 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1681 expect(
1682 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1683 pool.workerChoiceStrategyContext.workerChoiceStrategy
1684 ).nextWorkerNodeId
1685 ).toBe(0)
1686 expect(
1687 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1688 pool.workerChoiceStrategyContext.workerChoiceStrategy
1689 ).defaultWorkerWeight
1690 ).toBeGreaterThan(0)
1691 expect(
1692 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1693 workerChoiceStrategy
1694 ).workerVirtualTaskRunTime
1695 ).toBe(0)
1696 await pool.destroy()
1697 pool = new DynamicThreadPool(
1698 min,
1699 max,
1700 './tests/worker-files/thread/testWorker.js'
1701 )
1702 expect(
1703 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1704 workerChoiceStrategy
1705 ).nextWorkerNodeId
1706 ).toBeDefined()
1707 expect(
1708 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1709 workerChoiceStrategy
1710 ).defaultWorkerWeight
1711 ).toBeDefined()
1712 expect(
1713 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1714 workerChoiceStrategy
1715 ).workerVirtualTaskRunTime
1716 ).toBeDefined()
1717 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1718 expect(
1719 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1720 pool.workerChoiceStrategyContext.workerChoiceStrategy
1721 ).nextWorkerNodeId
1722 ).toBe(0)
1723 expect(
1724 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1725 pool.workerChoiceStrategyContext.workerChoiceStrategy
1726 ).defaultWorkerWeight
1727 ).toBeGreaterThan(0)
1728 expect(
1729 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1730 workerChoiceStrategy
1731 ).workerVirtualTaskRunTime
1732 ).toBe(0)
1733 // We need to clean up the resources after our test
1734 await pool.destroy()
1735 })
1736
1737 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1738 const workerChoiceStrategy =
1739 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1740 let pool = new FixedThreadPool(
1741 max,
1742 './tests/worker-files/thread/testWorker.js',
1743 { workerChoiceStrategy }
1744 )
1745 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1746 useDynamicWorker: true
1747 })
1748 await pool.destroy()
1749 pool = new DynamicThreadPool(
1750 min,
1751 max,
1752 './tests/worker-files/thread/testWorker.js',
1753 { workerChoiceStrategy }
1754 )
1755 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1756 useDynamicWorker: true
1757 })
1758 // We need to clean up the resources after our test
1759 await pool.destroy()
1760 })
1761
1762 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1763 const workerChoiceStrategy =
1764 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1765 let pool = new FixedThreadPool(
1766 max,
1767 './tests/worker-files/thread/testWorker.js',
1768 { workerChoiceStrategy }
1769 )
1770 expect(
1771 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1772 ).toStrictEqual({
1773 runTime: {
1774 aggregate: false,
1775 average: false,
1776 median: false
1777 },
1778 waitTime: {
1779 aggregate: false,
1780 average: false,
1781 median: false
1782 },
1783 elu: {
1784 aggregate: false,
1785 average: false,
1786 median: false
1787 }
1788 })
1789 await pool.destroy()
1790 pool = new DynamicThreadPool(
1791 min,
1792 max,
1793 './tests/worker-files/thread/testWorker.js',
1794 { workerChoiceStrategy }
1795 )
1796 expect(
1797 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1798 ).toStrictEqual({
1799 runTime: {
1800 aggregate: false,
1801 average: false,
1802 median: false
1803 },
1804 waitTime: {
1805 aggregate: false,
1806 average: false,
1807 median: false
1808 },
1809 elu: {
1810 aggregate: false,
1811 average: false,
1812 median: false
1813 }
1814 })
1815 // We need to clean up the resources after our test
1816 await pool.destroy()
1817 })
1818
1819 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1820 const pool = new FixedThreadPool(
1821 max,
1822 './tests/worker-files/thread/testWorker.js',
1823 {
1824 workerChoiceStrategy:
1825 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1826 }
1827 )
1828 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1829 const promises = new Set()
1830 const maxMultiplier = 2
1831 for (let i = 0; i < max * maxMultiplier; i++) {
1832 promises.add(pool.execute())
1833 }
1834 await Promise.all(promises)
1835 for (const workerNode of pool.workerNodes) {
1836 expect(workerNode.workerUsage).toStrictEqual({
1837 tasks: {
1838 executed: maxMultiplier,
1839 executing: 0,
1840 queued: 0,
1841 maxQueued: 0,
1842 failed: 0
1843 },
1844 runTime: {
1845 aggregate: 0,
1846 average: 0,
1847 median: 0,
1848 history: expect.any(CircularArray)
1849 },
1850 waitTime: {
1851 aggregate: 0,
1852 average: 0,
1853 median: 0,
1854 history: expect.any(CircularArray)
1855 },
1856 elu: {
1857 idle: {
1858 aggregate: 0,
1859 average: 0,
1860 median: 0,
1861 history: expect.any(CircularArray)
1862 },
1863 active: {
1864 aggregate: 0,
1865 average: 0,
1866 median: 0,
1867 history: expect.any(CircularArray)
1868 },
1869 utilization: 0
1870 }
1871 })
1872 }
1873 expect(
1874 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1875 pool.workerChoiceStrategyContext.workerChoiceStrategy
1876 ).defaultWorkerWeight
1877 ).toBeGreaterThan(0)
1878 expect(
1879 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1880 pool.workerChoiceStrategyContext.workerChoiceStrategy
1881 ).roundId
1882 ).toBe(0)
1883 expect(
1884 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1885 pool.workerChoiceStrategyContext.workerChoiceStrategy
1886 ).nextWorkerNodeId
1887 ).toBe(0)
1888 expect(
1889 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1890 pool.workerChoiceStrategyContext.workerChoiceStrategy
1891 ).roundWeights
1892 ).toStrictEqual([
1893 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1894 pool.workerChoiceStrategyContext.workerChoiceStrategy
1895 ).defaultWorkerWeight
1896 ])
1897 // We need to clean up the resources after our test
1898 await pool.destroy()
1899 })
1900
1901 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1902 const pool = new DynamicThreadPool(
1903 min,
1904 max,
1905 './tests/worker-files/thread/testWorker.js',
1906 {
1907 workerChoiceStrategy:
1908 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1909 }
1910 )
1911 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1912 const promises = new Set()
1913 const maxMultiplier = 2
1914 for (let i = 0; i < max * maxMultiplier; i++) {
1915 promises.add(pool.execute())
1916 }
1917 await Promise.all(promises)
1918 for (const workerNode of pool.workerNodes) {
1919 expect(workerNode.workerUsage).toStrictEqual({
1920 tasks: {
1921 executed: maxMultiplier,
1922 executing: 0,
1923 queued: 0,
1924 maxQueued: 0,
1925 failed: 0
1926 },
1927 runTime: {
1928 aggregate: 0,
1929 average: 0,
1930 median: 0,
1931 history: expect.any(CircularArray)
1932 },
1933 waitTime: {
1934 aggregate: 0,
1935 average: 0,
1936 median: 0,
1937 history: expect.any(CircularArray)
1938 },
1939 elu: {
1940 idle: {
1941 aggregate: 0,
1942 average: 0,
1943 median: 0,
1944 history: expect.any(CircularArray)
1945 },
1946 active: {
1947 aggregate: 0,
1948 average: 0,
1949 median: 0,
1950 history: expect.any(CircularArray)
1951 },
1952 utilization: 0
1953 }
1954 })
1955 }
1956 expect(
1957 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1958 pool.workerChoiceStrategyContext.workerChoiceStrategy
1959 ).defaultWorkerWeight
1960 ).toBeGreaterThan(0)
1961 expect(
1962 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1963 pool.workerChoiceStrategyContext.workerChoiceStrategy
1964 ).roundId
1965 ).toBe(0)
1966 expect(
1967 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1968 pool.workerChoiceStrategyContext.workerChoiceStrategy
1969 ).nextWorkerNodeId
1970 ).toBe(0)
1971 expect(
1972 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1973 pool.workerChoiceStrategyContext.workerChoiceStrategy
1974 ).roundWeights
1975 ).toStrictEqual([
1976 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1977 pool.workerChoiceStrategyContext.workerChoiceStrategy
1978 ).defaultWorkerWeight
1979 ])
1980 // We need to clean up the resources after our test
1981 await pool.destroy()
1982 })
1983
1984 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1985 const workerChoiceStrategy =
1986 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1987 let pool = new FixedThreadPool(
1988 max,
1989 './tests/worker-files/thread/testWorker.js'
1990 )
1991 expect(
1992 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1993 workerChoiceStrategy
1994 ).roundId
1995 ).toBeDefined()
1996 expect(
1997 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1998 workerChoiceStrategy
1999 ).nextWorkerNodeId
2000 ).toBeDefined()
2001 expect(
2002 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2003 workerChoiceStrategy
2004 ).defaultWorkerWeight
2005 ).toBeDefined()
2006 expect(
2007 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2008 workerChoiceStrategy
2009 ).roundWeights
2010 ).toBeDefined()
2011 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2012 expect(
2013 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2014 workerChoiceStrategy
2015 ).roundId
2016 ).toBe(0)
2017 expect(
2018 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2019 pool.workerChoiceStrategyContext.workerChoiceStrategy
2020 ).nextWorkerNodeId
2021 ).toBe(0)
2022 expect(
2023 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2024 pool.workerChoiceStrategyContext.workerChoiceStrategy
2025 ).defaultWorkerWeight
2026 ).toBeGreaterThan(0)
2027 expect(
2028 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2029 workerChoiceStrategy
2030 ).roundWeights
2031 ).toStrictEqual([
2032 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2033 pool.workerChoiceStrategyContext.workerChoiceStrategy
2034 ).defaultWorkerWeight
2035 ])
2036 await pool.destroy()
2037 pool = new DynamicThreadPool(
2038 min,
2039 max,
2040 './tests/worker-files/thread/testWorker.js'
2041 )
2042 expect(
2043 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2044 workerChoiceStrategy
2045 ).roundId
2046 ).toBeDefined()
2047 expect(
2048 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2049 workerChoiceStrategy
2050 ).nextWorkerNodeId
2051 ).toBeDefined()
2052 expect(
2053 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2054 workerChoiceStrategy
2055 ).defaultWorkerWeight
2056 ).toBeDefined()
2057 expect(
2058 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2059 workerChoiceStrategy
2060 ).roundWeights
2061 ).toBeDefined()
2062 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2063 expect(
2064 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2065 pool.workerChoiceStrategyContext.workerChoiceStrategy
2066 ).nextWorkerNodeId
2067 ).toBe(0)
2068 expect(
2069 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2070 pool.workerChoiceStrategyContext.workerChoiceStrategy
2071 ).defaultWorkerWeight
2072 ).toBeGreaterThan(0)
2073 expect(
2074 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2075 workerChoiceStrategy
2076 ).roundWeights
2077 ).toStrictEqual([
2078 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2079 pool.workerChoiceStrategyContext.workerChoiceStrategy
2080 ).defaultWorkerWeight
2081 ])
2082 // We need to clean up the resources after our test
2083 await pool.destroy()
2084 })
2085
2086 it('Verify unknown strategy throw error', () => {
2087 expect(
2088 () =>
2089 new DynamicThreadPool(
2090 min,
2091 max,
2092 './tests/worker-files/thread/testWorker.js',
2093 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2094 )
2095 ).toThrowError("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
2096 })
2097 })