fix: test for worker file existence
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicThreadPool,
4 FixedClusterPool,
5 FixedThreadPool,
6 WorkerChoiceStrategies
7 } = require('../../../lib')
8 const { CircularArray } = require('../../../lib/circular-array')
9
10 describe('Selection strategies test suite', () => {
11 const min = 0
12 const max = 3
13
// Every strategy key of the enumeration must map to the identical string.
it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
  const strategyNames = [
    'ROUND_ROBIN',
    'LEAST_USED',
    'LEAST_BUSY',
    'LEAST_ELU',
    'FAIR_SHARE',
    'WEIGHTED_ROUND_ROBIN',
    'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
  ]
  for (const strategyName of strategyNames) {
    expect(WorkerChoiceStrategies[strategyName]).toBe(strategyName)
  }
})
27
// A pool built without options must report ROUND_ROBIN in its options.
it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    expect(pool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.ROUND_ROBIN
    )
  } finally {
    // We need to clean up the resources after our test, even when an assertion throws
    await pool.destroy()
  }
})
40
// Each strategy given through the pool options must be visible both in the
// pool options and in the worker choice strategy context.
it('Verify available strategies are taken at pool creation', async () => {
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new FixedThreadPool(
      max,
      './tests/worker-files/thread/testWorker.js',
      { workerChoiceStrategy }
    )
    try {
      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        workerChoiceStrategy
      )
    } finally {
      // Destroy the pool of this iteration even when an assertion throws
      await pool.destroy()
    }
  }
})
55
// Each strategy applied with setWorkerChoiceStrategy() must be visible both
// in the pool options and in the worker choice strategy context.
it('Verify available strategies can be set after pool creation', async () => {
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js'
    )
    try {
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        workerChoiceStrategy
      )
    } finally {
      // Destroy the pool of this iteration even when an assertion throws
      await pool.destroy()
    }
  }
})
71
// Checks the initial internal fields of the ROUND_ROBIN, FAIR_SHARE and
// WEIGHTED_ROUND_ROBIN strategy instances held by a freshly created pool.
it('Verify available strategies default internals at pool creation', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    const { workerChoiceStrategies } = pool.workerChoiceStrategyContext
    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
      switch (workerChoiceStrategy) {
        case WorkerChoiceStrategies.ROUND_ROBIN:
          expect(
            workerChoiceStrategies.get(workerChoiceStrategy).nextWorkerNodeKey
          ).toBe(0)
          break
        case WorkerChoiceStrategies.FAIR_SHARE: {
          const fairShare = workerChoiceStrategies.get(workerChoiceStrategy)
          expect(fairShare.workersVirtualTaskEndTimestamp).toBeInstanceOf(Array)
          expect(fairShare.workersVirtualTaskEndTimestamp.length).toBe(0)
          break
        }
        case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN: {
          const weightedRoundRobin =
            workerChoiceStrategies.get(workerChoiceStrategy)
          expect(weightedRoundRobin.nextWorkerNodeKey).toBe(0)
          expect(weightedRoundRobin.defaultWorkerWeight).toBeGreaterThan(0)
          expect(weightedRoundRobin.workerVirtualTaskRunTime).toBe(0)
          break
        }
        default:
          // No internals are asserted for the remaining strategies
          break
      }
    }
  } finally {
    // We need to clean up the resources after our test, even when an assertion throws
    await pool.destroy()
  }
})
117
// The ROUND_ROBIN strategy policy must be { useDynamicWorker: true } for a
// fixed and then for a dynamic thread pool.
it('Verify ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Factories keep the original order: only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: true
      })
    } finally {
      // We need to clean up the resources after our test, even when an assertion throws
      await pool.destroy()
    }
  }
})
141
// The ROUND_ROBIN strategy must require no task statistics at all, for a
// fixed and then for a dynamic thread pool.
it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const disabled = { aggregate: false, average: false, median: false }
  // Factories keep the original order: only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual({
        runTime: { ...disabled },
        waitTime: { ...disabled },
        elu: { ...disabled }
      })
    } finally {
      // We need to clean up the resources after our test, even when an assertion throws
      await pool.destroy()
    }
  }
})
197
// Runs max * maxMultiplier tasks on a fixed pool and checks that every worker
// node executed exactly maxMultiplier of them and that the strategy's next
// worker node key is 0 afterwards.
it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    await Promise.all(
      Array.from({ length: max * maxMultiplier }, () => pool.execute())
    )
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: { history: expect.any(CircularArray) },
        waitTime: { history: expect.any(CircularArray) },
        elu: {
          idle: { history: expect.any(CircularArray) },
          active: { history: expect.any(CircularArray) }
        }
      })
    }
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        WorkerChoiceStrategies.ROUND_ROBIN
      ).nextWorkerNodeKey
    ).toBe(0)
  } finally {
    // We need to clean up the resources after our test, even when an assertion throws
    await pool.destroy()
  }
})
244
// Runs max * maxMultiplier tasks on a dynamic pool and checks that every
// worker node executed exactly maxMultiplier of them and that the strategy's
// next worker node key is 0 afterwards.
it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    await Promise.all(
      Array.from({ length: max * maxMultiplier }, () => pool.execute())
    )
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: { history: expect.any(CircularArray) },
        waitTime: { history: expect.any(CircularArray) },
        elu: {
          idle: { history: expect.any(CircularArray) },
          active: { history: expect.any(CircularArray) }
        }
      })
    }
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        WorkerChoiceStrategies.ROUND_ROBIN
      ).nextWorkerNodeKey
    ).toBe(0)
  } finally {
    // We need to clean up the resources after our test, even when an assertion throws
    await pool.destroy()
  }
})
292
// Choosing a worker node max times must yield max distinct workers, first on
// a fixed cluster pool (worker.id) and then on a fixed thread pool
// (worker.threadId).
it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // Scenarios keep the original order: only one pool is alive at a time
  const scenarios = [
    {
      createPool: () =>
        new FixedClusterPool(
          max,
          './tests/worker-files/cluster/testWorker.js',
          { workerChoiceStrategy }
        ),
      workerIdOf: worker => worker.id
    },
    {
      createPool: () =>
        new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js', {
          workerChoiceStrategy
        }),
      workerIdOf: worker => worker.threadId
    }
  ]
  for (const { createPool, workerIdOf } of scenarios) {
    const pool = createPool()
    try {
      const chosenWorkerIds = new Set()
      for (let i = 0; i < max; i++) {
        chosenWorkerIds.add(
          workerIdOf(pool.workerNodes[pool.chooseWorkerNode()].worker)
        )
      }
      expect(chosenWorkerIds.size).toBe(max)
    } finally {
      // Destroy the pool of this scenario even when an assertion throws
      await pool.destroy()
    }
  }
})
318
// Starting from WEIGHTED_ROUND_ROBIN, switching to ROUND_ROBIN must leave the
// active strategy with nextWorkerNodeKey equal to 0, for a fixed and then for
// a dynamic thread pool.
it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const initialOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  }
  // Factories keep the original order: only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { ...initialOptions }),
    () => new DynamicThreadPool(min, max, workerFile, { ...initialOptions })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        ).nextWorkerNodeKey
      ).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      expect(
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          pool.workerChoiceStrategyContext.workerChoiceStrategy
        ).nextWorkerNodeKey
      ).toBe(0)
    } finally {
      // We need to clean up the resources after our test, even when an assertion throws
      await pool.destroy()
    }
  }
})
358
// The LEAST_USED strategy policy must be { useDynamicWorker: false } for a
// fixed and then for a dynamic thread pool.
it('Verify LEAST_USED strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Factories keep the original order: only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: false
      })
    } finally {
      // We need to clean up the resources after our test, even when an assertion throws
      await pool.destroy()
    }
  }
})
382
// The LEAST_USED strategy must require no task statistics at all, for a
// fixed and then for a dynamic thread pool.
it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const disabled = { aggregate: false, average: false, median: false }
  // Factories keep the original order: only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual({
        runTime: { ...disabled },
        waitTime: { ...disabled },
        elu: { ...disabled }
      })
    } finally {
      // We need to clean up the resources after our test, even when an assertion throws
      await pool.destroy()
    }
  }
})
438
// Runs max * maxMultiplier tasks on a fixed pool and checks the shape of each
// worker node usage and that its executed task count is within
// [0, max * maxMultiplier].
it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    await Promise.all(
      Array.from({ length: max * maxMultiplier }, () => pool.execute())
    )
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: { history: expect.any(CircularArray) },
        waitTime: { history: expect.any(CircularArray) },
        elu: {
          idle: { history: expect.any(CircularArray) },
          active: { history: expect.any(CircularArray) }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
  } finally {
    // We need to clean up the resources after our test, even when an assertion throws
    await pool.destroy()
  }
})
484
// Runs max * maxMultiplier tasks on a dynamic pool and checks the shape of
// each worker node usage and that its executed task count is within
// [0, max * maxMultiplier].
it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    await Promise.all(
      Array.from({ length: max * maxMultiplier }, () => pool.execute())
    )
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: { history: expect.any(CircularArray) },
        waitTime: { history: expect.any(CircularArray) },
        elu: {
          idle: { history: expect.any(CircularArray) },
          active: { history: expect.any(CircularArray) }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
  } finally {
    // We need to clean up the resources after our test, even when an assertion throws
    await pool.destroy()
  }
})
531
// The LEAST_BUSY strategy policy must be { useDynamicWorker: false } for a
// fixed and then for a dynamic thread pool.
it('Verify LEAST_BUSY strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Factories keep the original order: only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: false
      })
    } finally {
      // We need to clean up the resources after our test, even when an assertion throws
      await pool.destroy()
    }
  }
})
555
// The LEAST_BUSY strategy must require only the aggregate run time and the
// aggregate wait time, for a fixed and then for a dynamic thread pool.
it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const disabled = { aggregate: false, average: false, median: false }
  // Factories keep the original order: only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual({
        runTime: { ...disabled, aggregate: true },
        waitTime: { ...disabled, aggregate: true },
        elu: { ...disabled }
      })
    } finally {
      // We need to clean up the resources after our test, even when an assertion throws
      await pool.destroy()
    }
  }
})
611
// Runs max * maxMultiplier tasks on a fixed pool and checks each worker node
// usage: its shape, the executed task count bounds, and that the aggregate
// run and wait times are either undefined or strictly positive.
it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
  // Passes for undefined or a number > 0; null fails the toBeUndefined() check
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    await Promise.all(
      Array.from({ length: max * maxMultiplier }, () => pool.execute())
    )
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: { history: expect.any(CircularArray) },
        waitTime: { history: expect.any(CircularArray) },
        elu: {
          idle: { history: expect.any(CircularArray) },
          active: { history: expect.any(CircularArray) }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      expectUndefinedOrPositive(workerNode.usage.runTime.aggregate)
      expectUndefinedOrPositive(workerNode.usage.waitTime.aggregate)
    }
  } finally {
    // We need to clean up the resources after our test, even when an assertion throws
    await pool.destroy()
  }
})
667
// Runs max * maxMultiplier tasks on a dynamic pool and checks each worker
// node usage: its shape, the executed task count bounds, and that the
// aggregate run and wait times are either undefined or strictly positive.
it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
  // Passes for undefined or a number > 0; null fails the toBeUndefined() check
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    await Promise.all(
      Array.from({ length: max * maxMultiplier }, () => pool.execute())
    )
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: { history: expect.any(CircularArray) },
        waitTime: { history: expect.any(CircularArray) },
        elu: {
          idle: { history: expect.any(CircularArray) },
          active: { history: expect.any(CircularArray) }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      expectUndefinedOrPositive(workerNode.usage.runTime.aggregate)
      expectUndefinedOrPositive(workerNode.usage.waitTime.aggregate)
    }
  } finally {
    // We need to clean up the resources after our test, even when an assertion throws
    await pool.destroy()
  }
})
724
// The LEAST_ELU strategy policy must be { useDynamicWorker: false } for a
// fixed and then for a dynamic thread pool.
it('Verify LEAST_ELU strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Factories keep the original order: only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: false
      })
    } finally {
      // We need to clean up the resources after our test, even when an assertion throws
      await pool.destroy()
    }
  }
})
748
// The LEAST_ELU strategy must require only the aggregate ELU statistic, for
// a fixed and then for a dynamic thread pool.
it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const disabled = { aggregate: false, average: false, median: false }
  // Factories keep the original order: only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual({
        runTime: { ...disabled },
        waitTime: { ...disabled },
        elu: { ...disabled, aggregate: true }
      })
    } finally {
      // We need to clean up the resources after our test, even when an assertion throws
      await pool.destroy()
    }
  }
})
804
// Runs max * maxMultiplier tasks on a fixed pool and checks each worker node
// usage: its shape, the executed task count bounds, and that the ELU
// utilization is either undefined or within [0, 1].
it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    await Promise.all(
      Array.from({ length: max * maxMultiplier }, () => pool.execute())
    )
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: { history: expect.any(CircularArray) },
        waitTime: { history: expect.any(CircularArray) },
        elu: {
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      const { utilization } = workerNode.usage.elu
      if (utilization == null) {
        // null fails here on purpose: only undefined is accepted
        expect(utilization).toBeUndefined()
      } else {
        expect(utilization).toBeGreaterThanOrEqual(0)
        expect(utilization).toBeLessThanOrEqual(1)
      }
    }
  } finally {
    // We need to clean up the resources after our test, even when an assertion throws
    await pool.destroy()
  }
})
856
// Runs max * maxMultiplier tasks on a dynamic pool and checks each worker
// node usage: its shape, the executed task count bounds, and that the ELU
// utilization is either undefined or within [0, 1].
it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    await Promise.all(
      Array.from({ length: max * maxMultiplier }, () => pool.execute())
    )
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: { history: expect.any(CircularArray) },
        waitTime: { history: expect.any(CircularArray) },
        elu: {
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      const { utilization } = workerNode.usage.elu
      if (utilization == null) {
        // null fails here on purpose: only undefined is accepted
        expect(utilization).toBeUndefined()
      } else {
        expect(utilization).toBeGreaterThanOrEqual(0)
        expect(utilization).toBeLessThanOrEqual(1)
      }
    }
  } finally {
    // We need to clean up the resources after our test, even when an assertion throws
    await pool.destroy()
  }
})
909
// The FAIR_SHARE strategy policy must be { useDynamicWorker: false } for a
// fixed and then for a dynamic thread pool.
it('Verify FAIR_SHARE strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Factories keep the original order: only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: false
      })
    } finally {
      // We need to clean up the resources after our test, even when an assertion throws
      await pool.destroy()
    }
  }
})
933
// The FAIR_SHARE strategy must require the aggregate and average statistics
// for both run time and ELU, and nothing for wait time, for a fixed and then
// for a dynamic thread pool.
it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const disabled = { aggregate: false, average: false, median: false }
  const aggregateAndAverage = { aggregate: true, average: true, median: false }
  // Factories keep the original order: only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual({
        runTime: { ...aggregateAndAverage },
        waitTime: { ...disabled },
        elu: { ...aggregateAndAverage }
      })
    } finally {
      // We need to clean up the resources after our test, even when an assertion throws
      await pool.destroy()
    }
  }
})
989
// Runs max * maxMultiplier tasks on a fixed pool and checks each worker node
// usage (shape, executed task count bounds, run time aggregate/average, ELU
// utilization), then that the strategy tracks one virtual task end timestamp
// per worker node.
it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
  // Passes for undefined or a number > 0; null fails the toBeUndefined() check
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    await Promise.all(
      Array.from({ length: max * maxMultiplier }, () => pool.execute())
    )
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: { history: expect.any(CircularArray) },
        elu: {
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      expectUndefinedOrPositive(workerNode.usage.runTime.aggregate)
      expectUndefinedOrPositive(workerNode.usage.runTime.average)
      const { utilization } = workerNode.usage.elu
      if (utilization == null) {
        expect(utilization).toBeUndefined()
      } else {
        expect(utilization).toBeGreaterThanOrEqual(0)
        expect(utilization).toBeLessThanOrEqual(1)
      }
    }
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp.length
    ).toBe(pool.workerNodes.length)
  } finally {
    // We need to clean up the resources after our test, even when an assertion throws
    await pool.destroy()
  }
})
1056
// Runs max * maxMultiplier tasks on a dynamic pool and checks each worker
// node usage (shape, executed task count bounds, run time aggregate/average,
// ELU utilization), then that the strategy tracks one virtual task end
// timestamp per worker node.
it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
  // Passes for undefined or a number > 0; null fails the toBeUndefined() check
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    await Promise.all(
      Array.from({ length: max * maxMultiplier }, () => pool.execute())
    )
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: { history: expect.any(CircularArray) },
        elu: {
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      expectUndefinedOrPositive(workerNode.usage.runTime.aggregate)
      expectUndefinedOrPositive(workerNode.usage.runTime.average)
      const { utilization } = workerNode.usage.elu
      if (utilization == null) {
        expect(utilization).toBeUndefined()
      } else {
        expect(utilization).toBeGreaterThanOrEqual(0)
        expect(utilization).toBeLessThanOrEqual(1)
      }
    }
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp.length
    ).toBe(pool.workerNodes.length)
  } finally {
    // We need to clean up the resources after our test, even when an assertion throws
    await pool.destroy()
  }
})
1124
it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
  // Dynamic thread pool using FAIR_SHARE with the median run time statistic enabled.
  const poolOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
    workerChoiceStrategyOptions: {
      runTime: { median: true }
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    poolOptions
  )
  // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to settle.
  const pendingTasks = Array.from({ length: taskCount }, () => pool.execute())
  await Promise.all(pendingTasks)
  // Shape every worker node usage is expected to match once the tasks are done.
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: expect.objectContaining({
      history: expect.any(CircularArray)
    }),
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      active: expect.objectContaining({
        history: expect.any(CircularArray)
      })
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toMatchObject(expectedUsage)
    const executedTasks = usage.tasks.executed
    expect(executedTasks).toBeGreaterThanOrEqual(0)
    expect(executedTasks).toBeLessThanOrEqual(taskCount)
    // Statistics may be absent on a worker node; when present they must be in range.
    const runTimeAggregate = usage.runTime.aggregate
    if (runTimeAggregate != null) {
      expect(runTimeAggregate).toBeGreaterThan(0)
    } else {
      expect(runTimeAggregate).toBeUndefined()
    }
    const runTimeMedian = usage.runTime.median
    if (runTimeMedian != null) {
      expect(runTimeMedian).toBeGreaterThan(0)
    } else {
      expect(runTimeMedian).toBeUndefined()
    }
    const eluUtilization = usage.elu.utilization
    if (eluUtilization != null) {
      expect(eluUtilization).toBeGreaterThanOrEqual(0)
      expect(eluUtilization).toBeLessThanOrEqual(1)
    } else {
      expect(eluUtilization).toBeUndefined()
    }
  }
  // The strategy must track one virtual task end timestamp per worker node.
  const strategyContext = pool.workerChoiceStrategyContext
  const activeStrategy = strategyContext.workerChoiceStrategies.get(
    strategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.workersVirtualTaskEndTimestamp.length).toBe(
    pool.workerNodes.length
  )
  // We need to clean up the resources after our test
  await pool.destroy()
})
1197
it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Factories are invoked lazily so the fixed pool is destroyed before the
  // dynamic pool is created.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Looked up afresh on every call, both before and after the strategy is set.
    const endTimestamps = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp
    expect(endTimestamps()).toBeInstanceOf(Array)
    expect(endTimestamps().length).toBe(0)
    // Dirty the internal state so the reset is observable.
    endTimestamps()[0] = performance.now()
    expect(endTimestamps().length).toBe(1)
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(endTimestamps()).toBeInstanceOf(Array)
    expect(endTimestamps().length).toBe(0)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1271
