fix: ensure newly created worker is used only if needed
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
1 const { expect } = require('expect')
2 const {
3 WorkerChoiceStrategies,
4 DynamicThreadPool,
5 FixedThreadPool,
6 FixedClusterPool
7 } = require('../../../lib')
8 const { CircularArray } = require('../../../lib/circular-array')
9
10 describe('Selection strategies test suite', () => {
11 const min = 0
12 const max = 3
13
it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
  // Each enumeration member is expected to be a string equal to its own key
  const expectedStrategyNames = [
    'ROUND_ROBIN',
    'LEAST_USED',
    'LEAST_BUSY',
    'LEAST_ELU',
    'FAIR_SHARE',
    'WEIGHTED_ROUND_ROBIN',
    'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
  ]
  for (const strategyName of expectedStrategyNames) {
    expect(WorkerChoiceStrategies[strategyName]).toBe(strategyName)
  }
})
27
it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
  // No `workerChoiceStrategy` option is passed, so the pool is expected to
  // report ROUND_ROBIN in its options
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    expect(pool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.ROUND_ROBIN
    )
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
40
it('Verify available strategies are taken at pool creation', async () => {
  // One short-lived pool per strategy: the option given to the constructor
  // must be visible both in the pool options and in the strategy context
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new FixedThreadPool(
      max,
      './tests/worker-files/thread/testWorker.js',
      { workerChoiceStrategy }
    )
    try {
      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        workerChoiceStrategy
      )
    } finally {
      // Destroy the pool even on assertion failure so no worker is leaked
      await pool.destroy()
    }
  }
})
55
it('Verify available strategies can be set after pool creation', async () => {
  // One short-lived pool per strategy: `setWorkerChoiceStrategy()` must update
  // both the pool options and the strategy context
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js'
    )
    try {
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        workerChoiceStrategy
      )
    } finally {
      // Destroy the pool even on assertion failure so no worker is leaked
      await pool.destroy()
    }
  }
})
71
it('Verify available strategies default internals at pool creation', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js'
  )
  // Looks up the strategy instance registered in the pool's strategy context
  const internalsOf = workerChoiceStrategy =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategy
    )
  try {
    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
      // Only the strategies listed below have internals checked here
      switch (workerChoiceStrategy) {
        case WorkerChoiceStrategies.ROUND_ROBIN:
          expect(internalsOf(workerChoiceStrategy).nextWorkerNodeId).toBe(0)
          break
        case WorkerChoiceStrategies.FAIR_SHARE:
          expect(
            internalsOf(workerChoiceStrategy).workersVirtualTaskEndTimestamp
          ).toBeInstanceOf(Array)
          expect(
            internalsOf(workerChoiceStrategy).workersVirtualTaskEndTimestamp
              .length
          ).toBe(0)
          break
        case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
          expect(internalsOf(workerChoiceStrategy).currentWorkerNodeId).toBe(0)
          expect(
            internalsOf(workerChoiceStrategy).defaultWorkerWeight
          ).toBeGreaterThan(0)
          expect(
            internalsOf(workerChoiceStrategy).workerVirtualTaskRunTime
          ).toBe(0)
          break
        default:
          break
      }
    }
  } finally {
    // Destroy the pool even on assertion failure so no worker is leaked
    await pool.destroy()
  }
})
117
it('Verify ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same policy is expected from a fixed and from a dynamic pool; the
  // pools are created lazily so only one is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: true
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
141
it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Expected requirements: no statistic is requested at all
  const expectedRequirements = {
    runTime: { aggregate: false, average: false, median: false },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: false, average: false, median: false }
  }
  // Fixed pool first, then dynamic pool; only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
