feat: untangle worker choice strategies tasks distribution and dynamic worker creatio...
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicThreadPool,
4 FixedClusterPool,
5 FixedThreadPool,
6 WorkerChoiceStrategies
7 } = require('../../../lib')
8 const { CircularArray } = require('../../../lib/circular-array')
9
10 describe('Selection strategies test suite', () => {
11 const min = 0
12 const max = 3
13
14 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
15 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
16 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
17 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
18 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
19 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
20 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
21 'WEIGHTED_ROUND_ROBIN'
22 )
23 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
24 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
25 )
26 })
27
28 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
29 const pool = new DynamicThreadPool(
30 min,
31 max,
32 './tests/worker-files/thread/testWorker.js'
33 )
34 expect(pool.opts.workerChoiceStrategy).toBe(
35 WorkerChoiceStrategies.ROUND_ROBIN
36 )
37 // We need to clean up the resources after our test
38 await pool.destroy()
39 })
40
41 it('Verify available strategies are taken at pool creation', async () => {
42 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
43 const pool = new FixedThreadPool(
44 max,
45 './tests/worker-files/thread/testWorker.js',
46 { workerChoiceStrategy }
47 )
48 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
49 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
50 workerChoiceStrategy
51 )
52 await pool.destroy()
53 }
54 })
55
56 it('Verify available strategies can be set after pool creation', async () => {
57 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
58 const pool = new DynamicThreadPool(
59 min,
60 max,
61 './tests/worker-files/thread/testWorker.js'
62 )
63 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
64 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
65 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
66 workerChoiceStrategy
67 )
68 await pool.destroy()
69 }
70 })
71
72 it('Verify available strategies default internals at pool creation', async () => {
73 const pool = new FixedThreadPool(
74 max,
75 './tests/worker-files/thread/testWorker.js'
76 )
77 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
78 if (workerChoiceStrategy === WorkerChoiceStrategies.ROUND_ROBIN) {
79 expect(
80 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
81 workerChoiceStrategy
82 ).nextWorkerNodeKey
83 ).toBe(0)
84 } else if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
85 expect(
86 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
87 workerChoiceStrategy
88 ).workersVirtualTaskEndTimestamp
89 ).toBeInstanceOf(Array)
90 expect(
91 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
92 workerChoiceStrategy
93 ).workersVirtualTaskEndTimestamp.length
94 ).toBe(0)
95 } else if (
96 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
97 ) {
98 expect(
99 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
100 workerChoiceStrategy
101 ).nextWorkerNodeKey
102 ).toBe(0)
103 expect(
104 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
105 workerChoiceStrategy
106 ).defaultWorkerWeight
107 ).toBeGreaterThan(0)
108 expect(
109 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
110 workerChoiceStrategy
111 ).workerVirtualTaskRunTime
112 ).toBe(0)
113 }
114 }
115 await pool.destroy()
116 })
117
118 it('Verify ROUND_ROBIN strategy default policy', async () => {
119 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
120 let pool = new FixedThreadPool(
121 max,
122 './tests/worker-files/thread/testWorker.js',
123 { workerChoiceStrategy }
124 )
125 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
126 dynamicWorkerUsage: false,
127 dynamicWorkerReady: true
128 })
129 await pool.destroy()
130 pool = new DynamicThreadPool(
131 min,
132 max,
133 './tests/worker-files/thread/testWorker.js',
134 { workerChoiceStrategy }
135 )
136 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
137 dynamicWorkerUsage: false,
138 dynamicWorkerReady: true
139 })
140 // We need to clean up the resources after our test
141 await pool.destroy()
142 })
143
144 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
145 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
146 let pool = new FixedThreadPool(
147 max,
148 './tests/worker-files/thread/testWorker.js',
149 { workerChoiceStrategy }
150 )
151 expect(
152 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
153 ).toStrictEqual({
154 runTime: {
155 aggregate: false,
156 average: false,
157 median: false
158 },
159 waitTime: {
160 aggregate: false,
161 average: false,
162 median: false
163 },
164 elu: {
165 aggregate: false,
166 average: false,
167 median: false
168 }
169 })
170 await pool.destroy()
171 pool = new DynamicThreadPool(
172 min,
173 max,
174 './tests/worker-files/thread/testWorker.js',
175 { workerChoiceStrategy }
176 )
177 expect(
178 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
179 ).toStrictEqual({
180 runTime: {
181 aggregate: false,
182 average: false,
183 median: false
184 },
185 waitTime: {
186 aggregate: false,
187 average: false,
188 median: false
189 },
190 elu: {
191 aggregate: false,
192 average: false,
193 median: false
194 }
195 })
196 // We need to clean up the resources after our test
197 await pool.destroy()
198 })
199
200 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
201 const pool = new FixedThreadPool(
202 max,
203 './tests/worker-files/thread/testWorker.js',
204 { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
205 )
206 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
207 const promises = new Set()
208 const maxMultiplier = 2
209 for (let i = 0; i < max * maxMultiplier; i++) {
210 promises.add(pool.execute())
211 }
212 await Promise.all(promises)
213 for (const workerNode of pool.workerNodes) {
214 expect(workerNode.usage).toStrictEqual({
215 tasks: {
216 executed: maxMultiplier,
217 executing: 0,
218 queued: 0,
219 maxQueued: 0,
220 failed: 0
221 },
222 runTime: {
223 history: expect.any(CircularArray)
224 },
225 waitTime: {
226 history: expect.any(CircularArray)
227 },
228 elu: {
229 idle: {
230 history: expect.any(CircularArray)
231 },
232 active: {
233 history: expect.any(CircularArray)
234 }
235 }
236 })
237 }
238 expect(
239 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
240 WorkerChoiceStrategies.ROUND_ROBIN
241 ).nextWorkerNodeKey
242 ).toBe(0)
243 // We need to clean up the resources after our test
244 await pool.destroy()
245 })
246
247 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
248 const pool = new DynamicThreadPool(
249 min,
250 max,
251 './tests/worker-files/thread/testWorker.js',
252 { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
253 )
254 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
255 const promises = new Set()
256 const maxMultiplier = 2
257 for (let i = 0; i < max * maxMultiplier; i++) {
258 promises.add(pool.execute())
259 }
260 await Promise.all(promises)
261 for (const workerNode of pool.workerNodes) {
262 expect(workerNode.usage).toStrictEqual({
263 tasks: {
264 executed: expect.any(Number),
265 executing: 0,
266 queued: 0,
267 maxQueued: 0,
268 failed: 0
269 },
270 runTime: {
271 history: expect.any(CircularArray)
272 },
273 waitTime: {
274 history: expect.any(CircularArray)
275 },
276 elu: {
277 idle: {
278 history: expect.any(CircularArray)
279 },
280 active: {
281 history: expect.any(CircularArray)
282 }
283 }
284 })
285 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
286 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
287 max * maxMultiplier
288 )
289 }
290 expect(
291 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
292 WorkerChoiceStrategies.ROUND_ROBIN
293 ).nextWorkerNodeKey
294 ).toBe(0)
295 // We need to clean up the resources after our test
296 await pool.destroy()
297 })
298
299 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
300 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
301 let pool = new FixedClusterPool(
302 max,
303 './tests/worker-files/cluster/testWorker.js',
304 { workerChoiceStrategy }
305 )
306 let results = new Set()
307 for (let i = 0; i < max; i++) {
308 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.id)
309 }
310 expect(results.size).toBe(max)
311 await pool.destroy()
312 pool = new FixedThreadPool(
313 max,
314 './tests/worker-files/thread/testWorker.js',
315 { workerChoiceStrategy }
316 )
317 results = new Set()
318 for (let i = 0; i < max; i++) {
319 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.threadId)
320 }
321 expect(results.size).toBe(max)
322 await pool.destroy()
323 })
324
325 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
326 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
327 let pool = new FixedThreadPool(
328 max,
329 './tests/worker-files/thread/testWorker.js',
330 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
331 )
332 expect(
333 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
334 workerChoiceStrategy
335 ).nextWorkerNodeKey
336 ).toBeDefined()
337 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
338 expect(
339 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
340 pool.workerChoiceStrategyContext.workerChoiceStrategy
341 ).nextWorkerNodeKey
342 ).toBe(0)
343 await pool.destroy()
344 pool = new DynamicThreadPool(
345 min,
346 max,
347 './tests/worker-files/thread/testWorker.js',
348 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
349 )
350 expect(
351 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
352 workerChoiceStrategy
353 ).nextWorkerNodeKey
354 ).toBeDefined()
355 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
356 expect(
357 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
358 pool.workerChoiceStrategyContext.workerChoiceStrategy
359 ).nextWorkerNodeKey
360 ).toBe(0)
361 // We need to clean up the resources after our test
362 await pool.destroy()
363 })
364
365 it('Verify LEAST_USED strategy default policy', async () => {
366 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
367 let pool = new FixedThreadPool(
368 max,
369 './tests/worker-files/thread/testWorker.js',
370 { workerChoiceStrategy }
371 )
372 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
373 dynamicWorkerUsage: false,
374 dynamicWorkerReady: true
375 })
376 await pool.destroy()
377 pool = new DynamicThreadPool(
378 min,
379 max,
380 './tests/worker-files/thread/testWorker.js',
381 { workerChoiceStrategy }
382 )
383 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
384 dynamicWorkerUsage: false,
385 dynamicWorkerReady: true
386 })
387 // We need to clean up the resources after our test
388 await pool.destroy()
389 })
390
391 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
392 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
393 let pool = new FixedThreadPool(
394 max,
395 './tests/worker-files/thread/testWorker.js',
396 { workerChoiceStrategy }
397 )
398 expect(
399 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
400 ).toStrictEqual({
401 runTime: {
402 aggregate: false,
403 average: false,
404 median: false
405 },
406 waitTime: {
407 aggregate: false,
408 average: false,
409 median: false
410 },
411 elu: {
412 aggregate: false,
413 average: false,
414 median: false
415 }
416 })
417 await pool.destroy()
418 pool = new DynamicThreadPool(
419 min,
420 max,
421 './tests/worker-files/thread/testWorker.js',
422 { workerChoiceStrategy }
423 )
424 expect(
425 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
426 ).toStrictEqual({
427 runTime: {
428 aggregate: false,
429 average: false,
430 median: false
431 },
432 waitTime: {
433 aggregate: false,
434 average: false,
435 median: false
436 },
437 elu: {
438 aggregate: false,
439 average: false,
440 median: false
441 }
442 })
443 // We need to clean up the resources after our test
444 await pool.destroy()
445 })
446
447 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
448 const pool = new FixedThreadPool(
449 max,
450 './tests/worker-files/thread/testWorker.js',
451 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
452 )
453 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
454 const promises = new Set()
455 const maxMultiplier = 2
456 for (let i = 0; i < max * maxMultiplier; i++) {
457 promises.add(pool.execute())
458 }
459 await Promise.all(promises)
460 for (const workerNode of pool.workerNodes) {
461 expect(workerNode.usage).toStrictEqual({
462 tasks: {
463 executed: expect.any(Number),
464 executing: 0,
465 queued: 0,
466 maxQueued: 0,
467 failed: 0
468 },
469 runTime: {
470 history: expect.any(CircularArray)
471 },
472 waitTime: {
473 history: expect.any(CircularArray)
474 },
475 elu: {
476 idle: {
477 history: expect.any(CircularArray)
478 },
479 active: {
480 history: expect.any(CircularArray)
481 }
482 }
483 })
484 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
485 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
486 max * maxMultiplier
487 )
488 }
489 // We need to clean up the resources after our test
490 await pool.destroy()
491 })
492
493 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
494 const pool = new DynamicThreadPool(
495 min,
496 max,
497 './tests/worker-files/thread/testWorker.js',
498 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
499 )
500 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
501 const promises = new Set()
502 const maxMultiplier = 2
503 for (let i = 0; i < max * maxMultiplier; i++) {
504 promises.add(pool.execute())
505 }
506 await Promise.all(promises)
507 for (const workerNode of pool.workerNodes) {
508 expect(workerNode.usage).toStrictEqual({
509 tasks: {
510 executed: expect.any(Number),
511 executing: 0,
512 queued: 0,
513 maxQueued: 0,
514 failed: 0
515 },
516 runTime: {
517 history: expect.any(CircularArray)
518 },
519 waitTime: {
520 history: expect.any(CircularArray)
521 },
522 elu: {
523 idle: {
524 history: expect.any(CircularArray)
525 },
526 active: {
527 history: expect.any(CircularArray)
528 }
529 }
530 })
531 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
532 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
533 max * maxMultiplier
534 )
535 }
536 // We need to clean up the resources after our test
537 await pool.destroy()
538 })
539
540 it('Verify LEAST_BUSY strategy default policy', async () => {
541 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
542 let pool = new FixedThreadPool(
543 max,
544 './tests/worker-files/thread/testWorker.js',
545 { workerChoiceStrategy }
546 )
547 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
548 dynamicWorkerUsage: false,
549 dynamicWorkerReady: true
550 })
551 await pool.destroy()
552 pool = new DynamicThreadPool(
553 min,
554 max,
555 './tests/worker-files/thread/testWorker.js',
556 { workerChoiceStrategy }
557 )
558 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
559 dynamicWorkerUsage: false,
560 dynamicWorkerReady: true
561 })
562 // We need to clean up the resources after our test
563 await pool.destroy()
564 })
565
566 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
567 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
568 let pool = new FixedThreadPool(
569 max,
570 './tests/worker-files/thread/testWorker.js',
571 { workerChoiceStrategy }
572 )
573 expect(
574 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
575 ).toStrictEqual({
576 runTime: {
577 aggregate: true,
578 average: false,
579 median: false
580 },
581 waitTime: {
582 aggregate: true,
583 average: false,
584 median: false
585 },
586 elu: {
587 aggregate: false,
588 average: false,
589 median: false
590 }
591 })
592 await pool.destroy()
593 pool = new DynamicThreadPool(
594 min,
595 max,
596 './tests/worker-files/thread/testWorker.js',
597 { workerChoiceStrategy }
598 )
599 expect(
600 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
601 ).toStrictEqual({
602 runTime: {
603 aggregate: true,
604 average: false,
605 median: false
606 },
607 waitTime: {
608 aggregate: true,
609 average: false,
610 median: false
611 },
612 elu: {
613 aggregate: false,
614 average: false,
615 median: false
616 }
617 })
618 // We need to clean up the resources after our test
619 await pool.destroy()
620 })
621
622 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
623 const pool = new FixedThreadPool(
624 max,
625 './tests/worker-files/thread/testWorker.js',
626 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
627 )
628 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
629 const promises = new Set()
630 const maxMultiplier = 2
631 for (let i = 0; i < max * maxMultiplier; i++) {
632 promises.add(pool.execute())
633 }
634 await Promise.all(promises)
635 for (const workerNode of pool.workerNodes) {
636 expect(workerNode.usage).toMatchObject({
637 tasks: {
638 executed: expect.any(Number),
639 executing: 0,
640 queued: 0,
641 maxQueued: 0,
642 failed: 0
643 },
644 runTime: {
645 history: expect.any(CircularArray)
646 },
647 waitTime: {
648 history: expect.any(CircularArray)
649 },
650 elu: {
651 idle: {
652 history: expect.any(CircularArray)
653 },
654 active: {
655 history: expect.any(CircularArray)
656 }
657 }
658 })
659 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
660 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
661 max * maxMultiplier
662 )
663 if (workerNode.usage.runTime.aggregate == null) {
664 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
665 } else {
666 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
667 }
668 if (workerNode.usage.waitTime.aggregate == null) {
669 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
670 } else {
671 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
672 }
673 }
674 // We need to clean up the resources after our test
675 await pool.destroy()
676 })
677
678 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
679 const pool = new DynamicThreadPool(
680 min,
681 max,
682 './tests/worker-files/thread/testWorker.js',
683 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
684 )
685 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
686 const promises = new Set()
687 const maxMultiplier = 2
688 for (let i = 0; i < max * maxMultiplier; i++) {
689 promises.add(pool.execute())
690 }
691 await Promise.all(promises)
692 for (const workerNode of pool.workerNodes) {
693 expect(workerNode.usage).toMatchObject({
694 tasks: {
695 executed: expect.any(Number),
696 executing: 0,
697 queued: 0,
698 maxQueued: 0,
699 failed: 0
700 },
701 runTime: {
702 history: expect.any(CircularArray)
703 },
704 waitTime: {
705 history: expect.any(CircularArray)
706 },
707 elu: {
708 idle: {
709 history: expect.any(CircularArray)
710 },
711 active: {
712 history: expect.any(CircularArray)
713 }
714 }
715 })
716 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
717 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
718 max * maxMultiplier
719 )
720 if (workerNode.usage.runTime.aggregate == null) {
721 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
722 } else {
723 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
724 }
725 if (workerNode.usage.waitTime.aggregate == null) {
726 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
727 } else {
728 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
729 }
730 }
731 // We need to clean up the resources after our test
732 await pool.destroy()
733 })
734
735 it('Verify LEAST_ELU strategy default policy', async () => {
736 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
737 let pool = new FixedThreadPool(
738 max,
739 './tests/worker-files/thread/testWorker.js',
740 { workerChoiceStrategy }
741 )
742 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
743 dynamicWorkerUsage: false,
744 dynamicWorkerReady: true
745 })
746 await pool.destroy()
747 pool = new DynamicThreadPool(
748 min,
749 max,
750 './tests/worker-files/thread/testWorker.js',
751 { workerChoiceStrategy }
752 )
753 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
754 dynamicWorkerUsage: false,
755 dynamicWorkerReady: true
756 })
757 // We need to clean up the resources after our test
758 await pool.destroy()
759 })
760
761 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
762 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
763 let pool = new FixedThreadPool(
764 max,
765 './tests/worker-files/thread/testWorker.js',
766 { workerChoiceStrategy }
767 )
768 expect(
769 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
770 ).toStrictEqual({
771 runTime: {
772 aggregate: false,
773 average: false,
774 median: false
775 },
776 waitTime: {
777 aggregate: false,
778 average: false,
779 median: false
780 },
781 elu: {
782 aggregate: true,
783 average: false,
784 median: false
785 }
786 })
787 await pool.destroy()
788 pool = new DynamicThreadPool(
789 min,
790 max,
791 './tests/worker-files/thread/testWorker.js',
792 { workerChoiceStrategy }
793 )
794 expect(
795 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
796 ).toStrictEqual({
797 runTime: {
798 aggregate: false,
799 average: false,
800 median: false
801 },
802 waitTime: {
803 aggregate: false,
804 average: false,
805 median: false
806 },
807 elu: {
808 aggregate: true,
809 average: false,
810 median: false
811 }
812 })
813 // We need to clean up the resources after our test
814 await pool.destroy()
815 })
816
817 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
818 const pool = new FixedThreadPool(
819 max,
820 './tests/worker-files/thread/testWorker.js',
821 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
822 )
823 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
824 const promises = new Set()
825 const maxMultiplier = 2
826 for (let i = 0; i < max * maxMultiplier; i++) {
827 promises.add(pool.execute())
828 }
829 await Promise.all(promises)
830 for (const workerNode of pool.workerNodes) {
831 expect(workerNode.usage).toMatchObject({
832 tasks: {
833 executed: expect.any(Number),
834 executing: 0,
835 queued: 0,
836 maxQueued: 0,
837 failed: 0
838 },
839 runTime: {
840 history: expect.any(CircularArray)
841 },
842 waitTime: {
843 history: expect.any(CircularArray)
844 },
845 elu: {
846 idle: expect.objectContaining({
847 history: expect.any(CircularArray)
848 }),
849 active: expect.objectContaining({
850 history: expect.any(CircularArray)
851 })
852 }
853 })
854 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
855 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
856 max * maxMultiplier
857 )
858 if (workerNode.usage.elu.utilization == null) {
859 expect(workerNode.usage.elu.utilization).toBeUndefined()
860 } else {
861 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
862 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
863 }
864 }
865 // We need to clean up the resources after our test
866 await pool.destroy()
867 })
868
869 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
870 const pool = new DynamicThreadPool(
871 min,
872 max,
873 './tests/worker-files/thread/testWorker.js',
874 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
875 )
876 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
877 const promises = new Set()
878 const maxMultiplier = 2
879 for (let i = 0; i < max * maxMultiplier; i++) {
880 promises.add(pool.execute())
881 }
882 await Promise.all(promises)
883 for (const workerNode of pool.workerNodes) {
884 expect(workerNode.usage).toMatchObject({
885 tasks: {
886 executed: expect.any(Number),
887 executing: 0,
888 queued: 0,
889 maxQueued: 0,
890 failed: 0
891 },
892 runTime: {
893 history: expect.any(CircularArray)
894 },
895 waitTime: {
896 history: expect.any(CircularArray)
897 },
898 elu: {
899 idle: expect.objectContaining({
900 history: expect.any(CircularArray)
901 }),
902 active: expect.objectContaining({
903 history: expect.any(CircularArray)
904 })
905 }
906 })
907 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
908 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
909 max * maxMultiplier
910 )
911 if (workerNode.usage.elu.utilization == null) {
912 expect(workerNode.usage.elu.utilization).toBeUndefined()
913 } else {
914 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
915 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
916 }
917 }
918 // We need to clean up the resources after our test
919 await pool.destroy()
920 })
921
922 it('Verify FAIR_SHARE strategy default policy', async () => {
923 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
924 let pool = new FixedThreadPool(
925 max,
926 './tests/worker-files/thread/testWorker.js',
927 { workerChoiceStrategy }
928 )
929 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
930 dynamicWorkerUsage: false,
931 dynamicWorkerReady: true
932 })
933 await pool.destroy()
934 pool = new DynamicThreadPool(
935 min,
936 max,
937 './tests/worker-files/thread/testWorker.js',
938 { workerChoiceStrategy }
939 )
940 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
941 dynamicWorkerUsage: false,
942 dynamicWorkerReady: true
943 })
944 // We need to clean up the resources after our test
945 await pool.destroy()
946 })
947
948 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
949 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
950 let pool = new FixedThreadPool(
951 max,
952 './tests/worker-files/thread/testWorker.js',
953 { workerChoiceStrategy }
954 )
955 expect(
956 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
957 ).toStrictEqual({
958 runTime: {
959 aggregate: true,
960 average: true,
961 median: false
962 },
963 waitTime: {
964 aggregate: false,
965 average: false,
966 median: false
967 },
968 elu: {
969 aggregate: true,
970 average: true,
971 median: false
972 }
973 })
974 await pool.destroy()
975 pool = new DynamicThreadPool(
976 min,
977 max,
978 './tests/worker-files/thread/testWorker.js',
979 { workerChoiceStrategy }
980 )
981 expect(
982 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
983 ).toStrictEqual({
984 runTime: {
985 aggregate: true,
986 average: true,
987 median: false
988 },
989 waitTime: {
990 aggregate: false,
991 average: false,
992 median: false
993 },
994 elu: {
995 aggregate: true,
996 average: true,
997 median: false
998 }
999 })
1000 // We need to clean up the resources after our test
1001 await pool.destroy()
1002 })
1003
1004 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1005 const pool = new FixedThreadPool(
1006 max,
1007 './tests/worker-files/thread/testWorker.js',
1008 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1009 )
1010 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1011 const promises = new Set()
1012 const maxMultiplier = 2
1013 for (let i = 0; i < max * maxMultiplier; i++) {
1014 promises.add(pool.execute())
1015 }
1016 await Promise.all(promises)
1017 for (const workerNode of pool.workerNodes) {
1018 expect(workerNode.usage).toMatchObject({
1019 tasks: {
1020 executed: expect.any(Number),
1021 executing: 0,
1022 queued: 0,
1023 maxQueued: 0,
1024 failed: 0
1025 },
1026 runTime: expect.objectContaining({
1027 history: expect.any(CircularArray)
1028 }),
1029 waitTime: {
1030 history: expect.any(CircularArray)
1031 },
1032 elu: {
1033 idle: expect.objectContaining({
1034 history: expect.any(CircularArray)
1035 }),
1036 active: expect.objectContaining({
1037 history: expect.any(CircularArray)
1038 })
1039 }
1040 })
1041 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1042 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1043 max * maxMultiplier
1044 )
1045 if (workerNode.usage.runTime.aggregate == null) {
1046 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1047 } else {
1048 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1049 }
1050 if (workerNode.usage.runTime.average == null) {
1051 expect(workerNode.usage.runTime.average).toBeUndefined()
1052 } else {
1053 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1054 }
1055 if (workerNode.usage.elu.utilization == null) {
1056 expect(workerNode.usage.elu.utilization).toBeUndefined()
1057 } else {
1058 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1059 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1060 }
1061 }
1062 expect(
1063 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1064 pool.workerChoiceStrategyContext.workerChoiceStrategy
1065 ).workersVirtualTaskEndTimestamp.length
1066 ).toBe(pool.workerNodes.length)
1067 // We need to clean up the resources after our test
1068 await pool.destroy()
1069 })
1070
1071 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1072 const pool = new DynamicThreadPool(
1073 min,
1074 max,
1075 './tests/worker-files/thread/testWorker.js',
1076 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1077 )
1078 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1079 const promises = new Set()
1080 const maxMultiplier = 2
1081 for (let i = 0; i < max * maxMultiplier; i++) {
1082 promises.add(pool.execute())
1083 }
1084 await Promise.all(promises)
1085 for (const workerNode of pool.workerNodes) {
1086 expect(workerNode.usage).toMatchObject({
1087 tasks: {
1088 executed: expect.any(Number),
1089 executing: 0,
1090 queued: 0,
1091 maxQueued: 0,
1092 failed: 0
1093 },
1094 runTime: expect.objectContaining({
1095 history: expect.any(CircularArray)
1096 }),
1097 waitTime: {
1098 history: expect.any(CircularArray)
1099 },
1100 elu: {
1101 idle: expect.objectContaining({
1102 history: expect.any(CircularArray)
1103 }),
1104 active: expect.objectContaining({
1105 history: expect.any(CircularArray)
1106 })
1107 }
1108 })
1109 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1110 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1111 max * maxMultiplier
1112 )
1113 if (workerNode.usage.runTime.aggregate == null) {
1114 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1115 } else {
1116 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1117 }
1118 if (workerNode.usage.runTime.average == null) {
1119 expect(workerNode.usage.runTime.average).toBeUndefined()
1120 } else {
1121 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1122 }
1123 if (workerNode.usage.elu.utilization == null) {
1124 expect(workerNode.usage.elu.utilization).toBeUndefined()
1125 } else {
1126 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1127 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1128 }
1129 }
1130 expect(
1131 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1132 pool.workerChoiceStrategyContext.workerChoiceStrategy
1133 ).workersVirtualTaskEndTimestamp.length
1134 ).toBe(pool.workerNodes.length)
1135 // We need to clean up the resources after our test
1136 await pool.destroy()
1137 })
1138
1139 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1140 const pool = new DynamicThreadPool(
1141 min,
1142 max,
1143 './tests/worker-files/thread/testWorker.js',
1144 {
1145 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1146 workerChoiceStrategyOptions: {
1147 runTime: { median: true }
1148 }
1149 }
1150 )
1151 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1152 const promises = new Set()
1153 const maxMultiplier = 2
1154 for (let i = 0; i < max * maxMultiplier; i++) {
1155 promises.add(pool.execute())
1156 }
1157 await Promise.all(promises)
1158 for (const workerNode of pool.workerNodes) {
1159 expect(workerNode.usage).toMatchObject({
1160 tasks: {
1161 executed: expect.any(Number),
1162 executing: 0,
1163 queued: 0,
1164 maxQueued: 0,
1165 failed: 0
1166 },
1167 runTime: expect.objectContaining({
1168 history: expect.any(CircularArray)
1169 }),
1170 waitTime: {
1171 history: expect.any(CircularArray)
1172 },
1173 elu: {
1174 idle: expect.objectContaining({
1175 history: expect.any(CircularArray)
1176 }),
1177 active: expect.objectContaining({
1178 history: expect.any(CircularArray)
1179 })
1180 }
1181 })
1182 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1183 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1184 max * maxMultiplier
1185 )
1186 if (workerNode.usage.runTime.aggregate == null) {
1187 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1188 } else {
1189 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1190 }
1191 if (workerNode.usage.runTime.median == null) {
1192 expect(workerNode.usage.runTime.median).toBeUndefined()
1193 } else {
1194 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1195 }
1196 if (workerNode.usage.elu.utilization == null) {
1197 expect(workerNode.usage.elu.utilization).toBeUndefined()
1198 } else {
1199 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1200 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1201 }
1202 }
1203 expect(
1204 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1205 pool.workerChoiceStrategyContext.workerChoiceStrategy
1206 ).workersVirtualTaskEndTimestamp.length
1207 ).toBe(pool.workerNodes.length)
1208 // We need to clean up the resources after our test
1209 await pool.destroy()
1210 })
1211
1212 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1213 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1214 let pool = new FixedThreadPool(
1215 max,
1216 './tests/worker-files/thread/testWorker.js'
1217 )
1218 expect(
1219 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1220 workerChoiceStrategy
1221 ).workersVirtualTaskEndTimestamp
1222 ).toBeInstanceOf(Array)
1223 expect(
1224 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1225 workerChoiceStrategy
1226 ).workersVirtualTaskEndTimestamp.length
1227 ).toBe(0)
1228 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1229 workerChoiceStrategy
1230 ).workersVirtualTaskEndTimestamp[0] = performance.now()
1231 expect(
1232 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1233 workerChoiceStrategy
1234 ).workersVirtualTaskEndTimestamp.length
1235 ).toBe(1)
1236 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1237 expect(
1238 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1239 workerChoiceStrategy
1240 ).workersVirtualTaskEndTimestamp
1241 ).toBeInstanceOf(Array)
1242 expect(
1243 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1244 workerChoiceStrategy
1245 ).workersVirtualTaskEndTimestamp.length
1246 ).toBe(0)
1247 await pool.destroy()
1248 pool = new DynamicThreadPool(
1249 min,
1250 max,
1251 './tests/worker-files/thread/testWorker.js'
1252 )
1253 expect(
1254 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1255 workerChoiceStrategy
1256 ).workersVirtualTaskEndTimestamp
1257 ).toBeInstanceOf(Array)
1258 expect(
1259 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1260 workerChoiceStrategy
1261 ).workersVirtualTaskEndTimestamp.length
1262 ).toBe(0)
1263 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1264 workerChoiceStrategy
1265 ).workersVirtualTaskEndTimestamp[0] = performance.now()
1266 expect(
1267 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1268 workerChoiceStrategy
1269 ).workersVirtualTaskEndTimestamp.length
1270 ).toBe(1)
1271 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1272 expect(
1273 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1274 workerChoiceStrategy
1275 ).workersVirtualTaskEndTimestamp
1276 ).toBeInstanceOf(Array)
1277 expect(
1278 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1279 workerChoiceStrategy
1280 ).workersVirtualTaskEndTimestamp.length
1281 ).toBe(0)
1282 // We need to clean up the resources after our test
1283 await pool.destroy()
1284 })
1285
1286 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1287 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1288 let pool = new FixedThreadPool(
1289 max,
1290 './tests/worker-files/thread/testWorker.js',
1291 { workerChoiceStrategy }
1292 )
1293 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1294 dynamicWorkerUsage: false,
1295 dynamicWorkerReady: true
1296 })
1297 await pool.destroy()
1298 pool = new DynamicThreadPool(
1299 min,
1300 max,
1301 './tests/worker-files/thread/testWorker.js',
1302 { workerChoiceStrategy }
1303 )
1304 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1305 dynamicWorkerUsage: false,
1306 dynamicWorkerReady: true
1307 })
1308 // We need to clean up the resources after our test
1309 await pool.destroy()
1310 })
1311
1312 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1313 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1314 let pool = new FixedThreadPool(
1315 max,
1316 './tests/worker-files/thread/testWorker.js',
1317 { workerChoiceStrategy }
1318 )
1319 expect(
1320 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1321 ).toStrictEqual({
1322 runTime: {
1323 aggregate: true,
1324 average: true,
1325 median: false
1326 },
1327 waitTime: {
1328 aggregate: false,
1329 average: false,
1330 median: false
1331 },
1332 elu: {
1333 aggregate: false,
1334 average: false,
1335 median: false
1336 }
1337 })
1338 await pool.destroy()
1339 pool = new DynamicThreadPool(
1340 min,
1341 max,
1342 './tests/worker-files/thread/testWorker.js',
1343 { workerChoiceStrategy }
1344 )
1345 expect(
1346 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1347 ).toStrictEqual({
1348 runTime: {
1349 aggregate: true,
1350 average: true,
1351 median: false
1352 },
1353 waitTime: {
1354 aggregate: false,
1355 average: false,
1356 median: false
1357 },
1358 elu: {
1359 aggregate: false,
1360 average: false,
1361 median: false
1362 }
1363 })
1364 // We need to clean up the resources after our test
1365 await pool.destroy()
1366 })
1367
1368 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1369 const pool = new FixedThreadPool(
1370 max,
1371 './tests/worker-files/thread/testWorker.js',
1372 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1373 )
1374 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1375 const promises = new Set()
1376 const maxMultiplier = 2
1377 for (let i = 0; i < max * maxMultiplier; i++) {
1378 promises.add(pool.execute())
1379 }
1380 await Promise.all(promises)
1381 for (const workerNode of pool.workerNodes) {
1382 expect(workerNode.usage).toStrictEqual({
1383 tasks: {
1384 executed: expect.any(Number),
1385 executing: 0,
1386 queued: 0,
1387 maxQueued: 0,
1388 failed: 0
1389 },
1390 runTime: expect.objectContaining({
1391 history: expect.any(CircularArray)
1392 }),
1393 waitTime: {
1394 history: expect.any(CircularArray)
1395 },
1396 elu: {
1397 idle: {
1398 history: expect.any(CircularArray)
1399 },
1400 active: {
1401 history: expect.any(CircularArray)
1402 }
1403 }
1404 })
1405 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1406 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1407 max * maxMultiplier
1408 )
1409 if (workerNode.usage.runTime.aggregate == null) {
1410 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1411 } else {
1412 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1413 }
1414 if (workerNode.usage.runTime.average == null) {
1415 expect(workerNode.usage.runTime.average).toBeUndefined()
1416 } else {
1417 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1418 }
1419 }
1420 expect(
1421 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1422 pool.workerChoiceStrategyContext.workerChoiceStrategy
1423 ).defaultWorkerWeight
1424 ).toBeGreaterThan(0)
1425 expect(
1426 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1427 pool.workerChoiceStrategyContext.workerChoiceStrategy
1428 ).workerVirtualTaskRunTime
1429 ).toBeGreaterThanOrEqual(0)
1430 // We need to clean up the resources after our test
1431 await pool.destroy()
1432 })
1433
1434 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1435 const pool = new DynamicThreadPool(
1436 min,
1437 max,
1438 './tests/worker-files/thread/testWorker.js',
1439 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1440 )
1441 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1442 const promises = new Set()
1443 const maxMultiplier = 2
1444 for (let i = 0; i < max * maxMultiplier; i++) {
1445 promises.add(pool.execute())
1446 }
1447 await Promise.all(promises)
1448 for (const workerNode of pool.workerNodes) {
1449 expect(workerNode.usage).toStrictEqual({
1450 tasks: {
1451 executed: expect.any(Number),
1452 executing: 0,
1453 queued: 0,
1454 maxQueued: 0,
1455 failed: 0
1456 },
1457 runTime: expect.objectContaining({
1458 history: expect.any(CircularArray)
1459 }),
1460 waitTime: {
1461 history: expect.any(CircularArray)
1462 },
1463 elu: {
1464 idle: {
1465 history: expect.any(CircularArray)
1466 },
1467 active: {
1468 history: expect.any(CircularArray)
1469 }
1470 }
1471 })
1472 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1473 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1474 max * maxMultiplier
1475 )
1476 if (workerNode.usage.runTime.aggregate == null) {
1477 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1478 } else {
1479 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1480 }
1481 if (workerNode.usage.runTime.average == null) {
1482 expect(workerNode.usage.runTime.average).toBeUndefined()
1483 } else {
1484 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1485 }
1486 }
1487 expect(
1488 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1489 pool.workerChoiceStrategyContext.workerChoiceStrategy
1490 ).defaultWorkerWeight
1491 ).toBeGreaterThan(0)
1492 expect(
1493 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1494 pool.workerChoiceStrategyContext.workerChoiceStrategy
1495 ).workerVirtualTaskRunTime
1496 ).toBeGreaterThanOrEqual(0)
1497 // We need to clean up the resources after our test
1498 await pool.destroy()
1499 })
1500
1501 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1502 const pool = new DynamicThreadPool(
1503 min,
1504 max,
1505 './tests/worker-files/thread/testWorker.js',
1506 {
1507 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1508 workerChoiceStrategyOptions: {
1509 runTime: { median: true }
1510 }
1511 }
1512 )
1513 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1514 const promises = new Set()
1515 const maxMultiplier = 2
1516 for (let i = 0; i < max * maxMultiplier; i++) {
1517 promises.add(pool.execute())
1518 }
1519 await Promise.all(promises)
1520 for (const workerNode of pool.workerNodes) {
1521 expect(workerNode.usage).toStrictEqual({
1522 tasks: {
1523 executed: expect.any(Number),
1524 executing: 0,
1525 queued: 0,
1526 maxQueued: 0,
1527 failed: 0
1528 },
1529 runTime: expect.objectContaining({
1530 history: expect.any(CircularArray)
1531 }),
1532 waitTime: {
1533 history: expect.any(CircularArray)
1534 },
1535 elu: {
1536 idle: {
1537 history: expect.any(CircularArray)
1538 },
1539 active: {
1540 history: expect.any(CircularArray)
1541 }
1542 }
1543 })
1544 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1545 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1546 max * maxMultiplier
1547 )
1548 if (workerNode.usage.runTime.aggregate == null) {
1549 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1550 } else {
1551 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1552 }
1553 if (workerNode.usage.runTime.median == null) {
1554 expect(workerNode.usage.runTime.median).toBeUndefined()
1555 } else {
1556 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1557 }
1558 }
1559 expect(
1560 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1561 pool.workerChoiceStrategyContext.workerChoiceStrategy
1562 ).defaultWorkerWeight
1563 ).toBeGreaterThan(0)
1564 expect(
1565 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1566 pool.workerChoiceStrategyContext.workerChoiceStrategy
1567 ).workerVirtualTaskRunTime
1568 ).toBeGreaterThanOrEqual(0)
1569 // We need to clean up the resources after our test
1570 await pool.destroy()
1571 })
1572
1573 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1574 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1575 let pool = new FixedThreadPool(
1576 max,
1577 './tests/worker-files/thread/testWorker.js'
1578 )
1579 expect(
1580 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1581 workerChoiceStrategy
1582 ).nextWorkerNodeKey
1583 ).toBeDefined()
1584 expect(
1585 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1586 workerChoiceStrategy
1587 ).defaultWorkerWeight
1588 ).toBeDefined()
1589 expect(
1590 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1591 workerChoiceStrategy
1592 ).workerVirtualTaskRunTime
1593 ).toBeDefined()
1594 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1595 expect(
1596 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1597 pool.workerChoiceStrategyContext.workerChoiceStrategy
1598 ).nextWorkerNodeKey
1599 ).toBe(0)
1600 expect(
1601 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1602 pool.workerChoiceStrategyContext.workerChoiceStrategy
1603 ).defaultWorkerWeight
1604 ).toBeGreaterThan(0)
1605 expect(
1606 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1607 workerChoiceStrategy
1608 ).workerVirtualTaskRunTime
1609 ).toBe(0)
1610 await pool.destroy()
1611 pool = new DynamicThreadPool(
1612 min,
1613 max,
1614 './tests/worker-files/thread/testWorker.js'
1615 )
1616 expect(
1617 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1618 workerChoiceStrategy
1619 ).nextWorkerNodeKey
1620 ).toBeDefined()
1621 expect(
1622 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1623 workerChoiceStrategy
1624 ).defaultWorkerWeight
1625 ).toBeDefined()
1626 expect(
1627 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1628 workerChoiceStrategy
1629 ).workerVirtualTaskRunTime
1630 ).toBeDefined()
1631 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1632 expect(
1633 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1634 pool.workerChoiceStrategyContext.workerChoiceStrategy
1635 ).nextWorkerNodeKey
1636 ).toBe(0)
1637 expect(
1638 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1639 pool.workerChoiceStrategyContext.workerChoiceStrategy
1640 ).defaultWorkerWeight
1641 ).toBeGreaterThan(0)
1642 expect(
1643 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1644 workerChoiceStrategy
1645 ).workerVirtualTaskRunTime
1646 ).toBe(0)
1647 // We need to clean up the resources after our test
1648 await pool.destroy()
1649 })
1650
1651 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1652 const workerChoiceStrategy =
1653 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1654 let pool = new FixedThreadPool(
1655 max,
1656 './tests/worker-files/thread/testWorker.js',
1657 { workerChoiceStrategy }
1658 )
1659 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1660 dynamicWorkerUsage: false,
1661 dynamicWorkerReady: true
1662 })
1663 await pool.destroy()
1664 pool = new DynamicThreadPool(
1665 min,
1666 max,
1667 './tests/worker-files/thread/testWorker.js',
1668 { workerChoiceStrategy }
1669 )
1670 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1671 dynamicWorkerUsage: false,
1672 dynamicWorkerReady: true
1673 })
1674 // We need to clean up the resources after our test
1675 await pool.destroy()
1676 })
1677
1678 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1679 const workerChoiceStrategy =
1680 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1681 let pool = new FixedThreadPool(
1682 max,
1683 './tests/worker-files/thread/testWorker.js',
1684 { workerChoiceStrategy }
1685 )
1686 expect(
1687 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1688 ).toStrictEqual({
1689 runTime: {
1690 aggregate: false,
1691 average: false,
1692 median: false
1693 },
1694 waitTime: {
1695 aggregate: false,
1696 average: false,
1697 median: false
1698 },
1699 elu: {
1700 aggregate: false,
1701 average: false,
1702 median: false
1703 }
1704 })
1705 await pool.destroy()
1706 pool = new DynamicThreadPool(
1707 min,
1708 max,
1709 './tests/worker-files/thread/testWorker.js',
1710 { workerChoiceStrategy }
1711 )
1712 expect(
1713 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1714 ).toStrictEqual({
1715 runTime: {
1716 aggregate: false,
1717 average: false,
1718 median: false
1719 },
1720 waitTime: {
1721 aggregate: false,
1722 average: false,
1723 median: false
1724 },
1725 elu: {
1726 aggregate: false,
1727 average: false,
1728 median: false
1729 }
1730 })
1731 // We need to clean up the resources after our test
1732 await pool.destroy()
1733 })
1734
1735 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1736 const pool = new FixedThreadPool(
1737 max,
1738 './tests/worker-files/thread/testWorker.js',
1739 {
1740 workerChoiceStrategy:
1741 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1742 }
1743 )
1744 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1745 const promises = new Set()
1746 const maxMultiplier = 2
1747 for (let i = 0; i < max * maxMultiplier; i++) {
1748 promises.add(pool.execute())
1749 }
1750 await Promise.all(promises)
1751 for (const workerNode of pool.workerNodes) {
1752 expect(workerNode.usage).toStrictEqual({
1753 tasks: {
1754 executed: maxMultiplier,
1755 executing: 0,
1756 queued: 0,
1757 maxQueued: 0,
1758 failed: 0
1759 },
1760 runTime: {
1761 history: expect.any(CircularArray)
1762 },
1763 waitTime: {
1764 history: expect.any(CircularArray)
1765 },
1766 elu: {
1767 idle: {
1768 history: expect.any(CircularArray)
1769 },
1770 active: {
1771 history: expect.any(CircularArray)
1772 }
1773 }
1774 })
1775 }
1776 expect(
1777 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1778 pool.workerChoiceStrategyContext.workerChoiceStrategy
1779 ).defaultWorkerWeight
1780 ).toBeGreaterThan(0)
1781 expect(
1782 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1783 pool.workerChoiceStrategyContext.workerChoiceStrategy
1784 ).roundId
1785 ).toBe(0)
1786 expect(
1787 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1788 pool.workerChoiceStrategyContext.workerChoiceStrategy
1789 ).nextWorkerNodeKey
1790 ).toBe(0)
1791 expect(
1792 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1793 pool.workerChoiceStrategyContext.workerChoiceStrategy
1794 ).roundWeights
1795 ).toStrictEqual([
1796 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1797 pool.workerChoiceStrategyContext.workerChoiceStrategy
1798 ).defaultWorkerWeight
1799 ])
1800 // We need to clean up the resources after our test
1801 await pool.destroy()
1802 })
1803
1804 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1805 const pool = new DynamicThreadPool(
1806 min,
1807 max,
1808 './tests/worker-files/thread/testWorker.js',
1809 {
1810 workerChoiceStrategy:
1811 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1812 }
1813 )
1814 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1815 const promises = new Set()
1816 const maxMultiplier = 2
1817 for (let i = 0; i < max * maxMultiplier; i++) {
1818 promises.add(pool.execute())
1819 }
1820 await Promise.all(promises)
1821 for (const workerNode of pool.workerNodes) {
1822 expect(workerNode.usage).toStrictEqual({
1823 tasks: {
1824 executed: expect.any(Number),
1825 executing: 0,
1826 queued: 0,
1827 maxQueued: 0,
1828 failed: 0
1829 },
1830 runTime: {
1831 history: expect.any(CircularArray)
1832 },
1833 waitTime: {
1834 history: expect.any(CircularArray)
1835 },
1836 elu: {
1837 idle: {
1838 history: expect.any(CircularArray)
1839 },
1840 active: {
1841 history: expect.any(CircularArray)
1842 }
1843 }
1844 })
1845 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1846 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1847 max * maxMultiplier
1848 )
1849 }
1850 expect(
1851 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1852 pool.workerChoiceStrategyContext.workerChoiceStrategy
1853 ).defaultWorkerWeight
1854 ).toBeGreaterThan(0)
1855 expect(
1856 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1857 pool.workerChoiceStrategyContext.workerChoiceStrategy
1858 ).roundId
1859 ).toBe(0)
1860 expect(
1861 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1862 pool.workerChoiceStrategyContext.workerChoiceStrategy
1863 ).nextWorkerNodeKey
1864 ).toBe(0)
1865 expect(
1866 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1867 pool.workerChoiceStrategyContext.workerChoiceStrategy
1868 ).roundWeights
1869 ).toStrictEqual([
1870 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1871 pool.workerChoiceStrategyContext.workerChoiceStrategy
1872 ).defaultWorkerWeight
1873 ])
1874 // We need to clean up the resources after our test
1875 await pool.destroy()
1876 })
1877
1878 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1879 const workerChoiceStrategy =
1880 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1881 let pool = new FixedThreadPool(
1882 max,
1883 './tests/worker-files/thread/testWorker.js'
1884 )
1885 expect(
1886 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1887 workerChoiceStrategy
1888 ).roundId
1889 ).toBeDefined()
1890 expect(
1891 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1892 workerChoiceStrategy
1893 ).nextWorkerNodeKey
1894 ).toBeDefined()
1895 expect(
1896 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1897 workerChoiceStrategy
1898 ).defaultWorkerWeight
1899 ).toBeDefined()
1900 expect(
1901 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1902 workerChoiceStrategy
1903 ).roundWeights
1904 ).toBeDefined()
1905 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1906 expect(
1907 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1908 workerChoiceStrategy
1909 ).roundId
1910 ).toBe(0)
1911 expect(
1912 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1913 pool.workerChoiceStrategyContext.workerChoiceStrategy
1914 ).nextWorkerNodeKey
1915 ).toBe(0)
1916 expect(
1917 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1918 pool.workerChoiceStrategyContext.workerChoiceStrategy
1919 ).defaultWorkerWeight
1920 ).toBeGreaterThan(0)
1921 expect(
1922 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1923 workerChoiceStrategy
1924 ).roundWeights
1925 ).toStrictEqual([
1926 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1927 pool.workerChoiceStrategyContext.workerChoiceStrategy
1928 ).defaultWorkerWeight
1929 ])
1930 await pool.destroy()
1931 pool = new DynamicThreadPool(
1932 min,
1933 max,
1934 './tests/worker-files/thread/testWorker.js'
1935 )
1936 expect(
1937 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1938 workerChoiceStrategy
1939 ).roundId
1940 ).toBeDefined()
1941 expect(
1942 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1943 workerChoiceStrategy
1944 ).nextWorkerNodeKey
1945 ).toBeDefined()
1946 expect(
1947 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1948 workerChoiceStrategy
1949 ).defaultWorkerWeight
1950 ).toBeDefined()
1951 expect(
1952 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1953 workerChoiceStrategy
1954 ).roundWeights
1955 ).toBeDefined()
1956 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1957 expect(
1958 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1959 pool.workerChoiceStrategyContext.workerChoiceStrategy
1960 ).nextWorkerNodeKey
1961 ).toBe(0)
1962 expect(
1963 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1964 pool.workerChoiceStrategyContext.workerChoiceStrategy
1965 ).defaultWorkerWeight
1966 ).toBeGreaterThan(0)
1967 expect(
1968 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1969 workerChoiceStrategy
1970 ).roundWeights
1971 ).toStrictEqual([
1972 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1973 pool.workerChoiceStrategyContext.workerChoiceStrategy
1974 ).defaultWorkerWeight
1975 ])
1976 // We need to clean up the resources after our test
1977 await pool.destroy()
1978 })
1979
1980 it('Verify unknown strategy throw error', () => {
1981 expect(
1982 () =>
1983 new DynamicThreadPool(
1984 min,
1985 max,
1986 './tests/worker-files/thread/testWorker.js',
1987 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
1988 )
1989 ).toThrowError("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
1990 })
1991 })