fix: fix worker choice strategy retries mechanism on some edge cases
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicThreadPool,
4 FixedClusterPool,
5 FixedThreadPool,
6 WorkerChoiceStrategies
7 } = require('../../../lib')
8 const { CircularArray } = require('../../../lib/circular-array')
9
10 describe('Selection strategies test suite', () => {
11 const min = 0
12 const max = 3
13
it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
  // Every enumeration member is expected to hold its own name as a string value
  const strategyNames = [
    'ROUND_ROBIN',
    'LEAST_USED',
    'LEAST_BUSY',
    'LEAST_ELU',
    'FAIR_SHARE',
    'WEIGHTED_ROUND_ROBIN',
    'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
  ]
  for (const strategyName of strategyNames) {
    expect(WorkerChoiceStrategies[strategyName]).toBe(strategyName)
  }
})
27
it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
  // No worker choice strategy option is given: the pool options must report ROUND_ROBIN
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    expect(pool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.ROUND_ROBIN
    )
  } finally {
    // Clean up the resources even when the assertion above throws
    await pool.destroy()
  }
})
40
it('Verify available strategies are taken at pool creation', async () => {
  // One pool per strategy: the option given at creation must be visible both
  // in the pool options and in the worker choice strategy context
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new FixedThreadPool(
      max,
      './tests/worker-files/thread/testWorker.js',
      { workerChoiceStrategy }
    )
    try {
      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        workerChoiceStrategy
      )
    } finally {
      // Clean up the resources even when an assertion above throws
      await pool.destroy()
    }
  }
})
55
it('Verify available strategies can be set after pool creation', async () => {
  // One pool per strategy: the strategy is set after creation and must then be
  // visible both in the pool options and in the worker choice strategy context
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js'
    )
    try {
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        workerChoiceStrategy
      )
    } finally {
      // Clean up the resources even when an assertion above throws
      await pool.destroy()
    }
  }
})
71
it('Verify available strategies default internals at pool creation', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
      // Strategy instance registered in the context for the current enumeration value
      const strategy =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        )
      switch (workerChoiceStrategy) {
        case WorkerChoiceStrategies.ROUND_ROBIN:
          expect(strategy.nextWorkerNodeKey).toBe(0)
          break
        case WorkerChoiceStrategies.FAIR_SHARE:
          expect(strategy.workersVirtualTaskEndTimestamp).toBeInstanceOf(Array)
          expect(strategy.workersVirtualTaskEndTimestamp.length).toBe(0)
          break
        case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
          expect(strategy.nextWorkerNodeKey).toBe(0)
          expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
          expect(strategy.workerVirtualTaskRunTime).toBe(0)
          break
        default:
          // No internals are checked for the other strategies
          break
      }
    }
  } finally {
    // Clean up the resources even when an assertion above throws
    await pool.destroy()
  }
})
117
it('Verify ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same policy is expected from the fixed and from the dynamic pool
  const expectedPolicy = {
    dynamicWorkerUsage: true,
    dynamicWorkerReady: true
  }
  // Factories are used so that only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual(expectedPolicy)
    } finally {
      // Clean up the resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
143
it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same requirements are expected from the fixed and from the dynamic pool:
  // no statistic at all is required
  const expectedRequirements = {
    runTime: {
      aggregate: false,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  // Factories are used so that only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
199
it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    // Submit max * maxMultiplier tasks and wait for all of them to settle
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    // Every worker node is expected to have executed exactly maxMultiplier tasks
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
    }
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        WorkerChoiceStrategies.ROUND_ROBIN
      ).nextWorkerNodeKey
    ).toBe(0)
  } finally {
    // Clean up the resources even when a task or an assertion above throws
    await pool.destroy()
  }
})
246
it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    // Submit max * maxMultiplier tasks and wait for all of them to settle
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    // Every worker node is expected to have executed exactly maxMultiplier tasks
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
    }
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        WorkerChoiceStrategies.ROUND_ROBIN
      ).nextWorkerNodeKey
    ).toBe(0)
  } finally {
    // Clean up the resources even when a task or an assertion above throws
    await pool.destroy()
  }
})
294
it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // Each scenario pairs a pool factory with the worker property used as its
  // identifier; factories are used so that only one pool is alive at a time
  const scenarios = [
    {
      createPool: () =>
        new FixedClusterPool(
          max,
          './tests/worker-files/cluster/testWorker.js',
          { workerChoiceStrategy }
        ),
      getWorkerId: worker => worker.id
    },
    {
      createPool: () =>
        new FixedThreadPool(
          max,
          './tests/worker-files/thread/testWorker.js',
          { workerChoiceStrategy }
        ),
      getWorkerId: worker => worker.threadId
    }
  ]
  for (const { createPool, getWorkerId } of scenarios) {
    const pool = createPool()
    try {
      // max consecutive choices are expected to select max distinct workers
      const results = new Set()
      for (let i = 0; i < max; i++) {
        results.add(
          getWorkerId(pool.workerNodes[pool.chooseWorkerNode()].worker)
        )
      }
      expect(results.size).toBe(max)
    } finally {
      // Clean up the resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
320
it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Both pools are created with WEIGHTED_ROUND_ROBIN, then switched to ROUND_ROBIN;
  // factories are used so that only one pool is alive at a time
  const poolFactories = [
    () =>
      new FixedThreadPool(max, workerFile, {
        workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
      }),
    () =>
      new DynamicThreadPool(min, max, workerFile, {
        workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
      })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        ).nextWorkerNodeKey
      ).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      // Once set, the active strategy must start from worker node key 0
      expect(
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          pool.workerChoiceStrategyContext.workerChoiceStrategy
        ).nextWorkerNodeKey
      ).toBe(0)
    } finally {
      // Clean up the resources even when an assertion above throws
      await pool.destroy()
    }
  }
})
360
it('Verify LEAST_USED strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same policy is expected from the fixed and from the dynamic pool
  const expectedPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: true
  }
  // Factories are used so that only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual(expectedPolicy)
    } finally {
      // Clean up the resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
386
it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same requirements are expected from the fixed and from the dynamic pool:
  // no statistic at all is required
  const expectedRequirements = {
    runTime: {
      aggregate: false,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  // Factories are used so that only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
442
it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    // Submit max * maxMultiplier tasks and wait for all of them to settle
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      // The per-worker share is not fixed: only bound it by the number of submitted tasks
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
  } finally {
    // Clean up the resources even when a task or an assertion above throws
    await pool.destroy()
  }
})
488
it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    // Submit max * maxMultiplier tasks and wait for all of them to settle
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      // The per-worker share is not fixed: only bound it by the number of submitted tasks
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
  } finally {
    // Clean up the resources even when a task or an assertion above throws
    await pool.destroy()
  }
})
535
it('Verify LEAST_BUSY strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same policy is expected from the fixed and from the dynamic pool
  const expectedPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: true
  }
  // Factories are used so that only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual(expectedPolicy)
    } finally {
      // Clean up the resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
561
it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same requirements are expected from the fixed and from the dynamic pool:
  // only the aggregate run time and the aggregate wait time are required
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: true,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  // Factories are used so that only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
617
it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    // Submit max * maxMultiplier tasks and wait for all of them to settle
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      // The per-worker share is not fixed: only bound it by the number of submitted tasks
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Aggregates may be missing on a worker node; when present they must be positive
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.waitTime.aggregate == null) {
        expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
      }
    }
  } finally {
    // Clean up the resources even when a task or an assertion above throws
    await pool.destroy()
  }
})
673
it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    // Submit max * maxMultiplier tasks and wait for all of them to settle
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      // The per-worker share is not fixed: only bound it by the number of submitted tasks
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Aggregates may be missing on a worker node; when present they must be positive
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.waitTime.aggregate == null) {
        expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
      }
    }
  } finally {
    // Clean up the resources even when a task or an assertion above throws
    await pool.destroy()
  }
})
730
it('Verify LEAST_ELU strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same policy is expected from the fixed and from the dynamic pool
  const expectedPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: true
  }
  // Factories are used so that only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual(expectedPolicy)
    } finally {
      // Clean up the resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
756
it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same requirements are expected from the fixed and from the dynamic pool:
  // only the aggregate ELU is required
  const expectedRequirements = {
    runTime: {
      aggregate: false,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: false,
      median: false
    }
  }
  // Factories are used so that only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
812
it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    // Submit max * maxMultiplier tasks and wait for all of them to settle
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        }
      })
      // The per-worker share is not fixed: only bound it by the number of submitted tasks
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Utilization may be missing on a worker node; when present it must lie in [0, 1]
      if (workerNode.usage.elu.utilization == null) {
        expect(workerNode.usage.elu.utilization).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
      }
    }
  } finally {
    // Clean up the resources even when a task or an assertion above throws
    await pool.destroy()
  }
})
864
it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    // Submit max * maxMultiplier tasks and wait for all of them to settle
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        }
      })
      // The per-worker share is not fixed: only bound it by the number of submitted tasks
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Utilization may be missing on a worker node; when present it must lie in [0, 1]
      if (workerNode.usage.elu.utilization == null) {
        expect(workerNode.usage.elu.utilization).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
      }
    }
  } finally {
    // Clean up the resources even when a task or an assertion above throws
    await pool.destroy()
  }
})
917
it('Verify FAIR_SHARE strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same policy is expected from the fixed and from the dynamic pool
  const expectedPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: true
  }
  // Factories are used so that only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual(expectedPolicy)
    } finally {
      // Clean up the resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
943
it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same requirements are expected from the fixed and from the dynamic pool:
  // aggregate and average are required for run time and ELU, nothing for wait time
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: true,
      median: false
    }
  }
  // Factories are used so that only one pool is alive at a time
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
999
it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    // Submit max * maxMultiplier tasks and wait for all of them to settle
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        }
      })
      // The per-worker share is not fixed: only bound it by the number of submitted tasks
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Statistics may be missing on a worker node; when present they must be in range
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.runTime.average == null) {
        expect(workerNode.usage.runTime.average).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
      }
      if (workerNode.usage.elu.utilization == null) {
        expect(workerNode.usage.elu.utilization).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
      }
    }
    // The strategy is expected to track one virtual task end timestamp per worker node
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp.length
    ).toBe(pool.workerNodes.length)
  } finally {
    // Clean up the resources even when a task or an assertion above throws
    await pool.destroy()
  }
})
1066
it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    // Submit max * maxMultiplier tasks and wait for all of them to settle
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        }
      })
      // The per-worker share is not fixed: only bound it by the number of submitted tasks
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Statistics may be missing on a worker node; when present they must be in range
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.runTime.average == null) {
        expect(workerNode.usage.runTime.average).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
      }
      if (workerNode.usage.elu.utilization == null) {
        expect(workerNode.usage.elu.utilization).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
      }
    }
    // The strategy is expected to track one virtual task end timestamp per worker node
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp.length
    ).toBe(pool.workerNodes.length)
  } finally {
    // Clean up the resources even when a task or an assertion above throws
    await pool.destroy()
  }
})
1134
it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
  // An optional statistic is either unset (and then strictly undefined)
  // or a strictly positive number.
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const poolOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
    workerChoiceStrategyOptions: {
      runTime: { median: true }
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    poolOptions
  )
  // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
  const maxMultiplier = 2
  // Submit max * maxMultiplier tasks at once and wait for all of them
  await Promise.all(
    Array.from({ length: max * maxMultiplier }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    // Once every task has settled, no worker may report in-flight, queued
    // or failed tasks
    expect(usage).toMatchObject({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: {
        history: expect.any(CircularArray)
      },
      elu: {
        idle: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        active: expect.objectContaining({
          history: expect.any(CircularArray)
        })
      }
    })
    const executedTasks = usage.tasks.executed
    expect(executedTasks).toBeGreaterThanOrEqual(0)
    expect(executedTasks).toBeLessThanOrEqual(max * maxMultiplier)
    expectUndefinedOrPositive(usage.runTime.aggregate)
    // With `median: true` the median is checked instead of the average
    expectUndefinedOrPositive(usage.runTime.median)
    const eluUtilization = usage.elu.utilization
    if (eluUtilization == null) {
      expect(eluUtilization).toBeUndefined()
    } else {
      // When present, the utilization is a ratio within [0, 1]
      expect(eluUtilization).toBeGreaterThanOrEqual(0)
      expect(eluUtilization).toBeLessThanOrEqual(1)
    }
  }
  // The strategy keeps one virtual task end timestamp entry per worker node
  const { workerChoiceStrategyContext } = pool
  const fairShareStrategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(fairShareStrategy.workersVirtualTaskEndTimestamp.length).toBe(
    pool.workerNodes.length
  )
  // We need to clean up the resources after our test
  await pool.destroy()
})
1207
it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Runs the same scenario against the given pool, then destroys it:
  // the timestamps array starts empty, is dirtied by hand, and must be an
  // empty array again once the strategy is set on the pool.
  const verifyInternalsReset = async pool => {
    // Looked up again on every call so each assertion sees the live state
    const fairShareStrategy = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        workerChoiceStrategy
      )
    expect(fairShareStrategy().workersVirtualTaskEndTimestamp).toBeInstanceOf(
      Array
    )
    expect(fairShareStrategy().workersVirtualTaskEndTimestamp.length).toBe(0)
    fairShareStrategy().workersVirtualTaskEndTimestamp[0] = performance.now()
    expect(fairShareStrategy().workersVirtualTaskEndTimestamp.length).toBe(1)
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(fairShareStrategy().workersVirtualTaskEndTimestamp).toBeInstanceOf(
      Array
    )
    expect(fairShareStrategy().workersVirtualTaskEndTimestamp.length).toBe(0)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
  // Fixed pool first, then a dynamic pool once the first one is destroyed
  await verifyInternalsReset(new FixedThreadPool(max, workerFile))
  await verifyInternalsReset(new DynamicThreadPool(min, max, workerFile))
})
1281
it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Pools are built lazily so that only one pool is alive at any time:
  // the fixed pool is checked and destroyed before the dynamic one exists.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Same default policy is expected for both pool kinds
    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1307