197
it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  // Builds the expected shape of a measurement that was never updated
  const zeroedStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    // Every worker is expected to have run exactly `maxMultiplier` tasks
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.workerUsage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          failed: 0
        },
        runTime: zeroedStatistics(),
        waitTime: zeroedStatistics(),
        elu: {
          idle: zeroedStatistics(),
          active: zeroedStatistics(),
          utilization: 0
        }
      })
    }
    // After `max * maxMultiplier` tasks the next node id is expected to be 0
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        WorkerChoiceStrategies.ROUND_ROBIN
      ).nextWorkerNodeId
    ).toBe(0)
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
256
it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  // Builds the expected shape of a measurement that was never updated
  const zeroedStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    // Every worker is expected to have run exactly `maxMultiplier` tasks
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.workerUsage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          failed: 0
        },
        runTime: zeroedStatistics(),
        waitTime: zeroedStatistics(),
        elu: {
          idle: zeroedStatistics(),
          active: zeroedStatistics(),
          utilization: 0
        }
      })
    }
    // After `max * maxMultiplier` tasks the next node id is expected to be 0
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        WorkerChoiceStrategies.ROUND_ROBIN
      ).nextWorkerNodeId
    ).toBe(0)
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
316
it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // Cluster workers are identified by `id`, thread workers by `threadId`.
  // Pools are created lazily so only one is alive at a time.
  const scenarios = [
    {
      createPool: () =>
        new FixedClusterPool(
          max,
          './tests/worker-files/cluster/testWorker.js',
          { workerChoiceStrategy }
        ),
      workerIdOf: worker => worker.id
    },
    {
      createPool: () =>
        new FixedThreadPool(
          max,
          './tests/worker-files/thread/testWorker.js',
          { workerChoiceStrategy }
        ),
      workerIdOf: worker => worker.threadId
    }
  ]
  for (const { createPool, workerIdOf } of scenarios) {
    const pool = createPool()
    try {
      // `max` consecutive choices are expected to hit `max` distinct workers
      const results = new Set()
      for (let i = 0; i < max; i++) {
        results.add(
          workerIdOf(pool.workerNodes[pool.chooseWorkerNode()].worker)
        )
      }
      expect(results.size).toBe(max)
    } finally {
      // Destroy the pool even on assertion failure so no worker is leaked
      await pool.destroy()
    }
  }
})
342
it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Both pools start with another strategy (WEIGHTED_ROUND_ROBIN) and are then
  // switched to ROUND_ROBIN. Fixed pool first, then dynamic pool.
  const initialOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, initialOptions),
    () => new DynamicThreadPool(min, max, workerFile, initialOptions)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      // The ROUND_ROBIN instance is expected to exist before being selected
      expect(
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        ).nextWorkerNodeId
      ).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      expect(
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          pool.workerChoiceStrategyContext.workerChoiceStrategy
        ).nextWorkerNodeId
      ).toBe(0)
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
382
it('Verify LEAST_USED strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same policy is expected from a fixed and from a dynamic pool; the
  // pools are created lazily so only one is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: false
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
406
it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Expected requirements: no statistic is requested at all
  const expectedRequirements = {
    runTime: { aggregate: false, average: false, median: false },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: false, average: false, median: false }
  }
  // Fixed pool first, then dynamic pool; only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
462
it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  // Builds the expected shape of a measurement that was never updated
  const zeroedStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    // Every worker is expected to have run exactly `maxMultiplier` tasks
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.workerUsage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          failed: 0
        },
        runTime: zeroedStatistics(),
        waitTime: zeroedStatistics(),
        elu: {
          idle: zeroedStatistics(),
          active: zeroedStatistics(),
          utilization: 0
        }
      })
    }
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
516
it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  // Builds the expected shape of a measurement that was never updated
  const zeroedStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    // Every worker is expected to have run exactly `maxMultiplier` tasks
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.workerUsage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          failed: 0
        },
        runTime: zeroedStatistics(),
        waitTime: zeroedStatistics(),
        elu: {
          idle: zeroedStatistics(),
          active: zeroedStatistics(),
          utilization: 0
        }
      })
    }
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
571
it('Verify LEAST_BUSY strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same policy is expected from a fixed and from a dynamic pool; the
  // pools are created lazily so only one is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: false
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
595
it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Expected requirements: only the aggregated run and wait times
  const expectedRequirements = {
    runTime: { aggregate: true, average: false, median: false },
    waitTime: { aggregate: true, average: false, median: false },
    elu: { aggregate: false, average: false, median: false }
  }
  // Fixed pool first, then dynamic pool; only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
651
it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  // Builds the expected shape of a measurement that was never updated
  const zeroedStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  // Builds the expected shape of a measurement where only `aggregate` is set
  const aggregatedStatistics = () => ({
    aggregate: expect.any(Number),
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // Task distribution is not deterministic here, so only the shape is
      // checked first and the numeric bounds afterwards
      expect(workerNode.workerUsage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          failed: 0
        },
        runTime: aggregatedStatistics(),
        waitTime: aggregatedStatistics(),
        elu: {
          idle: zeroedStatistics(),
          active: zeroedStatistics(),
          utilization: 0
        }
      })
      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(
        0
      )
      expect(workerNode.workerUsage.waitTime.aggregate).toBeGreaterThanOrEqual(
        0
      )
    }
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
713
it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  // Builds the expected shape of a measurement that was never updated
  const zeroedStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  // Builds the expected shape of a measurement where only `aggregate` is set
  const aggregatedStatistics = () => ({
    aggregate: expect.any(Number),
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // Task distribution is not deterministic here, so only the shape is
      // checked first and the numeric bounds afterwards
      expect(workerNode.workerUsage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          failed: 0
        },
        runTime: aggregatedStatistics(),
        waitTime: aggregatedStatistics(),
        elu: {
          idle: zeroedStatistics(),
          active: zeroedStatistics(),
          utilization: 0
        }
      })
      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
      expect(workerNode.workerUsage.waitTime.aggregate).toBeGreaterThan(0)
    }
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
774
it('Verify LEAST_ELU strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same policy is expected from a fixed and from a dynamic pool; the
  // pools are created lazily so only one is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: false
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
798
it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Expected requirements: only the aggregated ELU
  const expectedRequirements = {
    runTime: { aggregate: false, average: false, median: false },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: true, average: false, median: false }
  }
  // Fixed pool first, then dynamic pool; only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
