fix: refine pool statuses semantic
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicThreadPool,
4 FixedClusterPool,
5 FixedThreadPool,
6 PoolEvents,
7 WorkerChoiceStrategies
8 } = require('../../../lib')
9 const { CircularArray } = require('../../../lib/circular-array')
10 const { waitPoolEvents } = require('../../test-utils')
11
12 describe('Selection strategies test suite', () => {
13 const min = 0
14 const max = 3
15
// Every WorkerChoiceStrategies member checked here must be a string equal to its own key.
it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
  const expectedStrategyNames = [
    'ROUND_ROBIN',
    'LEAST_USED',
    'LEAST_BUSY',
    'LEAST_ELU',
    'FAIR_SHARE',
    'WEIGHTED_ROUND_ROBIN',
    'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
  ]
  for (const strategyName of expectedStrategyNames) {
    expect(WorkerChoiceStrategies[strategyName]).toBe(strategyName)
  }
})
29
// A pool created without a strategy option must report ROUND_ROBIN in its options.
it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    expect(pool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.ROUND_ROBIN
    )
  } finally {
    // Clean up the pool even when the assertion above throws
    await pool.destroy()
  }
})
42
// For each strategy: build a fixed pool with it and check that both the pool
// options and the strategy context report that same strategy.
it('Verify available strategies are taken at pool creation', async () => {
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new FixedThreadPool(
      max,
      './tests/worker-files/thread/testWorker.js',
      { workerChoiceStrategy }
    )
    try {
      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        workerChoiceStrategy
      )
    } finally {
      // Destroy this iteration's pool even if an assertion failed
      await pool.destroy()
    }
  }
})
57
// For each strategy: build a default dynamic pool, switch it to the strategy
// with setWorkerChoiceStrategy() and check options and context agree.
it('Verify available strategies can be set after pool creation', async () => {
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js'
    )
    try {
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        workerChoiceStrategy
      )
    } finally {
      // Destroy this iteration's pool even if an assertion failed
      await pool.destroy()
    }
  }
})
73
// Checks the initial internal state of the ROUND_ROBIN, FAIR_SHARE and
// WEIGHTED_ROUND_ROBIN strategy instances held by a freshly created fixed pool.
// Other strategies have no internals asserted here.
it('Verify available strategies default internals at pool creation', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
      const strategy =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        )
      switch (workerChoiceStrategy) {
        case WorkerChoiceStrategies.ROUND_ROBIN:
          expect(strategy.nextWorkerNodeKey).toBe(0)
          break
        case WorkerChoiceStrategies.FAIR_SHARE:
          expect(strategy.workersVirtualTaskEndTimestamp).toBeInstanceOf(Array)
          expect(strategy.workersVirtualTaskEndTimestamp.length).toBe(0)
          break
        case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
          expect(strategy.nextWorkerNodeKey).toBe(0)
          expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
          expect(strategy.workerVirtualTaskRunTime).toBe(0)
          break
        default:
          break
      }
    }
  } finally {
    // Clean up the pool even when an assertion above throws
    await pool.destroy()
  }
})
119
// ROUND_ROBIN must report `useDynamicWorker: true` as its strategy policy,
// for a fixed pool and then for a dynamic pool.
it('Verify ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: true
      })
    } finally {
      // Destroy the pool before building the next one, even on failure
      await pool.destroy()
    }
  }
})
143
// ROUND_ROBIN must require no runTime, waitTime or elu statistics,
// for a fixed pool and then for a dynamic pool.
it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const expectedRequirements = {
    runTime: {
      aggregate: false,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Destroy the pool before building the next one, even on failure
      await pool.destroy()
    }
  }
})
199
// Runs `max * maxMultiplier` tasks on a fixed pool using ROUND_ROBIN and expects
// every worker node to have executed exactly `maxMultiplier` tasks, with the
// strategy's next worker node key back at 0.
it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
    }
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        WorkerChoiceStrategies.ROUND_ROBIN
      ).nextWorkerNodeKey
    ).toBe(0)
  } finally {
    // Clean up the pool even when a task or an assertion fails
    await pool.destroy()
  }
})
246
// Runs `max * maxMultiplier` tasks on a dynamic pool using ROUND_ROBIN and expects
// every worker node to have executed exactly `maxMultiplier` tasks, with the
// strategy's next worker node key back at 0.
it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
    }
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        WorkerChoiceStrategies.ROUND_ROBIN
      ).nextWorkerNodeKey
    ).toBe(0)
  } finally {
    // Clean up the pool even when a task or an assertion fails
    await pool.destroy()
  }
})
294
// Calls chooseWorkerNode() `max` times and expects `max` distinct workers,
// first on a fixed cluster pool (worker.id), then on a fixed thread pool
// (worker.threadId).
it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const scenarios = [
    {
      createPool: () =>
        new FixedClusterPool(
          max,
          './tests/worker-files/cluster/testWorker.js',
          { workerChoiceStrategy }
        ),
      getWorkerId: worker => worker.id
    },
    {
      createPool: () =>
        new FixedThreadPool(
          max,
          './tests/worker-files/thread/testWorker.js',
          { workerChoiceStrategy }
        ),
      getWorkerId: worker => worker.threadId
    }
  ]
  for (const { createPool, getWorkerId } of scenarios) {
    const pool = createPool()
    try {
      const results = new Set()
      for (let i = 0; i < max; i++) {
        results.add(
          getWorkerId(pool.workerNodes[pool.chooseWorkerNode()].worker)
        )
      }
      expect(results.size).toBe(max)
    } finally {
      // Destroy the pool before building the next one, even on failure
      await pool.destroy()
    }
  }
})
320
// Starts each pool with WEIGHTED_ROUND_ROBIN, checks the ROUND_ROBIN strategy
// instance already exposes a next worker node key, then switches to ROUND_ROBIN
// and expects the active strategy's next worker node key to be 0.
it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const initialOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, initialOptions),
    () => new DynamicThreadPool(min, max, workerFile, initialOptions)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        ).nextWorkerNodeKey
      ).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      expect(
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          pool.workerChoiceStrategyContext.workerChoiceStrategy
        ).nextWorkerNodeKey
      ).toBe(0)
    } finally {
      // Destroy the pool before building the next one, even on failure
      await pool.destroy()
    }
  }
})
360
// LEAST_USED must report `useDynamicWorker: false` as its strategy policy,
// for a fixed pool and then for a dynamic pool.
it('Verify LEAST_USED strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: false
      })
    } finally {
      // Destroy the pool before building the next one, even on failure
      await pool.destroy()
    }
  }
})
384
// LEAST_USED must require no runTime, waitTime or elu statistics,
// for a fixed pool and then for a dynamic pool.
it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const expectedRequirements = {
    runTime: {
      aggregate: false,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Destroy the pool before building the next one, even on failure
      await pool.destroy()
    }
  }
})
440
// Runs `max * maxMultiplier` tasks on a fixed pool using LEAST_USED and checks
// each worker node's usage shape and that its executed count lies within
// [0, max * maxMultiplier].
it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
    }
  } finally {
    // Clean up the pool even when a task or an assertion fails
    await pool.destroy()
  }
})
486
// Runs `max * maxMultiplier` tasks on a dynamic pool using LEAST_USED and checks
// each worker node's usage shape and that its executed count lies within
// [0, max * maxMultiplier].
it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
    }
  } finally {
    // Clean up the pool even when a task or an assertion fails
    await pool.destroy()
  }
})
533
// LEAST_BUSY must report `useDynamicWorker: false` as its strategy policy,
// for a fixed pool and then for a dynamic pool.
it('Verify LEAST_BUSY strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: false
      })
    } finally {
      // Destroy the pool before building the next one, even on failure
      await pool.destroy()
    }
  }
})
557
// LEAST_BUSY must require only the aggregate runTime and aggregate waitTime
// statistics, for a fixed pool and then for a dynamic pool.
it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: true,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Destroy the pool before building the next one, even on failure
      await pool.destroy()
    }
  }
})
613
// Runs `max * maxMultiplier` tasks on a fixed pool using LEAST_BUSY and checks
// each worker node's usage shape, executed count bounds, and that the aggregate
// runTime / waitTime are either undefined or strictly positive.
it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
  // A nullish value must be exactly undefined; anything else must be > 0
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
      expectUndefinedOrPositive(usage.runTime.aggregate)
      expectUndefinedOrPositive(usage.waitTime.aggregate)
    }
  } finally {
    // Clean up the pool even when a task or an assertion fails
    await pool.destroy()
  }
})
669
// Runs `max * maxMultiplier` tasks on a dynamic pool using LEAST_BUSY and checks
// each worker node's usage shape, executed count bounds, and that the aggregate
// runTime / waitTime are either undefined or strictly positive.
it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
  // A nullish value must be exactly undefined; anything else must be > 0
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
      expectUndefinedOrPositive(usage.runTime.aggregate)
      expectUndefinedOrPositive(usage.waitTime.aggregate)
    }
  } finally {
    // Clean up the pool even when a task or an assertion fails
    await pool.destroy()
  }
})
726
// LEAST_ELU must report `useDynamicWorker: false` as its strategy policy,
// for a fixed pool and then for a dynamic pool.
it('Verify LEAST_ELU strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: false
      })
    } finally {
      // Destroy the pool before building the next one, even on failure
      await pool.destroy()
    }
  }
})
750
// LEAST_ELU must require only the aggregate elu statistic,
// for a fixed pool and then for a dynamic pool.
it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const expectedRequirements = {
    runTime: {
      aggregate: false,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Destroy the pool before building the next one, even on failure
      await pool.destroy()
    }
  }
})
806
// Runs `max * maxMultiplier` tasks on a fixed pool using LEAST_ELU and checks
// each worker node's usage shape, executed count bounds, and that elu
// utilization is either undefined or within [0, 1].
it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
      const { utilization } = usage.elu
      if (utilization == null) {
        expect(utilization).toBeUndefined()
      } else {
        expect(utilization).toBeGreaterThanOrEqual(0)
        expect(utilization).toBeLessThanOrEqual(1)
      }
    }
  } finally {
    // Clean up the pool even when a task or an assertion fails
    await pool.destroy()
  }
})
858
// Runs `max * maxMultiplier` tasks on a dynamic pool using LEAST_ELU and checks
// each worker node's usage shape, executed count bounds, and that elu
// utilization is either undefined or within [0, 1].
it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
      const { utilization } = usage.elu
      if (utilization == null) {
        expect(utilization).toBeUndefined()
      } else {
        expect(utilization).toBeGreaterThanOrEqual(0)
        expect(utilization).toBeLessThanOrEqual(1)
      }
    }
  } finally {
    // Clean up the pool even when a task or an assertion fails
    await pool.destroy()
  }
})
911
// FAIR_SHARE must report `useDynamicWorker: false` as its strategy policy,
// for a fixed pool and then for a dynamic pool.
it('Verify FAIR_SHARE strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        useDynamicWorker: false
      })
    } finally {
      // Destroy the pool before building the next one, even on failure
      await pool.destroy()
    }
  }
})
935
// FAIR_SHARE must require aggregate and average statistics for both runTime and
// elu (and nothing for waitTime), for a fixed pool and then for a dynamic pool.
it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: true,
      median: false
    }
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Destroy the pool before building the next one, even on failure
      await pool.destroy()
    }
  }
})
991
// Runs `max * maxMultiplier` tasks on a fixed pool using FAIR_SHARE and checks
// each worker node's usage shape, executed count bounds, runTime aggregate and
// average (undefined or > 0), elu utilization (undefined or within [0, 1]), and
// that the strategy tracks one virtual task end timestamp per worker node.
it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
  // A nullish value must be exactly undefined; anything else must be > 0
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
      expectUndefinedOrPositive(usage.runTime.aggregate)
      expectUndefinedOrPositive(usage.runTime.average)
      const { utilization } = usage.elu
      if (utilization == null) {
        expect(utilization).toBeUndefined()
      } else {
        expect(utilization).toBeGreaterThanOrEqual(0)
        expect(utilization).toBeLessThanOrEqual(1)
      }
    }
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp.length
    ).toBe(pool.workerNodes.length)
  } finally {
    // Clean up the pool even when a task or an assertion fails
    await pool.destroy()
  }
})
1058
// Runs `max * maxMultiplier` tasks on a dynamic pool using FAIR_SHARE and checks
// each worker node's usage shape, executed count bounds, runTime aggregate and
// average (undefined or > 0), elu utilization (undefined or within [0, 1]), and
// that the strategy tracks one virtual task end timestamp per worker node.
it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
  // A nullish value must be exactly undefined; anything else must be > 0
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
      expectUndefinedOrPositive(usage.runTime.aggregate)
      expectUndefinedOrPositive(usage.runTime.average)
      const { utilization } = usage.elu
      if (utilization == null) {
        expect(utilization).toBeUndefined()
      } else {
        expect(utilization).toBeGreaterThanOrEqual(0)
        expect(utilization).toBeLessThanOrEqual(1)
      }
    }
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp.length
    ).toBe(pool.workerNodes.length)
  } finally {
    // Clean up the pool even when a task or an assertion fails
    await pool.destroy()
  }
})
1126
it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
  // Runs tasks on a dynamic thread pool using FAIR_SHARE with the median
  // runtime statistic enabled, then inspects every worker node usage.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
      workerChoiceStrategyOptions: {
        runTime: { median: true }
      }
    }
  )
  // A statistic is either not set (undefined) or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
      return
    }
    expect(value).toBeGreaterThan(0)
  }
  const anyHistory = expect.any(CircularArray)
  const containingHistory = expect.objectContaining({ history: anyHistory })
  // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
  for (const { usage } of pool.workerNodes) {
    expect(usage).toMatchObject({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        failed: 0
      },
      runTime: containingHistory,
      waitTime: {
        history: anyHistory
      },
      elu: {
        idle: containingHistory,
        active: containingHistory
      }
    })
    const executedTasks = usage.tasks.executed
    expect(executedTasks).toBeGreaterThanOrEqual(0)
    expect(executedTasks).toBeLessThanOrEqual(tasksCount)
    expectUndefinedOrPositive(usage.runTime.aggregate)
    expectUndefinedOrPositive(usage.runTime.median)
    const utilization = usage.elu.utilization
    if (utilization == null) {
      expect(utilization).toBeUndefined()
    } else {
      // ELU utilization is asserted to be a ratio within [0, 1]
      expect(utilization).toBeGreaterThanOrEqual(0)
      expect(utilization).toBeLessThanOrEqual(1)
    }
  }
  const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
    pool.workerChoiceStrategyContext.workerChoiceStrategy
  )
  // One virtual task end timestamp is expected per worker node
  expect(strategy.workersVirtualTaskEndTimestamp.length).toBe(
    pool.workerNodes.length
  )
  // We need to clean up the resources after our test
  await pool.destroy()
})
1199
it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
  // For a fixed then a dynamic thread pool: dirty the FAIR_SHARE strategy
  // internal timestamps array, set the strategy and check the array is empty.
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Looked up on every call so each assertion sees the current strategy state
    const virtualTaskEndTimestamps = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp
    expect(virtualTaskEndTimestamps()).toBeInstanceOf(Array)
    expect(virtualTaskEndTimestamps().length).toBe(0)
    virtualTaskEndTimestamps()[0] = performance.now()
    expect(virtualTaskEndTimestamps().length).toBe(1)
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(virtualTaskEndTimestamps()).toBeInstanceOf(Array)
    expect(virtualTaskEndTimestamps().length).toBe(0)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1273