it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Factories are invoked lazily so the fixed pool is destroyed before the
  // dynamic pool is created.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const strategyPolicy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(strategyPolicy).toStrictEqual({
      useDynamicWorker: true
    })
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1295
it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Requirements expected from both pool kinds: run time aggregate and
  // average only.
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  // Factories are invoked lazily so the fixed pool is destroyed before the
  // dynamic pool is created.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual(expectedRequirements)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1351
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to settle.
  const pendingTasks = Array.from({ length: taskCount }, () => pool.execute())
  await Promise.all(pendingTasks)
  // Exact usage shape expected on each worker node; run time statistics
  // beyond the history are tolerated through `objectContaining`.
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: expect.objectContaining({
      history: expect.any(CircularArray)
    }),
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage)
    const executedTasks = usage.tasks.executed
    expect(executedTasks).toBeGreaterThanOrEqual(0)
    expect(executedTasks).toBeLessThanOrEqual(taskCount)
    // Run time statistics may be absent on a worker node; when present they
    // must be strictly positive.
    const runTimeAggregate = usage.runTime.aggregate
    if (runTimeAggregate != null) {
      expect(runTimeAggregate).toBeGreaterThan(0)
    } else {
      expect(runTimeAggregate).toBeUndefined()
    }
    const runTimeAverage = usage.runTime.average
    if (runTimeAverage != null) {
      expect(runTimeAverage).toBeGreaterThan(0)
    } else {
      expect(runTimeAverage).toBeUndefined()
    }
  }
  const strategyContext = pool.workerChoiceStrategyContext
  const activeStrategy = strategyContext.workerChoiceStrategies.get(
    strategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1417
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to settle.
  const pendingTasks = Array.from({ length: taskCount }, () => pool.execute())
  await Promise.all(pendingTasks)
  // Exact usage shape expected on each worker node, including the run time
  // aggregate, bounds and average.
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: {
      aggregate: expect.any(Number),
      maximum: expect.any(Number),
      minimum: expect.any(Number),
      average: expect.any(Number),
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage)
    const executedTasks = usage.tasks.executed
    expect(executedTasks).toBeGreaterThanOrEqual(0)
    expect(executedTasks).toBeLessThanOrEqual(taskCount)
    expect(usage.runTime.aggregate).toBeGreaterThan(0)
    expect(usage.runTime.average).toBeGreaterThan(0)
  }
  const strategyContext = pool.workerChoiceStrategyContext
  const activeStrategy = strategyContext.workerChoiceStrategies.get(
    strategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1480
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
  // Dynamic thread pool using WEIGHTED_ROUND_ROBIN with the median run time
  // statistic enabled.
  const poolOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
    workerChoiceStrategyOptions: {
      runTime: { median: true }
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    poolOptions
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to settle.
  const pendingTasks = Array.from({ length: taskCount }, () => pool.execute())
  await Promise.all(pendingTasks)
  // Exact usage shape expected on each worker node: a median is reported
  // and no average key is present.
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: {
      aggregate: expect.any(Number),
      maximum: expect.any(Number),
      minimum: expect.any(Number),
      median: expect.any(Number),
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage)
    const executedTasks = usage.tasks.executed
    expect(executedTasks).toBeGreaterThanOrEqual(0)
    expect(executedTasks).toBeLessThanOrEqual(taskCount)
    expect(usage.runTime.aggregate).toBeGreaterThan(0)
    expect(usage.runTime.median).toBeGreaterThan(0)
  }
  const strategyContext = pool.workerChoiceStrategyContext
  const activeStrategy = strategyContext.workerChoiceStrategies.get(
    strategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1548
it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Factories are invoked lazily so the fixed pool is destroyed before the
  // dynamic pool is created.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Both lookups are re-evaluated on each call rather than cached.
    const strategyFor = key =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(key)
    const activeStrategy = () =>
      strategyFor(pool.workerChoiceStrategyContext.workerChoiceStrategy)
    // The internals exist before the strategy is selected on the pool.
    expect(strategyFor(workerChoiceStrategy).nextWorkerNodeKey).toBeDefined()
    expect(strategyFor(workerChoiceStrategy).defaultWorkerWeight).toBeDefined()
    expect(
      strategyFor(workerChoiceStrategy).workerVirtualTaskRunTime
    ).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    // After setting the strategy the internals hold their initial values.
    expect(activeStrategy().nextWorkerNodeKey).toBe(0)
    expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategyFor(workerChoiceStrategy).workerVirtualTaskRunTime).toBe(0)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1626
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Factories are invoked lazily so the fixed pool is destroyed before the
  // dynamic pool is created.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const strategyPolicy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(strategyPolicy).toStrictEqual({
      useDynamicWorker: true
    })
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1651
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Requirements expected from both pool kinds: no statistic is required.
  const expectedRequirements = {
    runTime: {
      aggregate: false,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  // Factories are invoked lazily so the fixed pool is destroyed before the
  // dynamic pool is created.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual(expectedRequirements)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1708
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to settle.
  const pendingTasks = Array.from({ length: taskCount }, () => pool.execute())
  await Promise.all(pendingTasks)
  // Each worker node is expected to have executed exactly `maxMultiplier`
  // tasks and to hold nothing but empty-shaped statistics.
  const expectedUsage = {
    tasks: {
      executed: maxMultiplier,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage)
  }
  const strategyContext = pool.workerChoiceStrategyContext
  const activeStrategy = strategyContext.workerChoiceStrategies.get(
    strategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy.roundId).toBe(0)
  expect(activeStrategy.nextWorkerNodeKey).toBe(0)
  expect(activeStrategy.roundWeights).toStrictEqual([
    activeStrategy.defaultWorkerWeight
  ])
  // We need to clean up the resources after our test
  await pool.destroy()
})
1777
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to settle.
  const pendingTasks = Array.from({ length: taskCount }, () => pool.execute())
  await Promise.all(pendingTasks)
  // Each worker node is expected to have executed exactly `maxMultiplier`
  // tasks and to hold nothing but empty-shaped statistics.
  const expectedUsage = {
    tasks: {
      executed: maxMultiplier,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage)
  }
  const strategyContext = pool.workerChoiceStrategyContext
  const activeStrategy = strategyContext.workerChoiceStrategies.get(
    strategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy.roundId).toBe(0)
  expect(activeStrategy.nextWorkerNodeKey).toBe(0)
  expect(activeStrategy.roundWeights).toStrictEqual([
    activeStrategy.defaultWorkerWeight
  ])
  // We need to clean up the resources after our test
  await pool.destroy()
})
1847
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Factories are invoked lazily so the fixed pool is destroyed before the
  // dynamic pool is created.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  // Both pool kinds run the exact same assertions. Previously the dynamic
  // pool half did not check that `roundId` is back to 0 after the strategy
  // is set, unlike the fixed pool half.
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Both lookups are re-evaluated on each call rather than cached.
    const strategyFor = key =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(key)
    const activeStrategy = () =>
      strategyFor(pool.workerChoiceStrategyContext.workerChoiceStrategy)
    // The internals exist before the strategy is selected on the pool.
    expect(strategyFor(workerChoiceStrategy).roundId).toBeDefined()
    expect(strategyFor(workerChoiceStrategy).nextWorkerNodeKey).toBeDefined()
    expect(strategyFor(workerChoiceStrategy).defaultWorkerWeight).toBeDefined()
    expect(strategyFor(workerChoiceStrategy).roundWeights).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    // After setting the strategy the internals hold their initial values.
    expect(strategyFor(workerChoiceStrategy).roundId).toBe(0)
    expect(activeStrategy().nextWorkerNodeKey).toBe(0)
    expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategyFor(workerChoiceStrategy).roundWeights).toStrictEqual([
      activeStrategy().defaultWorkerWeight
    ])
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1949
it('Verify unknown strategy throw error', () => {
  // Pool construction is wrapped in a function so `expect` can capture the
  // error thrown by the constructor.
  const createPoolWithUnknownStrategy = () =>
    new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js',
      { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
    )
  expect(createPoolWithUnknownStrategy).toThrowError(
    "Invalid worker choice strategy 'UNKNOWN_STRATEGY'"
  )
})
1961 })