fix: fix internal measurements handling
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
1 const { expect } = require('expect')
2 const {
3 WorkerChoiceStrategies,
4 DynamicThreadPool,
5 FixedThreadPool,
6 FixedClusterPool
7 } = require('../../../lib')
8 const { CircularArray } = require('../../../lib/circular-array')
9
10 describe('Selection strategies test suite', () => {
11 const min = 0
12 const max = 3
13
14 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
15 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
16 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
17 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
18 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
19 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
20 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
21 'WEIGHTED_ROUND_ROBIN'
22 )
23 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
24 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
25 )
26 })
27
28 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
29 const pool = new DynamicThreadPool(
30 min,
31 max,
32 './tests/worker-files/thread/testWorker.js'
33 )
34 expect(pool.opts.workerChoiceStrategy).toBe(
35 WorkerChoiceStrategies.ROUND_ROBIN
36 )
37 // We need to clean up the resources after our test
38 await pool.destroy()
39 })
40
41 it('Verify available strategies are taken at pool creation', async () => {
42 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
43 const pool = new FixedThreadPool(
44 max,
45 './tests/worker-files/thread/testWorker.js',
46 { workerChoiceStrategy }
47 )
48 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
49 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
50 workerChoiceStrategy
51 )
52 await pool.destroy()
53 }
54 })
55
56 it('Verify available strategies can be set after pool creation', async () => {
57 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
58 const pool = new DynamicThreadPool(
59 min,
60 max,
61 './tests/worker-files/thread/testWorker.js'
62 )
63 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
64 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
65 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
66 workerChoiceStrategy
67 )
68 await pool.destroy()
69 }
70 })
71
72 it('Verify available strategies default internals at pool creation', async () => {
73 const pool = new FixedThreadPool(
74 max,
75 './tests/worker-files/thread/testWorker.js'
76 )
77 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
78 if (workerChoiceStrategy === WorkerChoiceStrategies.ROUND_ROBIN) {
79 expect(
80 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
81 workerChoiceStrategy
82 ).nextWorkerNodeId
83 ).toBe(0)
84 } else if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
85 expect(
86 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
87 workerChoiceStrategy
88 ).workersVirtualTaskEndTimestamp
89 ).toBeInstanceOf(Array)
90 expect(
91 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
92 workerChoiceStrategy
93 ).workersVirtualTaskEndTimestamp.length
94 ).toBe(0)
95 } else if (
96 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
97 ) {
98 expect(
99 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
100 workerChoiceStrategy
101 ).nextWorkerNodeId
102 ).toBe(0)
103 expect(
104 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
105 workerChoiceStrategy
106 ).defaultWorkerWeight
107 ).toBeGreaterThan(0)
108 expect(
109 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
110 workerChoiceStrategy
111 ).workerVirtualTaskRunTime
112 ).toBe(0)
113 }
114 }
115 await pool.destroy()
116 })
117
118 it('Verify ROUND_ROBIN strategy default policy', async () => {
119 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
120 let pool = new FixedThreadPool(
121 max,
122 './tests/worker-files/thread/testWorker.js',
123 { workerChoiceStrategy }
124 )
125 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
126 useDynamicWorker: true
127 })
128 await pool.destroy()
129 pool = new DynamicThreadPool(
130 min,
131 max,
132 './tests/worker-files/thread/testWorker.js',
133 { workerChoiceStrategy }
134 )
135 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
136 useDynamicWorker: true
137 })
138 // We need to clean up the resources after our test
139 await pool.destroy()
140 })
141
142 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
143 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
144 let pool = new FixedThreadPool(
145 max,
146 './tests/worker-files/thread/testWorker.js',
147 { workerChoiceStrategy }
148 )
149 expect(
150 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
151 ).toStrictEqual({
152 runTime: {
153 aggregate: false,
154 average: false,
155 median: false
156 },
157 waitTime: {
158 aggregate: false,
159 average: false,
160 median: false
161 },
162 elu: {
163 aggregate: false,
164 average: false,
165 median: false
166 }
167 })
168 await pool.destroy()
169 pool = new DynamicThreadPool(
170 min,
171 max,
172 './tests/worker-files/thread/testWorker.js',
173 { workerChoiceStrategy }
174 )
175 expect(
176 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
177 ).toStrictEqual({
178 runTime: {
179 aggregate: false,
180 average: false,
181 median: false
182 },
183 waitTime: {
184 aggregate: false,
185 average: false,
186 median: false
187 },
188 elu: {
189 aggregate: false,
190 average: false,
191 median: false
192 }
193 })
194 // We need to clean up the resources after our test
195 await pool.destroy()
196 })
197
198 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
199 const pool = new FixedThreadPool(
200 max,
201 './tests/worker-files/thread/testWorker.js',
202 { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
203 )
204 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
205 const promises = new Set()
206 const maxMultiplier = 2
207 for (let i = 0; i < max * maxMultiplier; i++) {
208 promises.add(pool.execute())
209 }
210 await Promise.all(promises)
211 for (const workerNode of pool.workerNodes) {
212 expect(workerNode.usage).toStrictEqual({
213 tasks: {
214 executed: maxMultiplier,
215 executing: 0,
216 queued: 0,
217 maxQueued: 0,
218 failed: 0
219 },
220 runTime: {
221 history: expect.any(CircularArray)
222 },
223 waitTime: {
224 history: expect.any(CircularArray)
225 },
226 elu: {
227 idle: {
228 history: expect.any(CircularArray)
229 },
230 active: {
231 history: expect.any(CircularArray)
232 }
233 }
234 })
235 }
236 expect(
237 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
238 WorkerChoiceStrategies.ROUND_ROBIN
239 ).nextWorkerNodeId
240 ).toBe(0)
241 // We need to clean up the resources after our test
242 await pool.destroy()
243 })
244
245 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
246 const pool = new DynamicThreadPool(
247 min,
248 max,
249 './tests/worker-files/thread/testWorker.js',
250 { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
251 )
252 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
253 const promises = new Set()
254 const maxMultiplier = 2
255 for (let i = 0; i < max * maxMultiplier; i++) {
256 promises.add(pool.execute())
257 }
258 await Promise.all(promises)
259 for (const workerNode of pool.workerNodes) {
260 expect(workerNode.usage).toStrictEqual({
261 tasks: {
262 executed: maxMultiplier,
263 executing: 0,
264 queued: 0,
265 maxQueued: 0,
266 failed: 0
267 },
268 runTime: {
269 history: expect.any(CircularArray)
270 },
271 waitTime: {
272 history: expect.any(CircularArray)
273 },
274 elu: {
275 idle: {
276 history: expect.any(CircularArray)
277 },
278 active: {
279 history: expect.any(CircularArray)
280 }
281 }
282 })
283 }
284 expect(
285 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
286 WorkerChoiceStrategies.ROUND_ROBIN
287 ).nextWorkerNodeId
288 ).toBe(0)
289 // We need to clean up the resources after our test
290 await pool.destroy()
291 })
292
293 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
294 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
295 let pool = new FixedClusterPool(
296 max,
297 './tests/worker-files/cluster/testWorker.js',
298 { workerChoiceStrategy }
299 )
300 let results = new Set()
301 for (let i = 0; i < max; i++) {
302 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.id)
303 }
304 expect(results.size).toBe(max)
305 await pool.destroy()
306 pool = new FixedThreadPool(
307 max,
308 './tests/worker-files/thread/testWorker.js',
309 { workerChoiceStrategy }
310 )
311 results = new Set()
312 for (let i = 0; i < max; i++) {
313 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.threadId)
314 }
315 expect(results.size).toBe(max)
316 await pool.destroy()
317 })
318
319 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
320 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
321 let pool = new FixedThreadPool(
322 max,
323 './tests/worker-files/thread/testWorker.js',
324 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
325 )
326 expect(
327 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
328 workerChoiceStrategy
329 ).nextWorkerNodeId
330 ).toBeDefined()
331 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
332 expect(
333 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
334 pool.workerChoiceStrategyContext.workerChoiceStrategy
335 ).nextWorkerNodeId
336 ).toBe(0)
337 await pool.destroy()
338 pool = new DynamicThreadPool(
339 min,
340 max,
341 './tests/worker-files/thread/testWorker.js',
342 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
343 )
344 expect(
345 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
346 workerChoiceStrategy
347 ).nextWorkerNodeId
348 ).toBeDefined()
349 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
350 expect(
351 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
352 pool.workerChoiceStrategyContext.workerChoiceStrategy
353 ).nextWorkerNodeId
354 ).toBe(0)
355 // We need to clean up the resources after our test
356 await pool.destroy()
357 })
358
359 it('Verify LEAST_USED strategy default policy', async () => {
360 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
361 let pool = new FixedThreadPool(
362 max,
363 './tests/worker-files/thread/testWorker.js',
364 { workerChoiceStrategy }
365 )
366 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
367 useDynamicWorker: false
368 })
369 await pool.destroy()
370 pool = new DynamicThreadPool(
371 min,
372 max,
373 './tests/worker-files/thread/testWorker.js',
374 { workerChoiceStrategy }
375 )
376 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
377 useDynamicWorker: false
378 })
379 // We need to clean up the resources after our test
380 await pool.destroy()
381 })
382
383 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
384 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
385 let pool = new FixedThreadPool(
386 max,
387 './tests/worker-files/thread/testWorker.js',
388 { workerChoiceStrategy }
389 )
390 expect(
391 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
392 ).toStrictEqual({
393 runTime: {
394 aggregate: false,
395 average: false,
396 median: false
397 },
398 waitTime: {
399 aggregate: false,
400 average: false,
401 median: false
402 },
403 elu: {
404 aggregate: false,
405 average: false,
406 median: false
407 }
408 })
409 await pool.destroy()
410 pool = new DynamicThreadPool(
411 min,
412 max,
413 './tests/worker-files/thread/testWorker.js',
414 { workerChoiceStrategy }
415 )
416 expect(
417 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
418 ).toStrictEqual({
419 runTime: {
420 aggregate: false,
421 average: false,
422 median: false
423 },
424 waitTime: {
425 aggregate: false,
426 average: false,
427 median: false
428 },
429 elu: {
430 aggregate: false,
431 average: false,
432 median: false
433 }
434 })
435 // We need to clean up the resources after our test
436 await pool.destroy()
437 })
438
439 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
440 const pool = new FixedThreadPool(
441 max,
442 './tests/worker-files/thread/testWorker.js',
443 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
444 )
445 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
446 const promises = new Set()
447 const maxMultiplier = 2
448 for (let i = 0; i < max * maxMultiplier; i++) {
449 promises.add(pool.execute())
450 }
451 await Promise.all(promises)
452 for (const workerNode of pool.workerNodes) {
453 expect(workerNode.usage).toStrictEqual({
454 tasks: {
455 executed: expect.any(Number),
456 executing: 0,
457 queued: 0,
458 maxQueued: 0,
459 failed: 0
460 },
461 runTime: {
462 history: expect.any(CircularArray)
463 },
464 waitTime: {
465 history: expect.any(CircularArray)
466 },
467 elu: {
468 idle: {
469 history: expect.any(CircularArray)
470 },
471 active: {
472 history: expect.any(CircularArray)
473 }
474 }
475 })
476 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
477 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
478 max * maxMultiplier
479 )
480 }
481 // We need to clean up the resources after our test
482 await pool.destroy()
483 })
484
485 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
486 const pool = new DynamicThreadPool(
487 min,
488 max,
489 './tests/worker-files/thread/testWorker.js',
490 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
491 )
492 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
493 const promises = new Set()
494 const maxMultiplier = 2
495 for (let i = 0; i < max * maxMultiplier; i++) {
496 promises.add(pool.execute())
497 }
498 await Promise.all(promises)
499 for (const workerNode of pool.workerNodes) {
500 expect(workerNode.usage).toStrictEqual({
501 tasks: {
502 executed: expect.any(Number),
503 executing: 0,
504 queued: 0,
505 maxQueued: 0,
506 failed: 0
507 },
508 runTime: {
509 history: expect.any(CircularArray)
510 },
511 waitTime: {
512 history: expect.any(CircularArray)
513 },
514 elu: {
515 idle: {
516 history: expect.any(CircularArray)
517 },
518 active: {
519 history: expect.any(CircularArray)
520 }
521 }
522 })
523 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
524 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
525 max * maxMultiplier
526 )
527 }
528 // We need to clean up the resources after our test
529 await pool.destroy()
530 })
531
532 it('Verify LEAST_BUSY strategy default policy', async () => {
533 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
534 let pool = new FixedThreadPool(
535 max,
536 './tests/worker-files/thread/testWorker.js',
537 { workerChoiceStrategy }
538 )
539 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
540 useDynamicWorker: false
541 })
542 await pool.destroy()
543 pool = new DynamicThreadPool(
544 min,
545 max,
546 './tests/worker-files/thread/testWorker.js',
547 { workerChoiceStrategy }
548 )
549 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
550 useDynamicWorker: false
551 })
552 // We need to clean up the resources after our test
553 await pool.destroy()
554 })
555
556 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
557 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
558 let pool = new FixedThreadPool(
559 max,
560 './tests/worker-files/thread/testWorker.js',
561 { workerChoiceStrategy }
562 )
563 expect(
564 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
565 ).toStrictEqual({
566 runTime: {
567 aggregate: true,
568 average: false,
569 median: false
570 },
571 waitTime: {
572 aggregate: true,
573 average: false,
574 median: false
575 },
576 elu: {
577 aggregate: false,
578 average: false,
579 median: false
580 }
581 })
582 await pool.destroy()
583 pool = new DynamicThreadPool(
584 min,
585 max,
586 './tests/worker-files/thread/testWorker.js',
587 { workerChoiceStrategy }
588 )
589 expect(
590 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
591 ).toStrictEqual({
592 runTime: {
593 aggregate: true,
594 average: false,
595 median: false
596 },
597 waitTime: {
598 aggregate: true,
599 average: false,
600 median: false
601 },
602 elu: {
603 aggregate: false,
604 average: false,
605 median: false
606 }
607 })
608 // We need to clean up the resources after our test
609 await pool.destroy()
610 })
611
612 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
613 const pool = new FixedThreadPool(
614 max,
615 './tests/worker-files/thread/testWorker.js',
616 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
617 )
618 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
619 const promises = new Set()
620 const maxMultiplier = 2
621 for (let i = 0; i < max * maxMultiplier; i++) {
622 promises.add(pool.execute())
623 }
624 await Promise.all(promises)
625 for (const workerNode of pool.workerNodes) {
626 expect(workerNode.usage).toStrictEqual({
627 tasks: {
628 executed: expect.any(Number),
629 executing: 0,
630 queued: 0,
631 maxQueued: 0,
632 failed: 0
633 },
634 runTime: {
635 aggregate: expect.any(Number),
636 maximum: expect.any(Number),
637 minimum: expect.any(Number),
638 history: expect.any(CircularArray)
639 },
640 waitTime: {
641 aggregate: expect.any(Number),
642 maximum: expect.any(Number),
643 minimum: expect.any(Number),
644 history: expect.any(CircularArray)
645 },
646 elu: {
647 idle: {
648 history: expect.any(CircularArray)
649 },
650 active: {
651 history: expect.any(CircularArray)
652 }
653 }
654 })
655 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
656 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
657 max * maxMultiplier
658 )
659 expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0)
660 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThanOrEqual(0)
661 }
662 // We need to clean up the resources after our test
663 await pool.destroy()
664 })
665
666 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
667 const pool = new DynamicThreadPool(
668 min,
669 max,
670 './tests/worker-files/thread/testWorker.js',
671 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
672 )
673 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
674 const promises = new Set()
675 const maxMultiplier = 2
676 for (let i = 0; i < max * maxMultiplier; i++) {
677 promises.add(pool.execute())
678 }
679 await Promise.all(promises)
680 for (const workerNode of pool.workerNodes) {
681 expect(workerNode.usage).toStrictEqual({
682 tasks: {
683 executed: expect.any(Number),
684 executing: 0,
685 queued: 0,
686 maxQueued: 0,
687 failed: 0
688 },
689 runTime: {
690 aggregate: expect.any(Number),
691 maximum: expect.any(Number),
692 minimum: expect.any(Number),
693 history: expect.any(CircularArray)
694 },
695 waitTime: {
696 aggregate: expect.any(Number),
697 maximum: expect.any(Number),
698 minimum: expect.any(Number),
699 history: expect.any(CircularArray)
700 },
701 elu: {
702 idle: {
703 history: expect.any(CircularArray)
704 },
705 active: {
706 history: expect.any(CircularArray)
707 }
708 }
709 })
710 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
711 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
712 max * maxMultiplier
713 )
714 expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0)
715 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThanOrEqual(0)
716 }
717 // We need to clean up the resources after our test
718 await pool.destroy()
719 })
720
721 it('Verify LEAST_ELU strategy default policy', async () => {
722 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
723 let pool = new FixedThreadPool(
724 max,
725 './tests/worker-files/thread/testWorker.js',
726 { workerChoiceStrategy }
727 )
728 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
729 useDynamicWorker: false
730 })
731 await pool.destroy()
732 pool = new DynamicThreadPool(
733 min,
734 max,
735 './tests/worker-files/thread/testWorker.js',
736 { workerChoiceStrategy }
737 )
738 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
739 useDynamicWorker: false
740 })
741 // We need to clean up the resources after our test
742 await pool.destroy()
743 })
744
745 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
746 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
747 let pool = new FixedThreadPool(
748 max,
749 './tests/worker-files/thread/testWorker.js',
750 { workerChoiceStrategy }
751 )
752 expect(
753 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
754 ).toStrictEqual({
755 runTime: {
756 aggregate: false,
757 average: false,
758 median: false
759 },
760 waitTime: {
761 aggregate: false,
762 average: false,
763 median: false
764 },
765 elu: {
766 aggregate: true,
767 average: false,
768 median: false
769 }
770 })
771 await pool.destroy()
772 pool = new DynamicThreadPool(
773 min,
774 max,
775 './tests/worker-files/thread/testWorker.js',
776 { workerChoiceStrategy }
777 )
778 expect(
779 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
780 ).toStrictEqual({
781 runTime: {
782 aggregate: false,
783 average: false,
784 median: false
785 },
786 waitTime: {
787 aggregate: false,
788 average: false,
789 median: false
790 },
791 elu: {
792 aggregate: true,
793 average: false,
794 median: false
795 }
796 })
797 // We need to clean up the resources after our test
798 await pool.destroy()
799 })
800
801 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
802 const pool = new FixedThreadPool(
803 max,
804 './tests/worker-files/thread/testWorker.js',
805 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
806 )
807 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
808 const promises = new Set()
809 const maxMultiplier = 2
810 for (let i = 0; i < max * maxMultiplier; i++) {
811 promises.add(pool.execute())
812 }
813 await Promise.all(promises)
814 for (const workerNode of pool.workerNodes) {
815 expect(workerNode.usage).toMatchObject({
816 tasks: {
817 executed: expect.any(Number),
818 executing: 0,
819 queued: 0,
820 maxQueued: 0,
821 failed: 0
822 },
823 runTime: {
824 history: expect.any(CircularArray)
825 },
826 waitTime: {
827 history: expect.any(CircularArray)
828 },
829 elu: {
830 idle: expect.objectContaining({
831 history: expect.any(CircularArray)
832 }),
833 active: expect.objectContaining({
834 history: expect.any(CircularArray)
835 })
836 }
837 })
838 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
839 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
840 max * maxMultiplier
841 )
842 if (workerNode.usage.elu.utilization == null) {
843 expect(workerNode.usage.elu.utilization).toBeUndefined()
844 } else {
845 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
846 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
847 }
848 }
849 // We need to clean up the resources after our test
850 await pool.destroy()
851 })
852
853 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
854 const pool = new DynamicThreadPool(
855 min,
856 max,
857 './tests/worker-files/thread/testWorker.js',
858 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
859 )
860 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
861 const promises = new Set()
862 const maxMultiplier = 2
863 for (let i = 0; i < max * maxMultiplier; i++) {
864 promises.add(pool.execute())
865 }
866 await Promise.all(promises)
867 for (const workerNode of pool.workerNodes) {
868 expect(workerNode.usage).toMatchObject({
869 tasks: {
870 executed: expect.any(Number),
871 executing: 0,
872 queued: 0,
873 maxQueued: 0,
874 failed: 0
875 },
876 runTime: {
877 history: expect.any(CircularArray)
878 },
879 waitTime: {
880 history: expect.any(CircularArray)
881 },
882 elu: {
883 idle: expect.objectContaining({
884 history: expect.any(CircularArray)
885 }),
886 active: expect.objectContaining({
887 history: expect.any(CircularArray)
888 })
889 }
890 })
891 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
892 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
893 max * maxMultiplier
894 )
895 if (workerNode.usage.elu.utilization == null) {
896 expect(workerNode.usage.elu.utilization).toBeUndefined()
897 } else {
898 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
899 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
900 }
901 }
902 // We need to clean up the resources after our test
903 await pool.destroy()
904 })
905
906 it('Verify FAIR_SHARE strategy default policy', async () => {
907 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
908 let pool = new FixedThreadPool(
909 max,
910 './tests/worker-files/thread/testWorker.js',
911 { workerChoiceStrategy }
912 )
913 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
914 useDynamicWorker: false
915 })
916 await pool.destroy()
917 pool = new DynamicThreadPool(
918 min,
919 max,
920 './tests/worker-files/thread/testWorker.js',
921 { workerChoiceStrategy }
922 )
923 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
924 useDynamicWorker: false
925 })
926 // We need to clean up the resources after our test
927 await pool.destroy()
928 })
929
930 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
931 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
932 let pool = new FixedThreadPool(
933 max,
934 './tests/worker-files/thread/testWorker.js',
935 { workerChoiceStrategy }
936 )
937 expect(
938 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
939 ).toStrictEqual({
940 runTime: {
941 aggregate: true,
942 average: true,
943 median: false
944 },
945 waitTime: {
946 aggregate: false,
947 average: false,
948 median: false
949 },
950 elu: {
951 aggregate: true,
952 average: true,
953 median: false
954 }
955 })
956 await pool.destroy()
957 pool = new DynamicThreadPool(
958 min,
959 max,
960 './tests/worker-files/thread/testWorker.js',
961 { workerChoiceStrategy }
962 )
963 expect(
964 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
965 ).toStrictEqual({
966 runTime: {
967 aggregate: true,
968 average: true,
969 median: false
970 },
971 waitTime: {
972 aggregate: false,
973 average: false,
974 median: false
975 },
976 elu: {
977 aggregate: true,
978 average: true,
979 median: false
980 }
981 })
982 // We need to clean up the resources after our test
983 await pool.destroy()
984 })
985
986 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
987 const pool = new FixedThreadPool(
988 max,
989 './tests/worker-files/thread/testWorker.js',
990 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
991 )
992 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
993 const promises = new Set()
994 const maxMultiplier = 2
995 for (let i = 0; i < max * maxMultiplier; i++) {
996 promises.add(pool.execute())
997 }
998 await Promise.all(promises)
999 for (const workerNode of pool.workerNodes) {
1000 expect(workerNode.usage).toMatchObject({
1001 tasks: {
1002 executed: expect.any(Number),
1003 executing: 0,
1004 queued: 0,
1005 maxQueued: 0,
1006 failed: 0
1007 },
1008 runTime: expect.objectContaining({
1009 history: expect.any(CircularArray)
1010 }),
1011 waitTime: {
1012 history: expect.any(CircularArray)
1013 },
1014 elu: {
1015 idle: expect.objectContaining({
1016 history: expect.any(CircularArray)
1017 }),
1018 active: expect.objectContaining({
1019 history: expect.any(CircularArray)
1020 })
1021 }
1022 })
1023 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1024 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1025 max * maxMultiplier
1026 )
1027 if (workerNode.usage.runTime.aggregate == null) {
1028 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1029 } else {
1030 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1031 }
1032 if (workerNode.usage.runTime.average == null) {
1033 expect(workerNode.usage.runTime.average).toBeUndefined()
1034 } else {
1035 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1036 }
1037 if (workerNode.usage.elu.utilization == null) {
1038 expect(workerNode.usage.elu.utilization).toBeUndefined()
1039 } else {
1040 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1041 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1042 }
1043 }
1044 expect(
1045 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1046 pool.workerChoiceStrategyContext.workerChoiceStrategy
1047 ).workersVirtualTaskEndTimestamp.length
1048 ).toBe(pool.workerNodes.length)
1049 // We need to clean up the resources after our test
1050 await pool.destroy()
1051 })
1052
1053 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1054 const pool = new DynamicThreadPool(
1055 min,
1056 max,
1057 './tests/worker-files/thread/testWorker.js',
1058 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1059 )
1060 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1061 const promises = new Set()
1062 const maxMultiplier = 2
1063 for (let i = 0; i < max * maxMultiplier; i++) {
1064 promises.add(pool.execute())
1065 }
1066 await Promise.all(promises)
1067 for (const workerNode of pool.workerNodes) {
1068 expect(workerNode.usage).toMatchObject({
1069 tasks: {
1070 executed: expect.any(Number),
1071 executing: 0,
1072 queued: 0,
1073 maxQueued: 0,
1074 failed: 0
1075 },
1076 runTime: expect.objectContaining({
1077 history: expect.any(CircularArray)
1078 }),
1079 waitTime: {
1080 history: expect.any(CircularArray)
1081 },
1082 elu: {
1083 idle: expect.objectContaining({
1084 history: expect.any(CircularArray)
1085 }),
1086 active: expect.objectContaining({
1087 history: expect.any(CircularArray)
1088 })
1089 }
1090 })
1091 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1092 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1093 max * maxMultiplier
1094 )
1095 if (workerNode.usage.runTime.aggregate == null) {
1096 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1097 } else {
1098 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1099 }
1100 if (workerNode.usage.runTime.average == null) {
1101 expect(workerNode.usage.runTime.average).toBeUndefined()
1102 } else {
1103 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1104 }
1105 if (workerNode.usage.elu.utilization == null) {
1106 expect(workerNode.usage.elu.utilization).toBeUndefined()
1107 } else {
1108 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1109 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1110 }
1111 }
1112 expect(
1113 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1114 pool.workerChoiceStrategyContext.workerChoiceStrategy
1115 ).workersVirtualTaskEndTimestamp.length
1116 ).toBe(pool.workerNodes.length)
1117 // We need to clean up the resources after our test
1118 await pool.destroy()
1119 })
1120
1121 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1122 const pool = new DynamicThreadPool(
1123 min,
1124 max,
1125 './tests/worker-files/thread/testWorker.js',
1126 {
1127 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1128 workerChoiceStrategyOptions: {
1129 runTime: { median: true }
1130 }
1131 }
1132 )
1133 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1134 const promises = new Set()
1135 const maxMultiplier = 2
1136 for (let i = 0; i < max * maxMultiplier; i++) {
1137 promises.add(pool.execute())
1138 }
1139 await Promise.all(promises)
1140 for (const workerNode of pool.workerNodes) {
1141 expect(workerNode.usage).toMatchObject({
1142 tasks: {
1143 executed: expect.any(Number),
1144 executing: 0,
1145 queued: 0,
1146 maxQueued: 0,
1147 failed: 0
1148 },
1149 runTime: expect.objectContaining({
1150 history: expect.any(CircularArray)
1151 }),
1152 waitTime: {
1153 history: expect.any(CircularArray)
1154 },
1155 elu: {
1156 idle: expect.objectContaining({
1157 history: expect.any(CircularArray)
1158 }),
1159 active: expect.objectContaining({
1160 history: expect.any(CircularArray)
1161 })
1162 }
1163 })
1164 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1165 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1166 max * maxMultiplier
1167 )
1168 if (workerNode.usage.runTime.aggregate == null) {
1169 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1170 } else {
1171 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1172 }
1173 if (workerNode.usage.runTime.median == null) {
1174 expect(workerNode.usage.runTime.median).toBeUndefined()
1175 } else {
1176 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1177 }
1178 if (workerNode.usage.elu.utilization == null) {
1179 expect(workerNode.usage.elu.utilization).toBeUndefined()
1180 } else {
1181 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1182 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1183 }
1184 }
1185 expect(
1186 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1187 pool.workerChoiceStrategyContext.workerChoiceStrategy
1188 ).workersVirtualTaskEndTimestamp.length
1189 ).toBe(pool.workerNodes.length)
1190 // We need to clean up the resources after our test
1191 await pool.destroy()
1192 })
1193
1194 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1195 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1196 let pool = new FixedThreadPool(
1197 max,
1198 './tests/worker-files/thread/testWorker.js'
1199 )
1200 expect(
1201 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1202 workerChoiceStrategy
1203 ).workersVirtualTaskEndTimestamp
1204 ).toBeInstanceOf(Array)
1205 expect(
1206 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1207 workerChoiceStrategy
1208 ).workersVirtualTaskEndTimestamp.length
1209 ).toBe(0)
1210 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1211 workerChoiceStrategy
1212 ).workersVirtualTaskEndTimestamp[0] = performance.now()
1213 expect(
1214 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1215 workerChoiceStrategy
1216 ).workersVirtualTaskEndTimestamp.length
1217 ).toBe(1)
1218 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1219 expect(
1220 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1221 workerChoiceStrategy
1222 ).workersVirtualTaskEndTimestamp
1223 ).toBeInstanceOf(Array)
1224 expect(
1225 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1226 workerChoiceStrategy
1227 ).workersVirtualTaskEndTimestamp.length
1228 ).toBe(0)
1229 await pool.destroy()
1230 pool = new DynamicThreadPool(
1231 min,
1232 max,
1233 './tests/worker-files/thread/testWorker.js'
1234 )
1235 expect(
1236 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1237 workerChoiceStrategy
1238 ).workersVirtualTaskEndTimestamp
1239 ).toBeInstanceOf(Array)
1240 expect(
1241 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1242 workerChoiceStrategy
1243 ).workersVirtualTaskEndTimestamp.length
1244 ).toBe(0)
1245 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1246 workerChoiceStrategy
1247 ).workersVirtualTaskEndTimestamp[0] = performance.now()
1248 expect(
1249 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1250 workerChoiceStrategy
1251 ).workersVirtualTaskEndTimestamp.length
1252 ).toBe(1)
1253 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1254 expect(
1255 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1256 workerChoiceStrategy
1257 ).workersVirtualTaskEndTimestamp
1258 ).toBeInstanceOf(Array)
1259 expect(
1260 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1261 workerChoiceStrategy
1262 ).workersVirtualTaskEndTimestamp.length
1263 ).toBe(0)
1264 // We need to clean up the resources after our test
1265 await pool.destroy()
1266 })
1267
1268 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1269 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1270 let pool = new FixedThreadPool(
1271 max,
1272 './tests/worker-files/thread/testWorker.js',
1273 { workerChoiceStrategy }
1274 )
1275 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1276 useDynamicWorker: true
1277 })
1278 await pool.destroy()
1279 pool = new DynamicThreadPool(
1280 min,
1281 max,
1282 './tests/worker-files/thread/testWorker.js',
1283 { workerChoiceStrategy }
1284 )
1285 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1286 useDynamicWorker: true
1287 })
1288 // We need to clean up the resources after our test
1289 await pool.destroy()
1290 })
1291
1292 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1293 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1294 let pool = new FixedThreadPool(
1295 max,
1296 './tests/worker-files/thread/testWorker.js',
1297 { workerChoiceStrategy }
1298 )
1299 expect(
1300 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1301 ).toStrictEqual({
1302 runTime: {
1303 aggregate: true,
1304 average: true,
1305 median: false
1306 },
1307 waitTime: {
1308 aggregate: false,
1309 average: false,
1310 median: false
1311 },
1312 elu: {
1313 aggregate: false,
1314 average: false,
1315 median: false
1316 }
1317 })
1318 await pool.destroy()
1319 pool = new DynamicThreadPool(
1320 min,
1321 max,
1322 './tests/worker-files/thread/testWorker.js',
1323 { workerChoiceStrategy }
1324 )
1325 expect(
1326 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1327 ).toStrictEqual({
1328 runTime: {
1329 aggregate: true,
1330 average: true,
1331 median: false
1332 },
1333 waitTime: {
1334 aggregate: false,
1335 average: false,
1336 median: false
1337 },
1338 elu: {
1339 aggregate: false,
1340 average: false,
1341 median: false
1342 }
1343 })
1344 // We need to clean up the resources after our test
1345 await pool.destroy()
1346 })
1347
1348 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1349 const pool = new FixedThreadPool(
1350 max,
1351 './tests/worker-files/thread/testWorker.js',
1352 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1353 )
1354 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1355 const promises = new Set()
1356 const maxMultiplier = 2
1357 for (let i = 0; i < max * maxMultiplier; i++) {
1358 promises.add(pool.execute())
1359 }
1360 await Promise.all(promises)
1361 for (const workerNode of pool.workerNodes) {
1362 expect(workerNode.usage).toStrictEqual({
1363 tasks: {
1364 executed: expect.any(Number),
1365 executing: 0,
1366 queued: 0,
1367 maxQueued: 0,
1368 failed: 0
1369 },
1370 runTime: expect.objectContaining({
1371 history: expect.any(CircularArray)
1372 }),
1373 waitTime: {
1374 history: expect.any(CircularArray)
1375 },
1376 elu: {
1377 idle: {
1378 history: expect.any(CircularArray)
1379 },
1380 active: {
1381 history: expect.any(CircularArray)
1382 }
1383 }
1384 })
1385 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1386 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1387 max * maxMultiplier
1388 )
1389 if (workerNode.usage.runTime.aggregate == null) {
1390 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1391 } else {
1392 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1393 }
1394 if (workerNode.usage.runTime.average == null) {
1395 expect(workerNode.usage.runTime.average).toBeUndefined()
1396 } else {
1397 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1398 }
1399 }
1400 expect(
1401 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1402 pool.workerChoiceStrategyContext.workerChoiceStrategy
1403 ).defaultWorkerWeight
1404 ).toBeGreaterThan(0)
1405 expect(
1406 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1407 pool.workerChoiceStrategyContext.workerChoiceStrategy
1408 ).workerVirtualTaskRunTime
1409 ).toBeGreaterThanOrEqual(0)
1410 // We need to clean up the resources after our test
1411 await pool.destroy()
1412 })
1413
1414 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1415 const pool = new DynamicThreadPool(
1416 min,
1417 max,
1418 './tests/worker-files/thread/testWorker.js',
1419 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1420 )
1421 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1422 const promises = new Set()
1423 const maxMultiplier = 2
1424 for (let i = 0; i < max * maxMultiplier; i++) {
1425 promises.add(pool.execute())
1426 }
1427 await Promise.all(promises)
1428 for (const workerNode of pool.workerNodes) {
1429 expect(workerNode.usage).toStrictEqual({
1430 tasks: {
1431 executed: expect.any(Number),
1432 executing: 0,
1433 queued: 0,
1434 maxQueued: 0,
1435 failed: 0
1436 },
1437 runTime: {
1438 aggregate: expect.any(Number),
1439 maximum: expect.any(Number),
1440 minimum: expect.any(Number),
1441 average: expect.any(Number),
1442 history: expect.any(CircularArray)
1443 },
1444 waitTime: {
1445 history: expect.any(CircularArray)
1446 },
1447 elu: {
1448 idle: {
1449 history: expect.any(CircularArray)
1450 },
1451 active: {
1452 history: expect.any(CircularArray)
1453 }
1454 }
1455 })
1456 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1457 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1458 max * maxMultiplier
1459 )
1460 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1461 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1462 }
1463 expect(
1464 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1465 pool.workerChoiceStrategyContext.workerChoiceStrategy
1466 ).defaultWorkerWeight
1467 ).toBeGreaterThan(0)
1468 expect(
1469 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1470 pool.workerChoiceStrategyContext.workerChoiceStrategy
1471 ).workerVirtualTaskRunTime
1472 ).toBeGreaterThanOrEqual(0)
1473 // We need to clean up the resources after our test
1474 await pool.destroy()
1475 })
1476
1477 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1478 const pool = new DynamicThreadPool(
1479 min,
1480 max,
1481 './tests/worker-files/thread/testWorker.js',
1482 {
1483 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1484 workerChoiceStrategyOptions: {
1485 runTime: { median: true }
1486 }
1487 }
1488 )
1489 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1490 const promises = new Set()
1491 const maxMultiplier = 2
1492 for (let i = 0; i < max * maxMultiplier; i++) {
1493 promises.add(pool.execute())
1494 }
1495 await Promise.all(promises)
1496 for (const workerNode of pool.workerNodes) {
1497 expect(workerNode.usage).toStrictEqual({
1498 tasks: {
1499 executed: expect.any(Number),
1500 executing: 0,
1501 queued: 0,
1502 maxQueued: 0,
1503 failed: 0
1504 },
1505 runTime: {
1506 aggregate: expect.any(Number),
1507 maximum: expect.any(Number),
1508 minimum: expect.any(Number),
1509 median: expect.any(Number),
1510 history: expect.any(CircularArray)
1511 },
1512 waitTime: {
1513 history: expect.any(CircularArray)
1514 },
1515 elu: {
1516 idle: {
1517 history: expect.any(CircularArray)
1518 },
1519 active: {
1520 history: expect.any(CircularArray)
1521 }
1522 }
1523 })
1524 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1525 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1526 max * maxMultiplier
1527 )
1528 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1529 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1530 }
1531 expect(
1532 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1533 pool.workerChoiceStrategyContext.workerChoiceStrategy
1534 ).defaultWorkerWeight
1535 ).toBeGreaterThan(0)
1536 expect(
1537 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1538 pool.workerChoiceStrategyContext.workerChoiceStrategy
1539 ).workerVirtualTaskRunTime
1540 ).toBeGreaterThanOrEqual(0)
1541 // We need to clean up the resources after our test
1542 await pool.destroy()
1543 })
1544
1545 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1546 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1547 let pool = new FixedThreadPool(
1548 max,
1549 './tests/worker-files/thread/testWorker.js'
1550 )
1551 expect(
1552 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1553 workerChoiceStrategy
1554 ).nextWorkerNodeId
1555 ).toBeDefined()
1556 expect(
1557 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1558 workerChoiceStrategy
1559 ).defaultWorkerWeight
1560 ).toBeDefined()
1561 expect(
1562 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1563 workerChoiceStrategy
1564 ).workerVirtualTaskRunTime
1565 ).toBeDefined()
1566 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1567 expect(
1568 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1569 pool.workerChoiceStrategyContext.workerChoiceStrategy
1570 ).nextWorkerNodeId
1571 ).toBe(0)
1572 expect(
1573 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1574 pool.workerChoiceStrategyContext.workerChoiceStrategy
1575 ).defaultWorkerWeight
1576 ).toBeGreaterThan(0)
1577 expect(
1578 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1579 workerChoiceStrategy
1580 ).workerVirtualTaskRunTime
1581 ).toBe(0)
1582 await pool.destroy()
1583 pool = new DynamicThreadPool(
1584 min,
1585 max,
1586 './tests/worker-files/thread/testWorker.js'
1587 )
1588 expect(
1589 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1590 workerChoiceStrategy
1591 ).nextWorkerNodeId
1592 ).toBeDefined()
1593 expect(
1594 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1595 workerChoiceStrategy
1596 ).defaultWorkerWeight
1597 ).toBeDefined()
1598 expect(
1599 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1600 workerChoiceStrategy
1601 ).workerVirtualTaskRunTime
1602 ).toBeDefined()
1603 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1604 expect(
1605 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1606 pool.workerChoiceStrategyContext.workerChoiceStrategy
1607 ).nextWorkerNodeId
1608 ).toBe(0)
1609 expect(
1610 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1611 pool.workerChoiceStrategyContext.workerChoiceStrategy
1612 ).defaultWorkerWeight
1613 ).toBeGreaterThan(0)
1614 expect(
1615 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1616 workerChoiceStrategy
1617 ).workerVirtualTaskRunTime
1618 ).toBe(0)
1619 // We need to clean up the resources after our test
1620 await pool.destroy()
1621 })
1622
1623 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1624 const workerChoiceStrategy =
1625 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1626 let pool = new FixedThreadPool(
1627 max,
1628 './tests/worker-files/thread/testWorker.js',
1629 { workerChoiceStrategy }
1630 )
1631 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1632 useDynamicWorker: true
1633 })
1634 await pool.destroy()
1635 pool = new DynamicThreadPool(
1636 min,
1637 max,
1638 './tests/worker-files/thread/testWorker.js',
1639 { workerChoiceStrategy }
1640 )
1641 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1642 useDynamicWorker: true
1643 })
1644 // We need to clean up the resources after our test
1645 await pool.destroy()
1646 })
1647
1648 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1649 const workerChoiceStrategy =
1650 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1651 let pool = new FixedThreadPool(
1652 max,
1653 './tests/worker-files/thread/testWorker.js',
1654 { workerChoiceStrategy }
1655 )
1656 expect(
1657 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1658 ).toStrictEqual({
1659 runTime: {
1660 aggregate: false,
1661 average: false,
1662 median: false
1663 },
1664 waitTime: {
1665 aggregate: false,
1666 average: false,
1667 median: false
1668 },
1669 elu: {
1670 aggregate: false,
1671 average: false,
1672 median: false
1673 }
1674 })
1675 await pool.destroy()
1676 pool = new DynamicThreadPool(
1677 min,
1678 max,
1679 './tests/worker-files/thread/testWorker.js',
1680 { workerChoiceStrategy }
1681 )
1682 expect(
1683 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1684 ).toStrictEqual({
1685 runTime: {
1686 aggregate: false,
1687 average: false,
1688 median: false
1689 },
1690 waitTime: {
1691 aggregate: false,
1692 average: false,
1693 median: false
1694 },
1695 elu: {
1696 aggregate: false,
1697 average: false,
1698 median: false
1699 }
1700 })
1701 // We need to clean up the resources after our test
1702 await pool.destroy()
1703 })
1704
1705 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1706 const pool = new FixedThreadPool(
1707 max,
1708 './tests/worker-files/thread/testWorker.js',
1709 {
1710 workerChoiceStrategy:
1711 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1712 }
1713 )
1714 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1715 const promises = new Set()
1716 const maxMultiplier = 2
1717 for (let i = 0; i < max * maxMultiplier; i++) {
1718 promises.add(pool.execute())
1719 }
1720 await Promise.all(promises)
1721 for (const workerNode of pool.workerNodes) {
1722 expect(workerNode.usage).toStrictEqual({
1723 tasks: {
1724 executed: maxMultiplier,
1725 executing: 0,
1726 queued: 0,
1727 maxQueued: 0,
1728 failed: 0
1729 },
1730 runTime: {
1731 history: expect.any(CircularArray)
1732 },
1733 waitTime: {
1734 history: expect.any(CircularArray)
1735 },
1736 elu: {
1737 idle: {
1738 history: expect.any(CircularArray)
1739 },
1740 active: {
1741 history: expect.any(CircularArray)
1742 }
1743 }
1744 })
1745 }
1746 expect(
1747 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1748 pool.workerChoiceStrategyContext.workerChoiceStrategy
1749 ).defaultWorkerWeight
1750 ).toBeGreaterThan(0)
1751 expect(
1752 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1753 pool.workerChoiceStrategyContext.workerChoiceStrategy
1754 ).roundId
1755 ).toBe(0)
1756 expect(
1757 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1758 pool.workerChoiceStrategyContext.workerChoiceStrategy
1759 ).nextWorkerNodeId
1760 ).toBe(0)
1761 expect(
1762 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1763 pool.workerChoiceStrategyContext.workerChoiceStrategy
1764 ).roundWeights
1765 ).toStrictEqual([
1766 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1767 pool.workerChoiceStrategyContext.workerChoiceStrategy
1768 ).defaultWorkerWeight
1769 ])
1770 // We need to clean up the resources after our test
1771 await pool.destroy()
1772 })
1773
1774 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1775 const pool = new DynamicThreadPool(
1776 min,
1777 max,
1778 './tests/worker-files/thread/testWorker.js',
1779 {
1780 workerChoiceStrategy:
1781 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1782 }
1783 )
1784 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1785 const promises = new Set()
1786 const maxMultiplier = 2
1787 for (let i = 0; i < max * maxMultiplier; i++) {
1788 promises.add(pool.execute())
1789 }
1790 await Promise.all(promises)
1791 for (const workerNode of pool.workerNodes) {
1792 expect(workerNode.usage).toStrictEqual({
1793 tasks: {
1794 executed: maxMultiplier,
1795 executing: 0,
1796 queued: 0,
1797 maxQueued: 0,
1798 failed: 0
1799 },
1800 runTime: {
1801 history: expect.any(CircularArray)
1802 },
1803 waitTime: {
1804 history: expect.any(CircularArray)
1805 },
1806 elu: {
1807 idle: {
1808 history: expect.any(CircularArray)
1809 },
1810 active: {
1811 history: expect.any(CircularArray)
1812 }
1813 }
1814 })
1815 }
1816 expect(
1817 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1818 pool.workerChoiceStrategyContext.workerChoiceStrategy
1819 ).defaultWorkerWeight
1820 ).toBeGreaterThan(0)
1821 expect(
1822 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1823 pool.workerChoiceStrategyContext.workerChoiceStrategy
1824 ).roundId
1825 ).toBe(0)
1826 expect(
1827 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1828 pool.workerChoiceStrategyContext.workerChoiceStrategy
1829 ).nextWorkerNodeId
1830 ).toBe(0)
1831 expect(
1832 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1833 pool.workerChoiceStrategyContext.workerChoiceStrategy
1834 ).roundWeights
1835 ).toStrictEqual([
1836 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1837 pool.workerChoiceStrategyContext.workerChoiceStrategy
1838 ).defaultWorkerWeight
1839 ])
1840 // We need to clean up the resources after our test
1841 await pool.destroy()
1842 })
1843
1844 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1845 const workerChoiceStrategy =
1846 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1847 let pool = new FixedThreadPool(
1848 max,
1849 './tests/worker-files/thread/testWorker.js'
1850 )
1851 expect(
1852 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1853 workerChoiceStrategy
1854 ).roundId
1855 ).toBeDefined()
1856 expect(
1857 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1858 workerChoiceStrategy
1859 ).nextWorkerNodeId
1860 ).toBeDefined()
1861 expect(
1862 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1863 workerChoiceStrategy
1864 ).defaultWorkerWeight
1865 ).toBeDefined()
1866 expect(
1867 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1868 workerChoiceStrategy
1869 ).roundWeights
1870 ).toBeDefined()
1871 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1872 expect(
1873 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1874 workerChoiceStrategy
1875 ).roundId
1876 ).toBe(0)
1877 expect(
1878 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1879 pool.workerChoiceStrategyContext.workerChoiceStrategy
1880 ).nextWorkerNodeId
1881 ).toBe(0)
1882 expect(
1883 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1884 pool.workerChoiceStrategyContext.workerChoiceStrategy
1885 ).defaultWorkerWeight
1886 ).toBeGreaterThan(0)
1887 expect(
1888 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1889 workerChoiceStrategy
1890 ).roundWeights
1891 ).toStrictEqual([
1892 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1893 pool.workerChoiceStrategyContext.workerChoiceStrategy
1894 ).defaultWorkerWeight
1895 ])
1896 await pool.destroy()
1897 pool = new DynamicThreadPool(
1898 min,
1899 max,
1900 './tests/worker-files/thread/testWorker.js'
1901 )
1902 expect(
1903 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1904 workerChoiceStrategy
1905 ).roundId
1906 ).toBeDefined()
1907 expect(
1908 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1909 workerChoiceStrategy
1910 ).nextWorkerNodeId
1911 ).toBeDefined()
1912 expect(
1913 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1914 workerChoiceStrategy
1915 ).defaultWorkerWeight
1916 ).toBeDefined()
1917 expect(
1918 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1919 workerChoiceStrategy
1920 ).roundWeights
1921 ).toBeDefined()
1922 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1923 expect(
1924 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1925 pool.workerChoiceStrategyContext.workerChoiceStrategy
1926 ).nextWorkerNodeId
1927 ).toBe(0)
1928 expect(
1929 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1930 pool.workerChoiceStrategyContext.workerChoiceStrategy
1931 ).defaultWorkerWeight
1932 ).toBeGreaterThan(0)
1933 expect(
1934 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1935 workerChoiceStrategy
1936 ).roundWeights
1937 ).toStrictEqual([
1938 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1939 pool.workerChoiceStrategyContext.workerChoiceStrategy
1940 ).defaultWorkerWeight
1941 ])
1942 // We need to clean up the resources after our test
1943 await pool.destroy()
1944 })
1945
1946 it('Verify unknown strategy throw error', () => {
1947 expect(
1948 () =>
1949 new DynamicThreadPool(
1950 min,
1951 max,
1952 './tests/worker-files/thread/testWorker.js',
1953 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
1954 )
1955 ).toThrowError("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
1956 })
1957 })