it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  // The strategy policy is checked on a fixed pool first, then on a dynamic one
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const strategyPolicy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(strategyPolicy).toStrictEqual({
      useDynamicWorker: true
    })
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1297
it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  // Same requirements are expected from a fixed pool and from a dynamic pool
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual(expectedRequirements)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1353
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  // Runs tasks on a fixed thread pool using WEIGHTED_ROUND_ROBIN, then checks
  // every worker node usage and the strategy internals.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // A statistic is either not set (undefined) or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
      return
    }
    expect(value).toBeGreaterThan(0)
  }
  const anyHistory = expect.any(CircularArray)
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: anyHistory
      }),
      waitTime: {
        history: anyHistory
      },
      elu: {
        idle: {
          history: anyHistory
        },
        active: {
          history: anyHistory
        }
      }
    })
    const executedTasks = usage.tasks.executed
    expect(executedTasks).toBeGreaterThanOrEqual(0)
    expect(executedTasks).toBeLessThanOrEqual(tasksCount)
    expectUndefinedOrPositive(usage.runTime.aggregate)
    expectUndefinedOrPositive(usage.runTime.average)
  }
  const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
    pool.workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(strategy.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1419
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  // Runs tasks on a dynamic thread pool using WEIGHTED_ROUND_ROBIN, then checks
  // every worker node usage and the strategy internals.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  const anyNumber = expect.any(Number)
  const anyHistory = expect.any(CircularArray)
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual({
      tasks: {
        executed: anyNumber,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        failed: 0
      },
      runTime: {
        aggregate: anyNumber,
        maximum: anyNumber,
        minimum: anyNumber,
        average: anyNumber,
        history: anyHistory
      },
      waitTime: {
        history: anyHistory
      },
      elu: {
        idle: {
          history: anyHistory
        },
        active: {
          history: anyHistory
        }
      }
    })
    const executedTasks = usage.tasks.executed
    expect(executedTasks).toBeGreaterThanOrEqual(0)
    expect(executedTasks).toBeLessThanOrEqual(tasksCount)
    expect(usage.runTime.aggregate).toBeGreaterThan(0)
    expect(usage.runTime.average).toBeGreaterThan(0)
  }
  const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
    pool.workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(strategy.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1482
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
  // Runs tasks on a dynamic thread pool using WEIGHTED_ROUND_ROBIN with the
  // median runtime statistic enabled, then checks usage and strategy internals.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
      workerChoiceStrategyOptions: {
        runTime: { median: true }
      }
    }
  )
  const anyNumber = expect.any(Number)
  const anyHistory = expect.any(CircularArray)
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual({
      tasks: {
        executed: anyNumber,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        failed: 0
      },
      runTime: {
        aggregate: anyNumber,
        maximum: anyNumber,
        minimum: anyNumber,
        median: anyNumber,
        history: anyHistory
      },
      waitTime: {
        history: anyHistory
      },
      elu: {
        idle: {
          history: anyHistory
        },
        active: {
          history: anyHistory
        }
      }
    })
    const executedTasks = usage.tasks.executed
    expect(executedTasks).toBeGreaterThanOrEqual(0)
    expect(executedTasks).toBeLessThanOrEqual(tasksCount)
    expect(usage.runTime.aggregate).toBeGreaterThan(0)
    expect(usage.runTime.median).toBeGreaterThan(0)
  }
  const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
    pool.workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(strategy.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1550
it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  // For a fixed then a dynamic thread pool: check the strategy internals are
  // defined, set the strategy and check the values they hold afterwards.
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const context = pool.workerChoiceStrategyContext
    // Strategy registered under the strategy name under test
    const namedStrategy = () =>
      context.workerChoiceStrategies.get(workerChoiceStrategy)
    // Strategy registered under the name currently set on the context
    const currentStrategy = () =>
      context.workerChoiceStrategies.get(context.workerChoiceStrategy)
    expect(namedStrategy().nextWorkerNodeKey).toBeDefined()
    expect(namedStrategy().defaultWorkerWeight).toBeDefined()
    expect(namedStrategy().workerVirtualTaskRunTime).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(currentStrategy().nextWorkerNodeKey).toBe(0)
    expect(currentStrategy().defaultWorkerWeight).toBeGreaterThan(0)
    expect(namedStrategy().workerVirtualTaskRunTime).toBe(0)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1628
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  // The strategy policy is checked on a fixed pool first, then on a dynamic one
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const strategyPolicy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(strategyPolicy).toStrictEqual({
      useDynamicWorker: true
    })
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1653
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  // Same requirements are expected from a fixed pool and from a dynamic pool
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const expectedRequirements = {
    runTime: {
      aggregate: false,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual(expectedRequirements)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1710
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  // Runs tasks on a fixed thread pool using INTERLEAVED_WEIGHTED_ROUND_ROBIN,
  // then checks every worker node usage and the strategy internals.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  // FIXME: shall not be needed
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
  const anyHistory = expect.any(CircularArray)
  // Every worker node is expected to have executed exactly maxMultiplier tasks
  const expectedUsage = {
    tasks: {
      executed: maxMultiplier,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: {
      history: anyHistory
    },
    waitTime: {
      history: anyHistory
    },
    elu: {
      idle: {
        history: anyHistory
      },
      active: {
        history: anyHistory
      }
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage)
  }
  const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
    pool.workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(strategy.roundId).toBe(0)
  expect(strategy.nextWorkerNodeKey).toBe(0)
  expect(strategy.roundWeights).toStrictEqual([strategy.defaultWorkerWeight])
  // We need to clean up the resources after our test
  await pool.destroy()
})
1781
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  // Runs tasks on a dynamic thread pool using INTERLEAVED_WEIGHTED_ROUND_ROBIN,
  // then checks every worker node usage and the strategy internals.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
  const anyHistory = expect.any(CircularArray)
  // Every worker node is expected to have executed exactly maxMultiplier tasks
  const expectedUsage = {
    tasks: {
      executed: maxMultiplier,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: {
      history: anyHistory
    },
    waitTime: {
      history: anyHistory
    },
    elu: {
      idle: {
        history: anyHistory
      },
      active: {
        history: anyHistory
      }
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage)
  }
  const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
    pool.workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(strategy.roundId).toBe(0)
  expect(strategy.nextWorkerNodeKey).toBe(0)
  expect(strategy.roundWeights).toStrictEqual([strategy.defaultWorkerWeight])
  // We need to clean up the resources after our test
  await pool.destroy()
})
1851
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  // For a fixed then a dynamic thread pool: check the strategy internals are
  // defined, set the strategy and check the values they hold afterwards.
  // Both pool types run the very same assertions, including the `roundId`
  // check that was previously done for the fixed pool only.
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const context = pool.workerChoiceStrategyContext
    // Strategy registered under the strategy name under test
    const namedStrategy = () =>
      context.workerChoiceStrategies.get(workerChoiceStrategy)
    // Strategy registered under the name currently set on the context
    const currentStrategy = () =>
      context.workerChoiceStrategies.get(context.workerChoiceStrategy)
    expect(namedStrategy().roundId).toBeDefined()
    expect(namedStrategy().nextWorkerNodeKey).toBeDefined()
    expect(namedStrategy().defaultWorkerWeight).toBeDefined()
    expect(namedStrategy().roundWeights).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(namedStrategy().roundId).toBe(0)
    expect(currentStrategy().nextWorkerNodeKey).toBe(0)
    expect(currentStrategy().defaultWorkerWeight).toBeGreaterThan(0)
    expect(namedStrategy().roundWeights).toStrictEqual([
      currentStrategy().defaultWorkerWeight
    ])
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1953
it('Verify unknown strategy throw error', () => {
  // Building a pool with a strategy name that is not a known worker choice
  // strategy is expected to throw synchronously from the constructor.
  const createPoolWithUnknownStrategy = () =>
    new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js',
      { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
    )
  // `toThrow` is the canonical matcher; `toThrowError` is only a deprecated
  // alias of it and is no longer available in recent `expect` releases.
  expect(createPoolWithUnknownStrategy).toThrow(
    "Invalid worker choice strategy 'UNKNOWN_STRATEGY'"
  )
})
1965 })