854
it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  // Builds the expected shape of a measurement that was never updated
  const zeroedStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // Task distribution is not deterministic here, so only the shape is
      // checked first and the numeric bounds afterwards
      expect(workerNode.workerUsage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          failed: 0
        },
        runTime: zeroedStatistics(),
        waitTime: zeroedStatistics(),
        elu: {
          idle: zeroedStatistics(),
          active: {
            aggregate: expect.any(Number),
            average: 0,
            median: 0,
            history: expect.any(CircularArray)
          },
          utilization: expect.any(Number)
        }
      })
      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
      expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
    }
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
914
it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  // Builds the expected shape of a measurement that was never updated
  const zeroedStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // Task distribution is not deterministic here, so only the shape is
      // checked first and the numeric bounds afterwards
      expect(workerNode.workerUsage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          failed: 0
        },
        runTime: zeroedStatistics(),
        waitTime: zeroedStatistics(),
        elu: {
          idle: zeroedStatistics(),
          active: {
            aggregate: expect.any(Number),
            average: 0,
            median: 0,
            history: expect.any(CircularArray)
          },
          utilization: expect.any(Number)
        }
      })
      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
      expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
    }
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
975
it('Verify FAIR_SHARE strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same policy is expected from a fixed and from a dynamic pool; the
  // pools are created lazily so only one is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: false
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
999
it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Expected requirements: aggregate and average of both run time and ELU
  const expectedRequirements = {
    runTime: { aggregate: true, average: true, median: false },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: true, average: true, median: false }
  }
  // Fixed pool first, then dynamic pool; only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
