perf: optimize worker choice strategies
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
... / ...
CommitLineData
1const { expect } = require('expect')
2const {
3 WorkerChoiceStrategies,
4 DynamicThreadPool,
5 FixedThreadPool,
6 FixedClusterPool
7} = require('../../../lib')
8const { CircularArray } = require('../../../lib/circular-array')
9
10describe('Selection strategies test suite', () => {
11 const min = 0
12 const max = 3
13
14 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
15 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
16 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
17 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
18 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
19 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
20 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
21 'WEIGHTED_ROUND_ROBIN'
22 )
23 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
24 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
25 )
26 })
27
28 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
29 const pool = new DynamicThreadPool(
30 min,
31 max,
32 './tests/worker-files/thread/testWorker.js'
33 )
34 expect(pool.opts.workerChoiceStrategy).toBe(
35 WorkerChoiceStrategies.ROUND_ROBIN
36 )
37 // We need to clean up the resources after our test
38 await pool.destroy()
39 })
40
41 it('Verify available strategies are taken at pool creation', async () => {
42 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
43 const pool = new FixedThreadPool(
44 max,
45 './tests/worker-files/thread/testWorker.js',
46 { workerChoiceStrategy }
47 )
48 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
49 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
50 workerChoiceStrategy
51 )
52 await pool.destroy()
53 }
54 })
55
56 it('Verify available strategies can be set after pool creation', async () => {
57 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
58 const pool = new DynamicThreadPool(
59 min,
60 max,
61 './tests/worker-files/thread/testWorker.js'
62 )
63 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
64 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
65 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
66 workerChoiceStrategy
67 )
68 await pool.destroy()
69 }
70 })
71
72 it('Verify available strategies default internals at pool creation', async () => {
73 const pool = new FixedThreadPool(
74 max,
75 './tests/worker-files/thread/testWorker.js'
76 )
77 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
78 if (workerChoiceStrategy === WorkerChoiceStrategies.ROUND_ROBIN) {
79 expect(
80 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
81 workerChoiceStrategy
82 ).nextWorkerNodeId
83 ).toBe(0)
84 } else if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
85 expect(
86 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
87 workerChoiceStrategy
88 ).workersVirtualTaskEndTimestamp
89 ).toBeInstanceOf(Array)
90 expect(
91 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
92 workerChoiceStrategy
93 ).workersVirtualTaskEndTimestamp.length
94 ).toBe(0)
95 } else if (
96 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
97 ) {
98 expect(
99 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
100 workerChoiceStrategy
101 ).nextWorkerNodeId
102 ).toBe(0)
103 expect(
104 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
105 workerChoiceStrategy
106 ).defaultWorkerWeight
107 ).toBeGreaterThan(0)
108 expect(
109 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
110 workerChoiceStrategy
111 ).workerVirtualTaskRunTime
112 ).toBe(0)
113 }
114 }
115 await pool.destroy()
116 })
117
118 it('Verify ROUND_ROBIN strategy default policy', async () => {
119 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
120 let pool = new FixedThreadPool(
121 max,
122 './tests/worker-files/thread/testWorker.js',
123 { workerChoiceStrategy }
124 )
125 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
126 useDynamicWorker: true
127 })
128 await pool.destroy()
129 pool = new DynamicThreadPool(
130 min,
131 max,
132 './tests/worker-files/thread/testWorker.js',
133 { workerChoiceStrategy }
134 )
135 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
136 useDynamicWorker: true
137 })
138 // We need to clean up the resources after our test
139 await pool.destroy()
140 })
141
142 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
143 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
144 let pool = new FixedThreadPool(
145 max,
146 './tests/worker-files/thread/testWorker.js',
147 { workerChoiceStrategy }
148 )
149 expect(
150 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
151 ).toStrictEqual({
152 runTime: {
153 aggregate: false,
154 average: false,
155 median: false
156 },
157 waitTime: {
158 aggregate: false,
159 average: false,
160 median: false
161 },
162 elu: {
163 aggregate: false,
164 average: false,
165 median: false
166 }
167 })
168 await pool.destroy()
169 pool = new DynamicThreadPool(
170 min,
171 max,
172 './tests/worker-files/thread/testWorker.js',
173 { workerChoiceStrategy }
174 )
175 expect(
176 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
177 ).toStrictEqual({
178 runTime: {
179 aggregate: false,
180 average: false,
181 median: false
182 },
183 waitTime: {
184 aggregate: false,
185 average: false,
186 median: false
187 },
188 elu: {
189 aggregate: false,
190 average: false,
191 median: false
192 }
193 })
194 // We need to clean up the resources after our test
195 await pool.destroy()
196 })
197
198 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
199 const pool = new FixedThreadPool(
200 max,
201 './tests/worker-files/thread/testWorker.js',
202 { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
203 )
204 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
205 const promises = new Set()
206 const maxMultiplier = 2
207 for (let i = 0; i < max * maxMultiplier; i++) {
208 promises.add(pool.execute())
209 }
210 await Promise.all(promises)
211 for (const workerNode of pool.workerNodes) {
212 expect(workerNode.workerUsage).toStrictEqual({
213 tasks: {
214 executed: maxMultiplier,
215 executing: 0,
216 queued: 0,
217 failed: 0
218 },
219 runTime: {
220 aggregate: 0,
221 average: 0,
222 median: 0,
223 history: expect.any(CircularArray)
224 },
225 waitTime: {
226 aggregate: 0,
227 average: 0,
228 median: 0,
229 history: expect.any(CircularArray)
230 },
231 elu: {
232 idle: {
233 aggregate: 0,
234 average: 0,
235 median: 0,
236 history: expect.any(CircularArray)
237 },
238 active: {
239 aggregate: 0,
240 average: 0,
241 median: 0,
242 history: expect.any(CircularArray)
243 },
244 utilization: 0
245 }
246 })
247 }
248 expect(
249 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
250 WorkerChoiceStrategies.ROUND_ROBIN
251 ).nextWorkerNodeId
252 ).toBe(0)
253 // We need to clean up the resources after our test
254 await pool.destroy()
255 })
256
257 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
258 const pool = new DynamicThreadPool(
259 min,
260 max,
261 './tests/worker-files/thread/testWorker.js',
262 { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
263 )
264 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
265 const promises = new Set()
266 const maxMultiplier = 2
267 for (let i = 0; i < max * maxMultiplier; i++) {
268 promises.add(pool.execute())
269 }
270 await Promise.all(promises)
271 for (const workerNode of pool.workerNodes) {
272 expect(workerNode.workerUsage).toStrictEqual({
273 tasks: {
274 executed: maxMultiplier,
275 executing: 0,
276 queued: 0,
277 failed: 0
278 },
279 runTime: {
280 aggregate: 0,
281 average: 0,
282 median: 0,
283 history: expect.any(CircularArray)
284 },
285 waitTime: {
286 aggregate: 0,
287 average: 0,
288 median: 0,
289 history: expect.any(CircularArray)
290 },
291 elu: {
292 idle: {
293 aggregate: 0,
294 average: 0,
295 median: 0,
296 history: expect.any(CircularArray)
297 },
298 active: {
299 aggregate: 0,
300 average: 0,
301 median: 0,
302 history: expect.any(CircularArray)
303 },
304 utilization: 0
305 }
306 })
307 }
308 expect(
309 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
310 WorkerChoiceStrategies.ROUND_ROBIN
311 ).nextWorkerNodeId
312 ).toBe(0)
313 // We need to clean up the resources after our test
314 await pool.destroy()
315 })
316
317 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
318 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
319 let pool = new FixedClusterPool(
320 max,
321 './tests/worker-files/cluster/testWorker.js',
322 { workerChoiceStrategy }
323 )
324 let results = new Set()
325 for (let i = 0; i < max; i++) {
326 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.id)
327 }
328 expect(results.size).toBe(max)
329 await pool.destroy()
330 pool = new FixedThreadPool(
331 max,
332 './tests/worker-files/thread/testWorker.js',
333 { workerChoiceStrategy }
334 )
335 results = new Set()
336 for (let i = 0; i < max; i++) {
337 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.threadId)
338 }
339 expect(results.size).toBe(max)
340 await pool.destroy()
341 })
342
343 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
344 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
345 let pool = new FixedThreadPool(
346 max,
347 './tests/worker-files/thread/testWorker.js',
348 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
349 )
350 expect(
351 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
352 workerChoiceStrategy
353 ).nextWorkerNodeId
354 ).toBeDefined()
355 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
356 expect(
357 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
358 pool.workerChoiceStrategyContext.workerChoiceStrategy
359 ).nextWorkerNodeId
360 ).toBe(0)
361 await pool.destroy()
362 pool = new DynamicThreadPool(
363 min,
364 max,
365 './tests/worker-files/thread/testWorker.js',
366 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
367 )
368 expect(
369 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
370 workerChoiceStrategy
371 ).nextWorkerNodeId
372 ).toBeDefined()
373 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
374 expect(
375 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
376 pool.workerChoiceStrategyContext.workerChoiceStrategy
377 ).nextWorkerNodeId
378 ).toBe(0)
379 // We need to clean up the resources after our test
380 await pool.destroy()
381 })
382
383 it('Verify LEAST_USED strategy default policy', async () => {
384 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
385 let pool = new FixedThreadPool(
386 max,
387 './tests/worker-files/thread/testWorker.js',
388 { workerChoiceStrategy }
389 )
390 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
391 useDynamicWorker: false
392 })
393 await pool.destroy()
394 pool = new DynamicThreadPool(
395 min,
396 max,
397 './tests/worker-files/thread/testWorker.js',
398 { workerChoiceStrategy }
399 )
400 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
401 useDynamicWorker: false
402 })
403 // We need to clean up the resources after our test
404 await pool.destroy()
405 })
406
407 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
408 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
409 let pool = new FixedThreadPool(
410 max,
411 './tests/worker-files/thread/testWorker.js',
412 { workerChoiceStrategy }
413 )
414 expect(
415 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
416 ).toStrictEqual({
417 runTime: {
418 aggregate: false,
419 average: false,
420 median: false
421 },
422 waitTime: {
423 aggregate: false,
424 average: false,
425 median: false
426 },
427 elu: {
428 aggregate: false,
429 average: false,
430 median: false
431 }
432 })
433 await pool.destroy()
434 pool = new DynamicThreadPool(
435 min,
436 max,
437 './tests/worker-files/thread/testWorker.js',
438 { workerChoiceStrategy }
439 )
440 expect(
441 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
442 ).toStrictEqual({
443 runTime: {
444 aggregate: false,
445 average: false,
446 median: false
447 },
448 waitTime: {
449 aggregate: false,
450 average: false,
451 median: false
452 },
453 elu: {
454 aggregate: false,
455 average: false,
456 median: false
457 }
458 })
459 // We need to clean up the resources after our test
460 await pool.destroy()
461 })
462
463 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
464 const pool = new FixedThreadPool(
465 max,
466 './tests/worker-files/thread/testWorker.js',
467 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
468 )
469 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
470 const promises = new Set()
471 const maxMultiplier = 2
472 for (let i = 0; i < max * maxMultiplier; i++) {
473 promises.add(pool.execute())
474 }
475 await Promise.all(promises)
476 for (const workerNode of pool.workerNodes) {
477 expect(workerNode.workerUsage).toStrictEqual({
478 tasks: {
479 executed: maxMultiplier,
480 executing: 0,
481 queued: 0,
482 failed: 0
483 },
484 runTime: {
485 aggregate: 0,
486 average: 0,
487 median: 0,
488 history: expect.any(CircularArray)
489 },
490 waitTime: {
491 aggregate: 0,
492 average: 0,
493 median: 0,
494 history: expect.any(CircularArray)
495 },
496 elu: {
497 idle: {
498 aggregate: 0,
499 average: 0,
500 median: 0,
501 history: expect.any(CircularArray)
502 },
503 active: {
504 aggregate: 0,
505 average: 0,
506 median: 0,
507 history: expect.any(CircularArray)
508 },
509 utilization: 0
510 }
511 })
512 }
513 // We need to clean up the resources after our test
514 await pool.destroy()
515 })
516
517 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
518 const pool = new DynamicThreadPool(
519 min,
520 max,
521 './tests/worker-files/thread/testWorker.js',
522 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
523 )
524 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
525 const promises = new Set()
526 const maxMultiplier = 2
527 for (let i = 0; i < max * maxMultiplier; i++) {
528 promises.add(pool.execute())
529 }
530 await Promise.all(promises)
531 for (const workerNode of pool.workerNodes) {
532 expect(workerNode.workerUsage).toStrictEqual({
533 tasks: {
534 executed: maxMultiplier,
535 executing: 0,
536 queued: 0,
537 failed: 0
538 },
539 runTime: {
540 aggregate: 0,
541 average: 0,
542 median: 0,
543 history: expect.any(CircularArray)
544 },
545 waitTime: {
546 aggregate: 0,
547 average: 0,
548 median: 0,
549 history: expect.any(CircularArray)
550 },
551 elu: {
552 idle: {
553 aggregate: 0,
554 average: 0,
555 median: 0,
556 history: expect.any(CircularArray)
557 },
558 active: {
559 aggregate: 0,
560 average: 0,
561 median: 0,
562 history: expect.any(CircularArray)
563 },
564 utilization: 0
565 }
566 })
567 }
568 // We need to clean up the resources after our test
569 await pool.destroy()
570 })
571
572 it('Verify LEAST_BUSY strategy default policy', async () => {
573 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
574 let pool = new FixedThreadPool(
575 max,
576 './tests/worker-files/thread/testWorker.js',
577 { workerChoiceStrategy }
578 )
579 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
580 useDynamicWorker: false
581 })
582 await pool.destroy()
583 pool = new DynamicThreadPool(
584 min,
585 max,
586 './tests/worker-files/thread/testWorker.js',
587 { workerChoiceStrategy }
588 )
589 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
590 useDynamicWorker: false
591 })
592 // We need to clean up the resources after our test
593 await pool.destroy()
594 })
595
596 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
597 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
598 let pool = new FixedThreadPool(
599 max,
600 './tests/worker-files/thread/testWorker.js',
601 { workerChoiceStrategy }
602 )
603 expect(
604 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
605 ).toStrictEqual({
606 runTime: {
607 aggregate: true,
608 average: false,
609 median: false
610 },
611 waitTime: {
612 aggregate: true,
613 average: false,
614 median: false
615 },
616 elu: {
617 aggregate: false,
618 average: false,
619 median: false
620 }
621 })
622 await pool.destroy()
623 pool = new DynamicThreadPool(
624 min,
625 max,
626 './tests/worker-files/thread/testWorker.js',
627 { workerChoiceStrategy }
628 )
629 expect(
630 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
631 ).toStrictEqual({
632 runTime: {
633 aggregate: true,
634 average: false,
635 median: false
636 },
637 waitTime: {
638 aggregate: true,
639 average: false,
640 median: false
641 },
642 elu: {
643 aggregate: false,
644 average: false,
645 median: false
646 }
647 })
648 // We need to clean up the resources after our test
649 await pool.destroy()
650 })
651
652 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
653 const pool = new FixedThreadPool(
654 max,
655 './tests/worker-files/thread/testWorker.js',
656 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
657 )
658 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
659 const promises = new Set()
660 const maxMultiplier = 2
661 for (let i = 0; i < max * maxMultiplier; i++) {
662 promises.add(pool.execute())
663 }
664 await Promise.all(promises)
665 for (const workerNode of pool.workerNodes) {
666 expect(workerNode.workerUsage).toStrictEqual({
667 tasks: {
668 executed: expect.any(Number),
669 executing: 0,
670 queued: 0,
671 failed: 0
672 },
673 runTime: {
674 aggregate: expect.any(Number),
675 average: 0,
676 median: 0,
677 history: expect.any(CircularArray)
678 },
679 waitTime: {
680 aggregate: expect.any(Number),
681 average: 0,
682 median: 0,
683 history: expect.any(CircularArray)
684 },
685 elu: {
686 idle: {
687 aggregate: 0,
688 average: 0,
689 median: 0,
690 history: expect.any(CircularArray)
691 },
692 active: {
693 aggregate: 0,
694 average: 0,
695 median: 0,
696 history: expect.any(CircularArray)
697 },
698 utilization: 0
699 }
700 })
701 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
702 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
703 max * maxMultiplier
704 )
705 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
706 expect(workerNode.workerUsage.waitTime.aggregate).toBeGreaterThanOrEqual(
707 0
708 )
709 }
710 // We need to clean up the resources after our test
711 await pool.destroy()
712 })
713
714 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
715 const pool = new DynamicThreadPool(
716 min,
717 max,
718 './tests/worker-files/thread/testWorker.js',
719 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
720 )
721 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
722 const promises = new Set()
723 const maxMultiplier = 2
724 for (let i = 0; i < max * maxMultiplier; i++) {
725 promises.add(pool.execute())
726 }
727 await Promise.all(promises)
728 for (const workerNode of pool.workerNodes) {
729 expect(workerNode.workerUsage).toStrictEqual({
730 tasks: {
731 executed: expect.any(Number),
732 executing: 0,
733 queued: 0,
734 failed: 0
735 },
736 runTime: {
737 aggregate: expect.any(Number),
738 average: 0,
739 median: 0,
740 history: expect.any(CircularArray)
741 },
742 waitTime: {
743 aggregate: expect.any(Number),
744 average: 0,
745 median: 0,
746 history: expect.any(CircularArray)
747 },
748 elu: {
749 idle: {
750 aggregate: 0,
751 average: 0,
752 median: 0,
753 history: expect.any(CircularArray)
754 },
755 active: {
756 aggregate: 0,
757 average: 0,
758 median: 0,
759 history: expect.any(CircularArray)
760 },
761 utilization: 0
762 }
763 })
764 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
765 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
766 max * maxMultiplier
767 )
768 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
769 expect(workerNode.workerUsage.waitTime.aggregate).toBeGreaterThan(0)
770 }
771 // We need to clean up the resources after our test
772 await pool.destroy()
773 })
774
775 it('Verify LEAST_ELU strategy default policy', async () => {
776 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
777 let pool = new FixedThreadPool(
778 max,
779 './tests/worker-files/thread/testWorker.js',
780 { workerChoiceStrategy }
781 )
782 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
783 useDynamicWorker: false
784 })
785 await pool.destroy()
786 pool = new DynamicThreadPool(
787 min,
788 max,
789 './tests/worker-files/thread/testWorker.js',
790 { workerChoiceStrategy }
791 )
792 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
793 useDynamicWorker: false
794 })
795 // We need to clean up the resources after our test
796 await pool.destroy()
797 })
798
799 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
800 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
801 let pool = new FixedThreadPool(
802 max,
803 './tests/worker-files/thread/testWorker.js',
804 { workerChoiceStrategy }
805 )
806 expect(
807 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
808 ).toStrictEqual({
809 runTime: {
810 aggregate: false,
811 average: false,
812 median: false
813 },
814 waitTime: {
815 aggregate: false,
816 average: false,
817 median: false
818 },
819 elu: {
820 aggregate: true,
821 average: false,
822 median: false
823 }
824 })
825 await pool.destroy()
826 pool = new DynamicThreadPool(
827 min,
828 max,
829 './tests/worker-files/thread/testWorker.js',
830 { workerChoiceStrategy }
831 )
832 expect(
833 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
834 ).toStrictEqual({
835 runTime: {
836 aggregate: false,
837 average: false,
838 median: false
839 },
840 waitTime: {
841 aggregate: false,
842 average: false,
843 median: false
844 },
845 elu: {
846 aggregate: true,
847 average: false,
848 median: false
849 }
850 })
851 // We need to clean up the resources after our test
852 await pool.destroy()
853 })
854
855 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
856 const pool = new FixedThreadPool(
857 max,
858 './tests/worker-files/thread/testWorker.js',
859 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
860 )
861 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
862 const promises = new Set()
863 const maxMultiplier = 2
864 for (let i = 0; i < max * maxMultiplier; i++) {
865 promises.add(pool.execute())
866 }
867 await Promise.all(promises)
868 for (const workerNode of pool.workerNodes) {
869 expect(workerNode.workerUsage).toStrictEqual({
870 tasks: {
871 executed: expect.any(Number),
872 executing: 0,
873 queued: 0,
874 failed: 0
875 },
876 runTime: {
877 aggregate: 0,
878 average: 0,
879 median: 0,
880 history: expect.any(CircularArray)
881 },
882 waitTime: {
883 aggregate: 0,
884 average: 0,
885 median: 0,
886 history: expect.any(CircularArray)
887 },
888 elu: {
889 idle: {
890 aggregate: 0,
891 average: 0,
892 median: 0,
893 history: expect.any(CircularArray)
894 },
895 active: {
896 aggregate: expect.any(Number),
897 average: 0,
898 median: 0,
899 history: expect.any(CircularArray)
900 },
901 utilization: expect.any(Number)
902 }
903 })
904 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
905 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
906 max * maxMultiplier
907 )
908 expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
909 expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
910 }
911 // We need to clean up the resources after our test
912 await pool.destroy()
913 })
914
915 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
916 const pool = new DynamicThreadPool(
917 min,
918 max,
919 './tests/worker-files/thread/testWorker.js',
920 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
921 )
922 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
923 const promises = new Set()
924 const maxMultiplier = 2
925 for (let i = 0; i < max * maxMultiplier; i++) {
926 promises.add(pool.execute())
927 }
928 await Promise.all(promises)
929 for (const workerNode of pool.workerNodes) {
930 expect(workerNode.workerUsage).toStrictEqual({
931 tasks: {
932 executed: expect.any(Number),
933 executing: 0,
934 queued: 0,
935 failed: 0
936 },
937 runTime: {
938 aggregate: 0,
939 average: 0,
940 median: 0,
941 history: expect.any(CircularArray)
942 },
943 waitTime: {
944 aggregate: 0,
945 average: 0,
946 median: 0,
947 history: expect.any(CircularArray)
948 },
949 elu: {
950 idle: {
951 aggregate: 0,
952 average: 0,
953 median: 0,
954 history: expect.any(CircularArray)
955 },
956 active: {
957 aggregate: expect.any(Number),
958 average: 0,
959 median: 0,
960 history: expect.any(CircularArray)
961 },
962 utilization: expect.any(Number)
963 }
964 })
965 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
966 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
967 max * maxMultiplier
968 )
969 expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
970 expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
971 }
972 // We need to clean up the resources after our test
973 await pool.destroy()
974 })
975
976 it('Verify FAIR_SHARE strategy default policy', async () => {
977 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
978 let pool = new FixedThreadPool(
979 max,
980 './tests/worker-files/thread/testWorker.js',
981 { workerChoiceStrategy }
982 )
983 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
984 useDynamicWorker: false
985 })
986 await pool.destroy()
987 pool = new DynamicThreadPool(
988 min,
989 max,
990 './tests/worker-files/thread/testWorker.js',
991 { workerChoiceStrategy }
992 )
993 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
994 useDynamicWorker: false
995 })
996 // We need to clean up the resources after our test
997 await pool.destroy()
998 })
999
1000 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1001 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1002 let pool = new FixedThreadPool(
1003 max,
1004 './tests/worker-files/thread/testWorker.js',
1005 { workerChoiceStrategy }
1006 )
1007 expect(
1008 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1009 ).toStrictEqual({
1010 runTime: {
1011 aggregate: true,
1012 average: true,
1013 median: false
1014 },
1015 waitTime: {
1016 aggregate: false,
1017 average: false,
1018 median: false
1019 },
1020 elu: {
1021 aggregate: true,
1022 average: true,
1023 median: false
1024 }
1025 })
1026 await pool.destroy()
1027 pool = new DynamicThreadPool(
1028 min,
1029 max,
1030 './tests/worker-files/thread/testWorker.js',
1031 { workerChoiceStrategy }
1032 )
1033 expect(
1034 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1035 ).toStrictEqual({
1036 runTime: {
1037 aggregate: true,
1038 average: true,
1039 median: false
1040 },
1041 waitTime: {
1042 aggregate: false,
1043 average: false,
1044 median: false
1045 },
1046 elu: {
1047 aggregate: true,
1048 average: true,
1049 median: false
1050 }
1051 })
1052 // We need to clean up the resources after our test
1053 await pool.destroy()
1054 })
1055
1056 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1057 const pool = new FixedThreadPool(
1058 max,
1059 './tests/worker-files/thread/testWorker.js',
1060 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1061 )
1062 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1063 const promises = new Set()
1064 const maxMultiplier = 2
1065 for (let i = 0; i < max * maxMultiplier; i++) {
1066 promises.add(pool.execute())
1067 }
1068 await Promise.all(promises)
1069 for (const workerNode of pool.workerNodes) {
1070 expect(workerNode.workerUsage).toStrictEqual({
1071 tasks: {
1072 executed: expect.any(Number),
1073 executing: 0,
1074 queued: 0,
1075 failed: 0
1076 },
1077 runTime: {
1078 aggregate: expect.any(Number),
1079 average: expect.any(Number),
1080 median: 0,
1081 history: expect.any(CircularArray)
1082 },
1083 waitTime: {
1084 aggregate: 0,
1085 average: 0,
1086 median: 0,
1087 history: expect.any(CircularArray)
1088 },
1089 elu: {
1090 idle: {
1091 aggregate: 0,
1092 average: 0,
1093 median: 0,
1094 history: expect.any(CircularArray)
1095 },
1096 active: {
1097 aggregate: expect.any(Number),
1098 average: expect.any(Number),
1099 median: 0,
1100 history: expect.any(CircularArray)
1101 },
1102 utilization: expect.any(Number)
1103 }
1104 })
1105 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
1106 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
1107 max * maxMultiplier
1108 )
1109 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
1110 expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
1111 expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
1112 expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
1113 }
1114 expect(
1115 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1116 pool.workerChoiceStrategyContext.workerChoiceStrategy
1117 ).workersVirtualTaskEndTimestamp.length
1118 ).toBe(pool.workerNodes.length)
1119 // We need to clean up the resources after our test
1120 await pool.destroy()
1121 })
1122
1123 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1124 const pool = new DynamicThreadPool(
1125 min,
1126 max,
1127 './tests/worker-files/thread/testWorker.js',
1128 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1129 )
1130 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1131 const promises = new Set()
1132 const maxMultiplier = 2
1133 for (let i = 0; i < max * maxMultiplier; i++) {
1134 promises.add(pool.execute())
1135 }
1136 await Promise.all(promises)
1137 for (const workerNode of pool.workerNodes) {
1138 expect(workerNode.workerUsage).toStrictEqual({
1139 tasks: {
1140 executed: expect.any(Number),
1141 executing: 0,
1142 queued: 0,
1143 failed: 0
1144 },
1145 runTime: {
1146 aggregate: expect.any(Number),
1147 average: expect.any(Number),
1148 median: 0,
1149 history: expect.any(CircularArray)
1150 },
1151 waitTime: {
1152 aggregate: 0,
1153 average: 0,
1154 median: 0,
1155 history: expect.any(CircularArray)
1156 },
1157 elu: {
1158 idle: {
1159 aggregate: 0,
1160 average: 0,
1161 median: 0,
1162 history: expect.any(CircularArray)
1163 },
1164 active: {
1165 aggregate: expect.any(Number),
1166 average: expect.any(Number),
1167 median: 0,
1168 history: expect.any(CircularArray)
1169 },
1170 utilization: expect.any(Number)
1171 }
1172 })
1173 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
1174 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
1175 max * maxMultiplier
1176 )
1177 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
1178 expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0)
1179 expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
1180 expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
1181 }
1182 expect(
1183 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1184 pool.workerChoiceStrategyContext.workerChoiceStrategy
1185 ).workersVirtualTaskEndTimestamp.length
1186 ).toBe(pool.workerNodes.length)
1187 // We need to clean up the resources after our test
1188 await pool.destroy()
1189 })
1190
1191 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1192 const pool = new DynamicThreadPool(
1193 min,
1194 max,
1195 './tests/worker-files/thread/testWorker.js',
1196 {
1197 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1198 workerChoiceStrategyOptions: {
1199 runTime: { median: true }
1200 }
1201 }
1202 )
1203 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1204 const promises = new Set()
1205 const maxMultiplier = 2
1206 for (let i = 0; i < max * maxMultiplier; i++) {
1207 promises.add(pool.execute())
1208 }
1209 await Promise.all(promises)
1210 for (const workerNode of pool.workerNodes) {
1211 expect(workerNode.workerUsage).toStrictEqual({
1212 tasks: {
1213 executed: expect.any(Number),
1214 executing: 0,
1215 queued: 0,
1216 failed: 0
1217 },
1218 runTime: {
1219 aggregate: expect.any(Number),
1220 average: 0,
1221 median: expect.any(Number),
1222 history: expect.any(CircularArray)
1223 },
1224 waitTime: {
1225 aggregate: 0,
1226 average: 0,
1227 median: 0,
1228 history: expect.any(CircularArray)
1229 },
1230 elu: {
1231 idle: {
1232 aggregate: 0,
1233 average: 0,
1234 median: 0,
1235 history: expect.any(CircularArray)
1236 },
1237 active: {
1238 aggregate: expect.any(Number),
1239 average: expect.any(Number),
1240 median: 0,
1241 history: expect.any(CircularArray)
1242 },
1243 utilization: expect.any(Number)
1244 }
1245 })
1246 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
1247 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
1248 max * maxMultiplier
1249 )
1250 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
1251 expect(workerNode.workerUsage.runTime.median).toBeGreaterThanOrEqual(0)
1252 expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
1253 expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
1254 }
1255 expect(
1256 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1257 pool.workerChoiceStrategyContext.workerChoiceStrategy
1258 ).workersVirtualTaskEndTimestamp.length
1259 ).toBe(pool.workerNodes.length)
1260 // We need to clean up the resources after our test
1261 await pool.destroy()
1262 })
1263
1264 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1265 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1266 let pool = new FixedThreadPool(
1267 max,
1268 './tests/worker-files/thread/testWorker.js'
1269 )
1270 expect(
1271 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1272 workerChoiceStrategy
1273 ).workersVirtualTaskEndTimestamp
1274 ).toBeInstanceOf(Array)
1275 expect(
1276 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1277 workerChoiceStrategy
1278 ).workersVirtualTaskEndTimestamp.length
1279 ).toBe(0)
1280 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1281 workerChoiceStrategy
1282 ).workersVirtualTaskEndTimestamp[0] = performance.now()
1283 expect(
1284 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1285 workerChoiceStrategy
1286 ).workersVirtualTaskEndTimestamp.length
1287 ).toBe(1)
1288 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1289 expect(
1290 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1291 workerChoiceStrategy
1292 ).workersVirtualTaskEndTimestamp
1293 ).toBeInstanceOf(Array)
1294 expect(
1295 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1296 workerChoiceStrategy
1297 ).workersVirtualTaskEndTimestamp.length
1298 ).toBe(0)
1299 await pool.destroy()
1300 pool = new DynamicThreadPool(
1301 min,
1302 max,
1303 './tests/worker-files/thread/testWorker.js'
1304 )
1305 expect(
1306 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1307 workerChoiceStrategy
1308 ).workersVirtualTaskEndTimestamp
1309 ).toBeInstanceOf(Array)
1310 expect(
1311 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1312 workerChoiceStrategy
1313 ).workersVirtualTaskEndTimestamp.length
1314 ).toBe(0)
1315 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1316 workerChoiceStrategy
1317 ).workersVirtualTaskEndTimestamp[0] = performance.now()
1318 expect(
1319 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1320 workerChoiceStrategy
1321 ).workersVirtualTaskEndTimestamp.length
1322 ).toBe(1)
1323 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1324 expect(
1325 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1326 workerChoiceStrategy
1327 ).workersVirtualTaskEndTimestamp
1328 ).toBeInstanceOf(Array)
1329 expect(
1330 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1331 workerChoiceStrategy
1332 ).workersVirtualTaskEndTimestamp.length
1333 ).toBe(0)
1334 // We need to clean up the resources after our test
1335 await pool.destroy()
1336 })
1337
1338 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1339 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1340 let pool = new FixedThreadPool(
1341 max,
1342 './tests/worker-files/thread/testWorker.js',
1343 { workerChoiceStrategy }
1344 )
1345 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1346 useDynamicWorker: true
1347 })
1348 await pool.destroy()
1349 pool = new DynamicThreadPool(
1350 min,
1351 max,
1352 './tests/worker-files/thread/testWorker.js',
1353 { workerChoiceStrategy }
1354 )
1355 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1356 useDynamicWorker: true
1357 })
1358 // We need to clean up the resources after our test
1359 await pool.destroy()
1360 })
1361
1362 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1363 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1364 let pool = new FixedThreadPool(
1365 max,
1366 './tests/worker-files/thread/testWorker.js',
1367 { workerChoiceStrategy }
1368 )
1369 expect(
1370 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1371 ).toStrictEqual({
1372 runTime: {
1373 aggregate: true,
1374 average: true,
1375 median: false
1376 },
1377 waitTime: {
1378 aggregate: false,
1379 average: false,
1380 median: false
1381 },
1382 elu: {
1383 aggregate: false,
1384 average: false,
1385 median: false
1386 }
1387 })
1388 await pool.destroy()
1389 pool = new DynamicThreadPool(
1390 min,
1391 max,
1392 './tests/worker-files/thread/testWorker.js',
1393 { workerChoiceStrategy }
1394 )
1395 expect(
1396 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1397 ).toStrictEqual({
1398 runTime: {
1399 aggregate: true,
1400 average: true,
1401 median: false
1402 },
1403 waitTime: {
1404 aggregate: false,
1405 average: false,
1406 median: false
1407 },
1408 elu: {
1409 aggregate: false,
1410 average: false,
1411 median: false
1412 }
1413 })
1414 // We need to clean up the resources after our test
1415 await pool.destroy()
1416 })
1417
1418 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1419 const pool = new FixedThreadPool(
1420 max,
1421 './tests/worker-files/thread/testWorker.js',
1422 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1423 )
1424 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1425 const promises = new Set()
1426 const maxMultiplier = 2
1427 for (let i = 0; i < max * maxMultiplier; i++) {
1428 promises.add(pool.execute())
1429 }
1430 await Promise.all(promises)
1431 for (const workerNode of pool.workerNodes) {
1432 expect(workerNode.workerUsage).toStrictEqual({
1433 tasks: {
1434 executed: expect.any(Number),
1435 executing: 0,
1436 queued: 0,
1437 failed: 0
1438 },
1439 runTime: {
1440 aggregate: expect.any(Number),
1441 average: expect.any(Number),
1442 median: 0,
1443 history: expect.any(CircularArray)
1444 },
1445 waitTime: {
1446 aggregate: 0,
1447 average: 0,
1448 median: 0,
1449 history: expect.any(CircularArray)
1450 },
1451 elu: {
1452 idle: {
1453 aggregate: 0,
1454 average: 0,
1455 median: 0,
1456 history: expect.any(CircularArray)
1457 },
1458 active: {
1459 aggregate: 0,
1460 average: 0,
1461 median: 0,
1462 history: expect.any(CircularArray)
1463 },
1464 utilization: 0
1465 }
1466 })
1467 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
1468 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
1469 max * maxMultiplier
1470 )
1471 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
1472 expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0)
1473 }
1474 expect(
1475 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1476 pool.workerChoiceStrategyContext.workerChoiceStrategy
1477 ).defaultWorkerWeight
1478 ).toBeGreaterThan(0)
1479 expect(
1480 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1481 pool.workerChoiceStrategyContext.workerChoiceStrategy
1482 ).workerVirtualTaskRunTime
1483 ).toBeGreaterThanOrEqual(0)
1484 // We need to clean up the resources after our test
1485 await pool.destroy()
1486 })
1487
1488 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1489 const pool = new DynamicThreadPool(
1490 min,
1491 max,
1492 './tests/worker-files/thread/testWorker.js',
1493 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1494 )
1495 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1496 const promises = new Set()
1497 const maxMultiplier = 2
1498 for (let i = 0; i < max * maxMultiplier; i++) {
1499 promises.add(pool.execute())
1500 }
1501 await Promise.all(promises)
1502 for (const workerNode of pool.workerNodes) {
1503 expect(workerNode.workerUsage).toStrictEqual({
1504 tasks: {
1505 executed: expect.any(Number),
1506 executing: 0,
1507 queued: 0,
1508 failed: 0
1509 },
1510 runTime: {
1511 aggregate: expect.any(Number),
1512 average: expect.any(Number),
1513 median: 0,
1514 history: expect.any(CircularArray)
1515 },
1516 waitTime: {
1517 aggregate: 0,
1518 average: 0,
1519 median: 0,
1520 history: expect.any(CircularArray)
1521 },
1522 elu: {
1523 idle: {
1524 aggregate: 0,
1525 average: 0,
1526 median: 0,
1527 history: expect.any(CircularArray)
1528 },
1529 active: {
1530 aggregate: 0,
1531 average: 0,
1532 median: 0,
1533 history: expect.any(CircularArray)
1534 },
1535 utilization: 0
1536 }
1537 })
1538 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
1539 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
1540 max * maxMultiplier
1541 )
1542 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
1543 expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
1544 }
1545 expect(
1546 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1547 pool.workerChoiceStrategyContext.workerChoiceStrategy
1548 ).defaultWorkerWeight
1549 ).toBeGreaterThan(0)
1550 expect(
1551 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1552 pool.workerChoiceStrategyContext.workerChoiceStrategy
1553 ).workerVirtualTaskRunTime
1554 ).toBeGreaterThanOrEqual(0)
1555 // We need to clean up the resources after our test
1556 await pool.destroy()
1557 })
1558
1559 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1560 const pool = new DynamicThreadPool(
1561 min,
1562 max,
1563 './tests/worker-files/thread/testWorker.js',
1564 {
1565 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1566 workerChoiceStrategyOptions: {
1567 runTime: { median: true }
1568 }
1569 }
1570 )
1571 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1572 const promises = new Set()
1573 const maxMultiplier = 2
1574 for (let i = 0; i < max * maxMultiplier; i++) {
1575 promises.add(pool.execute())
1576 }
1577 await Promise.all(promises)
1578 for (const workerNode of pool.workerNodes) {
1579 expect(workerNode.workerUsage).toStrictEqual({
1580 tasks: {
1581 executed: expect.any(Number),
1582 executing: 0,
1583 queued: 0,
1584 failed: 0
1585 },
1586 runTime: {
1587 aggregate: expect.any(Number),
1588 average: 0,
1589 median: expect.any(Number),
1590 history: expect.any(CircularArray)
1591 },
1592 waitTime: {
1593 aggregate: 0,
1594 average: 0,
1595 median: 0,
1596 history: expect.any(CircularArray)
1597 },
1598 elu: {
1599 idle: {
1600 aggregate: 0,
1601 average: 0,
1602 median: 0,
1603 history: expect.any(CircularArray)
1604 },
1605 active: {
1606 aggregate: 0,
1607 average: 0,
1608 median: 0,
1609 history: expect.any(CircularArray)
1610 },
1611 utilization: 0
1612 }
1613 })
1614 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
1615 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
1616 max * maxMultiplier
1617 )
1618 expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
1619 expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0)
1620 }
1621 expect(
1622 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1623 pool.workerChoiceStrategyContext.workerChoiceStrategy
1624 ).defaultWorkerWeight
1625 ).toBeGreaterThan(0)
1626 expect(
1627 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1628 pool.workerChoiceStrategyContext.workerChoiceStrategy
1629 ).workerVirtualTaskRunTime
1630 ).toBeGreaterThanOrEqual(0)
1631 // We need to clean up the resources after our test
1632 await pool.destroy()
1633 })
1634
1635 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1636 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1637 let pool = new FixedThreadPool(
1638 max,
1639 './tests/worker-files/thread/testWorker.js'
1640 )
1641 expect(
1642 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1643 workerChoiceStrategy
1644 ).nextWorkerNodeId
1645 ).toBeDefined()
1646 expect(
1647 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1648 workerChoiceStrategy
1649 ).defaultWorkerWeight
1650 ).toBeDefined()
1651 expect(
1652 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1653 workerChoiceStrategy
1654 ).workerVirtualTaskRunTime
1655 ).toBeDefined()
1656 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1657 expect(
1658 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1659 pool.workerChoiceStrategyContext.workerChoiceStrategy
1660 ).nextWorkerNodeId
1661 ).toBe(0)
1662 expect(
1663 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1664 pool.workerChoiceStrategyContext.workerChoiceStrategy
1665 ).defaultWorkerWeight
1666 ).toBeGreaterThan(0)
1667 expect(
1668 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1669 workerChoiceStrategy
1670 ).workerVirtualTaskRunTime
1671 ).toBe(0)
1672 await pool.destroy()
1673 pool = new DynamicThreadPool(
1674 min,
1675 max,
1676 './tests/worker-files/thread/testWorker.js'
1677 )
1678 expect(
1679 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1680 workerChoiceStrategy
1681 ).nextWorkerNodeId
1682 ).toBeDefined()
1683 expect(
1684 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1685 workerChoiceStrategy
1686 ).defaultWorkerWeight
1687 ).toBeDefined()
1688 expect(
1689 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1690 workerChoiceStrategy
1691 ).workerVirtualTaskRunTime
1692 ).toBeDefined()
1693 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1694 expect(
1695 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1696 pool.workerChoiceStrategyContext.workerChoiceStrategy
1697 ).nextWorkerNodeId
1698 ).toBe(0)
1699 expect(
1700 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1701 pool.workerChoiceStrategyContext.workerChoiceStrategy
1702 ).defaultWorkerWeight
1703 ).toBeGreaterThan(0)
1704 expect(
1705 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1706 workerChoiceStrategy
1707 ).workerVirtualTaskRunTime
1708 ).toBe(0)
1709 // We need to clean up the resources after our test
1710 await pool.destroy()
1711 })
1712
1713 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1714 const workerChoiceStrategy =
1715 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1716 let pool = new FixedThreadPool(
1717 max,
1718 './tests/worker-files/thread/testWorker.js',
1719 { workerChoiceStrategy }
1720 )
1721 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1722 useDynamicWorker: true
1723 })
1724 await pool.destroy()
1725 pool = new DynamicThreadPool(
1726 min,
1727 max,
1728 './tests/worker-files/thread/testWorker.js',
1729 { workerChoiceStrategy }
1730 )
1731 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1732 useDynamicWorker: true
1733 })
1734 // We need to clean up the resources after our test
1735 await pool.destroy()
1736 })
1737
1738 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1739 const workerChoiceStrategy =
1740 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1741 let pool = new FixedThreadPool(
1742 max,
1743 './tests/worker-files/thread/testWorker.js',
1744 { workerChoiceStrategy }
1745 )
1746 expect(
1747 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1748 ).toStrictEqual({
1749 runTime: {
1750 aggregate: false,
1751 average: false,
1752 median: false
1753 },
1754 waitTime: {
1755 aggregate: false,
1756 average: false,
1757 median: false
1758 },
1759 elu: {
1760 aggregate: false,
1761 average: false,
1762 median: false
1763 }
1764 })
1765 await pool.destroy()
1766 pool = new DynamicThreadPool(
1767 min,
1768 max,
1769 './tests/worker-files/thread/testWorker.js',
1770 { workerChoiceStrategy }
1771 )
1772 expect(
1773 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1774 ).toStrictEqual({
1775 runTime: {
1776 aggregate: false,
1777 average: false,
1778 median: false
1779 },
1780 waitTime: {
1781 aggregate: false,
1782 average: false,
1783 median: false
1784 },
1785 elu: {
1786 aggregate: false,
1787 average: false,
1788 median: false
1789 }
1790 })
1791 // We need to clean up the resources after our test
1792 await pool.destroy()
1793 })
1794
1795 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1796 const pool = new FixedThreadPool(
1797 max,
1798 './tests/worker-files/thread/testWorker.js',
1799 {
1800 workerChoiceStrategy:
1801 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1802 }
1803 )
1804 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1805 const promises = new Set()
1806 const maxMultiplier = 2
1807 for (let i = 0; i < max * maxMultiplier; i++) {
1808 promises.add(pool.execute())
1809 }
1810 await Promise.all(promises)
1811 for (const workerNode of pool.workerNodes) {
1812 expect(workerNode.workerUsage).toStrictEqual({
1813 tasks: {
1814 executed: maxMultiplier,
1815 executing: 0,
1816 queued: 0,
1817 failed: 0
1818 },
1819 runTime: {
1820 aggregate: 0,
1821 average: 0,
1822 median: 0,
1823 history: expect.any(CircularArray)
1824 },
1825 waitTime: {
1826 aggregate: 0,
1827 average: 0,
1828 median: 0,
1829 history: expect.any(CircularArray)
1830 },
1831 elu: {
1832 idle: {
1833 aggregate: 0,
1834 average: 0,
1835 median: 0,
1836 history: expect.any(CircularArray)
1837 },
1838 active: {
1839 aggregate: 0,
1840 average: 0,
1841 median: 0,
1842 history: expect.any(CircularArray)
1843 },
1844 utilization: 0
1845 }
1846 })
1847 }
1848 expect(
1849 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1850 pool.workerChoiceStrategyContext.workerChoiceStrategy
1851 ).defaultWorkerWeight
1852 ).toBeGreaterThan(0)
1853 expect(
1854 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1855 pool.workerChoiceStrategyContext.workerChoiceStrategy
1856 ).roundId
1857 ).toBe(0)
1858 expect(
1859 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1860 pool.workerChoiceStrategyContext.workerChoiceStrategy
1861 ).nextWorkerNodeId
1862 ).toBe(0)
1863 expect(
1864 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1865 pool.workerChoiceStrategyContext.workerChoiceStrategy
1866 ).roundWeights
1867 ).toStrictEqual([
1868 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1869 pool.workerChoiceStrategyContext.workerChoiceStrategy
1870 ).defaultWorkerWeight
1871 ])
1872 // We need to clean up the resources after our test
1873 await pool.destroy()
1874 })
1875
1876 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1877 const pool = new DynamicThreadPool(
1878 min,
1879 max,
1880 './tests/worker-files/thread/testWorker.js',
1881 {
1882 workerChoiceStrategy:
1883 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1884 }
1885 )
1886 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1887 const promises = new Set()
1888 const maxMultiplier = 2
1889 for (let i = 0; i < max * maxMultiplier; i++) {
1890 promises.add(pool.execute())
1891 }
1892 await Promise.all(promises)
1893 for (const workerNode of pool.workerNodes) {
1894 expect(workerNode.workerUsage).toStrictEqual({
1895 tasks: {
1896 executed: maxMultiplier,
1897 executing: 0,
1898 queued: 0,
1899 failed: 0
1900 },
1901 runTime: {
1902 aggregate: 0,
1903 average: 0,
1904 median: 0,
1905 history: expect.any(CircularArray)
1906 },
1907 waitTime: {
1908 aggregate: 0,
1909 average: 0,
1910 median: 0,
1911 history: expect.any(CircularArray)
1912 },
1913 elu: {
1914 idle: {
1915 aggregate: 0,
1916 average: 0,
1917 median: 0,
1918 history: expect.any(CircularArray)
1919 },
1920 active: {
1921 aggregate: 0,
1922 average: 0,
1923 median: 0,
1924 history: expect.any(CircularArray)
1925 },
1926 utilization: 0
1927 }
1928 })
1929 }
1930 expect(
1931 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1932 pool.workerChoiceStrategyContext.workerChoiceStrategy
1933 ).defaultWorkerWeight
1934 ).toBeGreaterThan(0)
1935 expect(
1936 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1937 pool.workerChoiceStrategyContext.workerChoiceStrategy
1938 ).roundId
1939 ).toBe(0)
1940 expect(
1941 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1942 pool.workerChoiceStrategyContext.workerChoiceStrategy
1943 ).nextWorkerNodeId
1944 ).toBe(0)
1945 expect(
1946 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1947 pool.workerChoiceStrategyContext.workerChoiceStrategy
1948 ).roundWeights
1949 ).toStrictEqual([
1950 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1951 pool.workerChoiceStrategyContext.workerChoiceStrategy
1952 ).defaultWorkerWeight
1953 ])
1954 // We need to clean up the resources after our test
1955 await pool.destroy()
1956 })
1957
1958 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1959 const workerChoiceStrategy =
1960 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1961 let pool = new FixedThreadPool(
1962 max,
1963 './tests/worker-files/thread/testWorker.js'
1964 )
1965 expect(
1966 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1967 workerChoiceStrategy
1968 ).roundId
1969 ).toBeDefined()
1970 expect(
1971 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1972 workerChoiceStrategy
1973 ).nextWorkerNodeId
1974 ).toBeDefined()
1975 expect(
1976 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1977 workerChoiceStrategy
1978 ).defaultWorkerWeight
1979 ).toBeDefined()
1980 expect(
1981 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1982 workerChoiceStrategy
1983 ).roundWeights
1984 ).toBeDefined()
1985 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1986 expect(
1987 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1988 workerChoiceStrategy
1989 ).roundId
1990 ).toBe(0)
1991 expect(
1992 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1993 pool.workerChoiceStrategyContext.workerChoiceStrategy
1994 ).nextWorkerNodeId
1995 ).toBe(0)
1996 expect(
1997 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1998 pool.workerChoiceStrategyContext.workerChoiceStrategy
1999 ).defaultWorkerWeight
2000 ).toBeGreaterThan(0)
2001 expect(
2002 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2003 workerChoiceStrategy
2004 ).roundWeights
2005 ).toStrictEqual([
2006 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2007 pool.workerChoiceStrategyContext.workerChoiceStrategy
2008 ).defaultWorkerWeight
2009 ])
2010 await pool.destroy()
2011 pool = new DynamicThreadPool(
2012 min,
2013 max,
2014 './tests/worker-files/thread/testWorker.js'
2015 )
2016 expect(
2017 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2018 workerChoiceStrategy
2019 ).roundId
2020 ).toBeDefined()
2021 expect(
2022 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2023 workerChoiceStrategy
2024 ).nextWorkerNodeId
2025 ).toBeDefined()
2026 expect(
2027 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2028 workerChoiceStrategy
2029 ).defaultWorkerWeight
2030 ).toBeDefined()
2031 expect(
2032 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2033 workerChoiceStrategy
2034 ).roundWeights
2035 ).toBeDefined()
2036 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2037 expect(
2038 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2039 pool.workerChoiceStrategyContext.workerChoiceStrategy
2040 ).nextWorkerNodeId
2041 ).toBe(0)
2042 expect(
2043 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2044 pool.workerChoiceStrategyContext.workerChoiceStrategy
2045 ).defaultWorkerWeight
2046 ).toBeGreaterThan(0)
2047 expect(
2048 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2049 workerChoiceStrategy
2050 ).roundWeights
2051 ).toStrictEqual([
2052 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2053 pool.workerChoiceStrategyContext.workerChoiceStrategy
2054 ).defaultWorkerWeight
2055 ])
2056 // We need to clean up the resources after our test
2057 await pool.destroy()
2058 })
2059
2060 it('Verify unknown strategy throw error', () => {
2061 expect(
2062 () =>
2063 new DynamicThreadPool(
2064 min,
2065 max,
2066 './tests/worker-files/thread/testWorker.js',
2067 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2068 )
2069 ).toThrowError("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
2070 })
2071})