it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Pools are built lazily so that only one pool is alive at any time:
  // the fixed pool is checked and destroyed before the dynamic one exists.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    // Only the aggregate and average run time are expected to be required
    expect(requirements).toStrictEqual({
      runTime: {
        aggregate: true,
        average: true,
        median: false
      },
      waitTime: {
        aggregate: false,
        average: false,
        median: false
      },
      elu: {
        aggregate: false,
        average: false,
        median: false
      }
    })
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1363
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  // An optional statistic is either unset (and then strictly undefined)
  // or a strictly positive number.
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  // Submit max * maxMultiplier tasks at once and wait for all of them
  await Promise.all(
    Array.from({ length: max * maxMultiplier }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    // Once every task has settled, no worker may report in-flight, queued
    // or failed tasks
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: {
        history: expect.any(CircularArray)
      },
      elu: {
        idle: {
          history: expect.any(CircularArray)
        },
        active: {
          history: expect.any(CircularArray)
        }
      }
    })
    const executedTasks = usage.tasks.executed
    expect(executedTasks).toBeGreaterThanOrEqual(0)
    expect(executedTasks).toBeLessThanOrEqual(max * maxMultiplier)
    expectUndefinedOrPositive(usage.runTime.aggregate)
    expectUndefinedOrPositive(usage.runTime.average)
  }
  // Inspect the internals of the strategy currently set on the pool
  const { workerChoiceStrategyContext } = pool
  const weightedStrategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(weightedStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(weightedStrategy.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1429
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  // An optional statistic is either unset (and then strictly undefined)
  // or a strictly positive number.
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  // Submit max * maxMultiplier tasks at once and wait for all of them
  await Promise.all(
    Array.from({ length: max * maxMultiplier }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    // Once every task has settled, no worker may report in-flight, queued
    // or failed tasks
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: {
        history: expect.any(CircularArray)
      },
      elu: {
        idle: {
          history: expect.any(CircularArray)
        },
        active: {
          history: expect.any(CircularArray)
        }
      }
    })
    const executedTasks = usage.tasks.executed
    expect(executedTasks).toBeGreaterThanOrEqual(0)
    expect(executedTasks).toBeLessThanOrEqual(max * maxMultiplier)
    expectUndefinedOrPositive(usage.runTime.aggregate)
    expectUndefinedOrPositive(usage.runTime.average)
  }
  // Inspect the internals of the strategy currently set on the pool
  const { workerChoiceStrategyContext } = pool
  const weightedStrategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(weightedStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(weightedStrategy.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1496
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
  // An optional statistic is either unset (and then strictly undefined)
  // or a strictly positive number.
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const poolOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
    workerChoiceStrategyOptions: {
      runTime: { median: true }
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    poolOptions
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  // Submit max * maxMultiplier tasks at once and wait for all of them
  await Promise.all(
    Array.from({ length: max * maxMultiplier }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    // Once every task has settled, no worker may report in-flight, queued
    // or failed tasks
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: {
        history: expect.any(CircularArray)
      },
      elu: {
        idle: {
          history: expect.any(CircularArray)
        },
        active: {
          history: expect.any(CircularArray)
        }
      }
    })
    const executedTasks = usage.tasks.executed
    expect(executedTasks).toBeGreaterThanOrEqual(0)
    expect(executedTasks).toBeLessThanOrEqual(max * maxMultiplier)
    expectUndefinedOrPositive(usage.runTime.aggregate)
    // With `median: true` the median is checked instead of the average
    expectUndefinedOrPositive(usage.runTime.median)
  }
  // Inspect the internals of the strategy currently set on the pool
  const { workerChoiceStrategyContext } = pool
  const weightedStrategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(weightedStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(weightedStrategy.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1568
it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Runs the same scenario against the given pool, then destroys it:
  // the internals exist before the strategy is set, and hold their reset
  // values right after it is set.
  const verifyInternalsReset = async pool => {
    // Both lookups are repeated on every call so each assertion sees the
    // live state; one resolves by the strategy under test, the other by the
    // strategy currently set on the pool.
    const strategyUnderTest = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        workerChoiceStrategy
      )
    const activeStrategy = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategyUnderTest().nextWorkerNodeKey).toBeDefined()
    expect(strategyUnderTest().defaultWorkerWeight).toBeDefined()
    expect(strategyUnderTest().workerVirtualTaskRunTime).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(activeStrategy().nextWorkerNodeKey).toBe(0)
    expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategyUnderTest().workerVirtualTaskRunTime).toBe(0)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
  // Fixed pool first, then a dynamic pool once the first one is destroyed
  await verifyInternalsReset(new FixedThreadPool(max, workerFile))
  await verifyInternalsReset(new DynamicThreadPool(min, max, workerFile))
})
1646
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Pools are built lazily so that only one pool is alive at any time:
  // the fixed pool is checked and destroyed before the dynamic one exists.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Same default policy is expected for both pool kinds
    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1673
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Pools are built lazily so that only one pool is alive at any time:
  // the fixed pool is checked and destroyed before the dynamic one exists.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    // No task statistic at all is expected to be required
    expect(requirements).toStrictEqual({
      runTime: {
        aggregate: false,
        average: false,
        median: false
      },
      waitTime: {
        aggregate: false,
        average: false,
        median: false
      },
      elu: {
        aggregate: false,
        average: false,
        median: false
      }
    })
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1730
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const poolOptions = {
    workerChoiceStrategy:
      WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  }
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    poolOptions
  )
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  // Submit max * maxMultiplier tasks at once and wait for all of them
  await Promise.all(
    Array.from({ length: max * maxMultiplier }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    // Every worker is expected to have executed exactly maxMultiplier tasks
    expect(usage).toStrictEqual({
      tasks: {
        executed: maxMultiplier,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        failed: 0
      },
      runTime: {
        history: expect.any(CircularArray)
      },
      waitTime: {
        history: expect.any(CircularArray)
      },
      elu: {
        idle: {
          history: expect.any(CircularArray)
        },
        active: {
          history: expect.any(CircularArray)
        }
      }
    })
  }
  // Inspect the internals of the strategy currently set on the pool
  const { workerChoiceStrategyContext } = pool
  const interleavedStrategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(interleavedStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(interleavedStrategy.roundId).toBe(0)
  expect(interleavedStrategy.nextWorkerNodeKey).toBe(0)
  // The only round weight is expected to be the default worker weight
  expect(interleavedStrategy.roundWeights).toStrictEqual([
    interleavedStrategy.defaultWorkerWeight
  ])
  // We need to clean up the resources after our test
  await pool.destroy()
})
1799
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const poolOptions = {
    workerChoiceStrategy:
      WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    poolOptions
  )
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  // Submit max * maxMultiplier tasks at once and wait for all of them
  await Promise.all(
    Array.from({ length: max * maxMultiplier }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    // Once every task has settled, no worker may report in-flight, queued
    // or failed tasks; the executed count is only required to be a number
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        failed: 0
      },
      runTime: {
        history: expect.any(CircularArray)
      },
      waitTime: {
        history: expect.any(CircularArray)
      },
      elu: {
        idle: {
          history: expect.any(CircularArray)
        },
        active: {
          history: expect.any(CircularArray)
        }
      }
    })
  }
  // Inspect the internals of the strategy currently set on the pool
  const { workerChoiceStrategyContext } = pool
  const interleavedStrategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(interleavedStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(interleavedStrategy.roundId).toBe(0)
  expect(interleavedStrategy.nextWorkerNodeKey).toBe(0)
  // The only round weight is expected to be the default worker weight
  expect(interleavedStrategy.roundWeights).toStrictEqual([
    interleavedStrategy.defaultWorkerWeight
  ])
  // We need to clean up the resources after our test
  await pool.destroy()
})
1869
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Runs the same scenario against the given pool, then destroys it:
  // the internals exist before the strategy is set, and hold their reset
  // values right after it is set. Sharing the scenario guarantees that the
  // fixed and the dynamic pool get the exact same assertions, including the
  // `roundId` reset check that the dynamic pool used to miss.
  const verifyInternalsReset = async pool => {
    // Both lookups are repeated on every call so each assertion sees the
    // live state; one resolves by the strategy under test, the other by the
    // strategy currently set on the pool.
    const strategyUnderTest = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        workerChoiceStrategy
      )
    const activeStrategy = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategyUnderTest().roundId).toBeDefined()
    expect(strategyUnderTest().nextWorkerNodeKey).toBeDefined()
    expect(strategyUnderTest().defaultWorkerWeight).toBeDefined()
    expect(strategyUnderTest().roundWeights).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(strategyUnderTest().roundId).toBe(0)
    expect(activeStrategy().nextWorkerNodeKey).toBe(0)
    expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
    // The only round weight is expected to be the default worker weight
    expect(strategyUnderTest().roundWeights).toStrictEqual([
      activeStrategy().defaultWorkerWeight
    ])
    // We need to clean up the resources after our test
    await pool.destroy()
  }
  // Fixed pool first, then a dynamic pool once the first one is destroyed
  await verifyInternalsReset(new FixedThreadPool(max, workerFile))
  await verifyInternalsReset(new DynamicThreadPool(min, max, workerFile))
})
1971
it('Verify unknown strategy throw error', () => {
  // Building the pool is deferred into a function so that `expect` can
  // observe the error thrown by the constructor.
  const createPoolWithUnknownStrategy = () => {
    return new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js',
      { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
    )
  }
  expect(createPoolWithUnknownStrategy).toThrowError(
    "Invalid worker choice strategy 'UNKNOWN_STRATEGY'"
  )
})
1983 })