1055
it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Builds the expected shape of a measurement that was never updated
  const zeroedStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  // Builds the expected shape of a measurement with aggregate and average set
  const averagedStatistics = () => ({
    aggregate: expect.any(Number),
    average: expect.any(Number),
    median: 0,
    history: expect.any(CircularArray)
  })
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    // Every worker is expected to have run exactly `maxMultiplier` tasks
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.workerUsage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          failed: 0
        },
        runTime: averagedStatistics(),
        waitTime: zeroedStatistics(),
        elu: {
          idle: zeroedStatistics(),
          active: averagedStatistics(),
          utilization: expect.any(Number)
        }
      })
      expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
      expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
      expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
      expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
    }
    // The strategy is expected to track one end timestamp per worker node
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp.length
    ).toBe(pool.workerNodes.length)
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
1118
it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Builds the expected shape of a measurement that was never updated
  const zeroedStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  // Builds the expected shape of a measurement with aggregate and average set
  const averagedStatistics = () => ({
    aggregate: expect.any(Number),
    average: expect.any(Number),
    median: 0,
    history: expect.any(CircularArray)
  })
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // Task distribution is not deterministic here, so only the shape is
      // checked first and the numeric bounds afterwards
      expect(workerNode.workerUsage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          failed: 0
        },
        runTime: averagedStatistics(),
        waitTime: zeroedStatistics(),
        elu: {
          idle: zeroedStatistics(),
          active: averagedStatistics(),
          utilization: expect.any(Number)
        }
      })
      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
      expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
      expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
      expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
    }
    // The strategy is expected to track one end timestamp per worker node
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp.length
    ).toBe(pool.workerNodes.length)
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
1186
it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
  // Same scenario as the plain FAIR_SHARE dynamic pool run, but the run time
  // median is requested through the worker choice strategy options.
  const poolOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
    workerChoiceStrategyOptions: {
      runTime: { median: true }
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    poolOptions
  )
  // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  // Submit all the tasks at once and wait until every one of them has settled.
  await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
  const anyNumber = expect.any(Number)
  // Builds the expected shape of one measurement statistics record.
  const statistics = (aggregate, average, median) => ({
    aggregate,
    average,
    median,
    history: expect.any(CircularArray)
  })
  // With the median enabled, the run time average is expected to stay at zero.
  const expectedWorkerUsage = {
    tasks: { executed: anyNumber, executing: 0, queued: 0, failed: 0 },
    runTime: statistics(anyNumber, 0, anyNumber),
    waitTime: statistics(0, 0, 0),
    elu: {
      idle: statistics(0, 0, 0),
      active: statistics(anyNumber, anyNumber, 0),
      utilization: anyNumber
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.workerUsage).toStrictEqual(expectedWorkerUsage)
    const { tasks, runTime, elu } = workerNode.workerUsage
    expect(tasks.executed).toBeGreaterThan(0)
    expect(tasks.executed).toBeLessThanOrEqual(tasksCount)
    expect(runTime.aggregate).toBeGreaterThan(0)
    expect(runTime.median).toBeGreaterThan(0)
    expect(elu.utilization).toBeGreaterThanOrEqual(0)
    expect(elu.utilization).toBeLessThanOrEqual(1)
  }
  // One virtual task end timestamp is expected per worker node.
  const { workerChoiceStrategies, workerChoiceStrategy } =
    pool.workerChoiceStrategyContext
  expect(
    workerChoiceStrategies.get(workerChoiceStrategy)
      .workersVirtualTaskEndTimestamp.length
  ).toBe(pool.workerNodes.length)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1259
it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The scenario runs against a fixed pool first, then against a dynamic one.
  // Pools are built lazily so only one of them is alive at a time.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Looks the timestamps array up again on every call instead of caching it.
    const virtualTaskEndTimestamps = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp
    expect(virtualTaskEndTimestamps()).toBeInstanceOf(Array)
    expect(virtualTaskEndTimestamps().length).toBe(0)
    // Dirty the internal state so the reset below has something to clear.
    virtualTaskEndTimestamps()[0] = performance.now()
    expect(virtualTaskEndTimestamps().length).toBe(1)
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(virtualTaskEndTimestamps()).toBeInstanceOf(Array)
    expect(virtualTaskEndTimestamps().length).toBe(0)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1333
it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same policy is expected from a fixed pool first, then a dynamic one.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
      useDynamicWorker: true
    })
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1357
it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Requirements record with every statistic switched off.
  const noStatistics = () => ({
    aggregate: false,
    average: false,
    median: false
  })
  // Only the run time aggregate and average are expected to be required.
  const expectedRequirements = {
    runTime: { aggregate: true, average: true, median: false },
    waitTime: noStatistics(),
    elu: noStatistics()
  }
  // Checked against a fixed pool first, then against a dynamic one.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(expectedRequirements)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1413
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  // Fixed thread pool of `max` workers using WEIGHTED_ROUND_ROBIN.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  // Submit all the tasks at once and wait until every one of them has settled.
  await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
  const anyNumber = expect.any(Number)
  // Builds the expected shape of one measurement statistics record.
  const statistics = (aggregate, average, median) => ({
    aggregate,
    average,
    median,
    history: expect.any(CircularArray)
  })
  // Only run time aggregate and average may vary; the rest must stay at zero.
  const expectedWorkerUsage = {
    tasks: { executed: anyNumber, executing: 0, queued: 0, failed: 0 },
    runTime: statistics(anyNumber, anyNumber, 0),
    waitTime: statistics(0, 0, 0),
    elu: {
      idle: statistics(0, 0, 0),
      active: statistics(0, 0, 0),
      utilization: 0
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.workerUsage).toStrictEqual(expectedWorkerUsage)
    const { tasks, runTime } = workerNode.workerUsage
    expect(tasks.executed).toBeGreaterThanOrEqual(0)
    expect(tasks.executed).toBeLessThanOrEqual(tasksCount)
    expect(runTime.aggregate).toBeGreaterThanOrEqual(0)
    expect(runTime.average).toBeGreaterThanOrEqual(0)
  }
  // Looks up the strategy instance currently selected by the pool.
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy().workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1483
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  // Dynamic thread pool bounded by the suite-level `min`/`max`, using
  // WEIGHTED_ROUND_ROBIN.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  // Submit all the tasks at once and wait until every one of them has settled.
  await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
  const anyNumber = expect.any(Number)
  // Builds the expected shape of one measurement statistics record.
  const statistics = (aggregate, average, median) => ({
    aggregate,
    average,
    median,
    history: expect.any(CircularArray)
  })
  // Only run time aggregate and average may vary; the rest must stay at zero.
  const expectedWorkerUsage = {
    tasks: { executed: anyNumber, executing: 0, queued: 0, failed: 0 },
    runTime: statistics(anyNumber, anyNumber, 0),
    waitTime: statistics(0, 0, 0),
    elu: {
      idle: statistics(0, 0, 0),
      active: statistics(0, 0, 0),
      utilization: 0
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.workerUsage).toStrictEqual(expectedWorkerUsage)
    const { tasks, runTime } = workerNode.workerUsage
    expect(tasks.executed).toBeGreaterThanOrEqual(0)
    expect(tasks.executed).toBeLessThanOrEqual(tasksCount)
    expect(runTime.aggregate).toBeGreaterThan(0)
    expect(runTime.average).toBeGreaterThan(0)
  }
  // Looks up the strategy instance currently selected by the pool.
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy().workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1554
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
  // Same scenario as the plain WEIGHTED_ROUND_ROBIN dynamic pool run, but the
  // run time median is requested through the worker choice strategy options.
  const poolOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
    workerChoiceStrategyOptions: {
      runTime: { median: true }
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    poolOptions
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  // Submit all the tasks at once and wait until every one of them has settled.
  await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
  const anyNumber = expect.any(Number)
  // Builds the expected shape of one measurement statistics record.
  const statistics = (aggregate, average, median) => ({
    aggregate,
    average,
    median,
    history: expect.any(CircularArray)
  })
  // With the median enabled, the run time average is expected to stay at zero.
  const expectedWorkerUsage = {
    tasks: { executed: anyNumber, executing: 0, queued: 0, failed: 0 },
    runTime: statistics(anyNumber, 0, anyNumber),
    waitTime: statistics(0, 0, 0),
    elu: {
      idle: statistics(0, 0, 0),
      active: statistics(0, 0, 0),
      utilization: 0
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.workerUsage).toStrictEqual(expectedWorkerUsage)
    const { tasks, runTime } = workerNode.workerUsage
    expect(tasks.executed).toBeGreaterThanOrEqual(0)
    expect(tasks.executed).toBeLessThanOrEqual(tasksCount)
    expect(runTime.aggregate).toBeGreaterThan(0)
    expect(runTime.median).toBeGreaterThan(0)
  }
  // Looks up the strategy instance currently selected by the pool.
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy().workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1630
it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The scenario runs against a fixed pool first, then against a dynamic one.
  // Pools are built lazily so only one of them is alive at a time.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Strategy instance registered under the strategy being tested.
    const testedStrategy = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        workerChoiceStrategy
      )
    // Strategy instance currently selected by the pool.
    const activeStrategy = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    // The pool was created without options, so before the strategy is set its
    // internals are only checked through the tested strategy's own key.
    expect(testedStrategy().currentWorkerNodeId).toBeDefined()
    expect(testedStrategy().defaultWorkerWeight).toBeDefined()
    expect(testedStrategy().workerVirtualTaskRunTime).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(activeStrategy().currentWorkerNodeId).toBe(0)
    expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
    expect(testedStrategy().workerVirtualTaskRunTime).toBe(0)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1708
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same policy is expected from a fixed pool first, then a dynamic one.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
      useDynamicWorker: true
    })
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1733
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Requirements record with every statistic switched off.
  const noStatistics = () => ({
    aggregate: false,
    average: false,
    median: false
  })
  // No task statistic at all is expected to be required.
  const expectedRequirements = {
    runTime: noStatistics(),
    waitTime: noStatistics(),
    elu: noStatistics()
  }
  // Checked against a fixed pool first, then against a dynamic one.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(expectedRequirements)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1790
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  // Fixed thread pool of `max` workers using INTERLEAVED_WEIGHTED_ROUND_ROBIN.
  const poolOptions = {
    workerChoiceStrategy:
      WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  }
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    poolOptions
  )
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  // Submit all the tasks at once and wait until every one of them has settled.
  await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
  // Expected shape of a measurement statistics record left at zero.
  const zeroStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  // Every worker is expected to have run exactly `maxMultiplier` tasks, with
  // all the measurement statistics left at zero.
  const expectedWorkerUsage = {
    tasks: { executed: maxMultiplier, executing: 0, queued: 0, failed: 0 },
    runTime: zeroStatistics(),
    waitTime: zeroStatistics(),
    elu: {
      idle: zeroStatistics(),
      active: zeroStatistics(),
      utilization: 0
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.workerUsage).toStrictEqual(expectedWorkerUsage)
  }
  // Looks up the strategy instance currently selected by the pool.
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy().currentRoundId).toBe(0)
  expect(activeStrategy().currentWorkerNodeId).toBe(0)
  expect(activeStrategy().roundWeights).toStrictEqual([
    activeStrategy().defaultWorkerWeight
  ])
  // We need to clean up the resources after our test
  await pool.destroy()
})
1871
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  // Dynamic thread pool bounded by the suite-level `min`/`max`, using
  // INTERLEAVED_WEIGHTED_ROUND_ROBIN.
  const poolOptions = {
    workerChoiceStrategy:
      WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    poolOptions
  )
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  // Submit all the tasks at once and wait until every one of them has settled.
  await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
  // Expected shape of a measurement statistics record left at zero.
  const zeroStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  // Every worker is expected to have run exactly `maxMultiplier` tasks, with
  // all the measurement statistics left at zero.
  const expectedWorkerUsage = {
    tasks: { executed: maxMultiplier, executing: 0, queued: 0, failed: 0 },
    runTime: zeroStatistics(),
    waitTime: zeroStatistics(),
    elu: {
      idle: zeroStatistics(),
      active: zeroStatistics(),
      utilization: 0
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.workerUsage).toStrictEqual(expectedWorkerUsage)
  }
  // Looks up the strategy instance currently selected by the pool.
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy().currentRoundId).toBe(0)
  expect(activeStrategy().currentWorkerNodeId).toBe(0)
  expect(activeStrategy().roundWeights).toStrictEqual([
    activeStrategy().defaultWorkerWeight
  ])
  // We need to clean up the resources after our test
  await pool.destroy()
})
1953
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The scenario runs against a fixed pool first, then against a dynamic one.
  // Both pool types go through the exact same assertions, so the
  // `currentRoundId` reset is verified for the dynamic pool as well.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Strategy instance registered under the strategy being tested.
    const testedStrategy = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        workerChoiceStrategy
      )
    // Strategy instance currently selected by the pool.
    const activeStrategy = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    // The pool was created without options, so before the strategy is set its
    // internals are only checked through the tested strategy's own key.
    expect(testedStrategy().currentRoundId).toBeDefined()
    expect(testedStrategy().currentWorkerNodeId).toBeDefined()
    expect(testedStrategy().defaultWorkerWeight).toBeDefined()
    expect(testedStrategy().roundWeights).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(testedStrategy().currentRoundId).toBe(0)
    expect(activeStrategy().currentWorkerNodeId).toBe(0)
    expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
    expect(testedStrategy().roundWeights).toStrictEqual([
      activeStrategy().defaultWorkerWeight
    ])
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
2055
it('Verify unknown strategy throw error', () => {
  // Building a pool with an unsupported strategy name is expected to throw.
  const createPoolWithUnknownStrategy = () =>
    new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js',
      { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
    )
  expect(createPoolWithUnknownStrategy).toThrowError(
    "Invalid worker choice strategy 'UNKNOWN_STRATEGY'"
  )
})
2067 })