127377a26118c5d5be57cf98661eb9139fe5d81f
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.mjs
1 import { expect } from 'expect'
2 import {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 WorkerChoiceStrategies
8 } from '../../../lib/index.js'
9 import { CircularArray } from '../../../lib/circular-array.js'
10 import { sleep } from '../../test-utils.js'
11
12 describe('Selection strategies test suite', () => {
13 const min = 0 // passed as the first (minimum) argument of every dynamic pool created in this suite
14 const max = 3 // passed as the size of fixed pools and as the second (maximum) argument of dynamic pools
15
it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
  // Each enumeration member is expected to hold its own name as a string.
  const expectedNames = [
    'ROUND_ROBIN',
    'LEAST_USED',
    'LEAST_BUSY',
    'LEAST_ELU',
    'FAIR_SHARE',
    'WEIGHTED_ROUND_ROBIN',
    'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
  ]
  for (const name of expectedNames) {
    expect(WorkerChoiceStrategies[name]).toBe(name)
  }
})
29
it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
  // No options are supplied, so the pool must report its default strategy.
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const pool = new DynamicThreadPool(min, max, workerFile)
  const { workerChoiceStrategy } = pool.opts
  expect(workerChoiceStrategy).toBe(WorkerChoiceStrategies.ROUND_ROBIN)
  // Release the pool resources once the check is done
  await pool.destroy()
})
42
it('Verify available strategies are taken at pool creation', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // One short-lived pool per strategy: the option must be visible both on the
  // pool options and on the strategy context.
  for (const strategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new FixedThreadPool(max, workerFile, {
      workerChoiceStrategy: strategy
    })
    expect(pool.opts.workerChoiceStrategy).toBe(strategy)
    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(strategy)
    await pool.destroy()
  }
})
57
it('Verify available strategies can be set after pool creation', async () => {
  // Two scenarios: a thread pool where only the strategy is set (options keep
  // retries: 6), and a cluster pool where retries is overridden to 3.
  const scenarios = [
    {
      createPool: () =>
        new DynamicThreadPool(
          min,
          max,
          './tests/worker-files/thread/testWorker.mjs'
        ),
      applyStrategy: (pool, strategy) => {
        pool.setWorkerChoiceStrategy(strategy)
      },
      retries: 6
    },
    {
      createPool: () =>
        new DynamicClusterPool(
          min,
          max,
          './tests/worker-files/cluster/testWorker.js'
        ),
      applyStrategy: (pool, strategy) => {
        pool.setWorkerChoiceStrategy(strategy, { retries: 3 })
      },
      retries: 3
    }
  ]
  for (const { createPool, applyStrategy, retries } of scenarios) {
    // A fresh expected-options object is built for each assertion.
    const expectedOptions = () => ({
      retries,
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false }
    })
    for (const strategy of Object.values(WorkerChoiceStrategies)) {
      const pool = createPool()
      applyStrategy(pool, strategy)
      expect(pool.opts.workerChoiceStrategy).toBe(strategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        strategy
      )
      expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
        expectedOptions()
      )
      expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
        expectedOptions()
      )
      await pool.destroy()
    }
  }
})
110
it('Verify available strategies default internals at pool creation', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Looks up the strategy instance registered in the context under `name`.
  const internalsOf = name =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(name)
  for (const name of Object.values(WorkerChoiceStrategies)) {
    // Every strategy starts with both node keys at 0.
    expect(internalsOf(name).nextWorkerNodeKey).toBe(0)
    expect(internalsOf(name).previousWorkerNodeKey).toBe(0)
    switch (name) {
      case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
        expect(internalsOf(name).defaultWorkerWeight).toBeGreaterThan(0)
        expect(internalsOf(name).workerNodeVirtualTaskRunTime).toBe(0)
        break
      case WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN:
        expect(internalsOf(name).defaultWorkerWeight).toBeGreaterThan(0)
        expect(internalsOf(name).workerNodeVirtualTaskRunTime).toBe(0)
        expect(internalsOf(name).roundId).toBe(0)
        expect(internalsOf(name).workerNodeId).toBe(0)
        // The only round weight at creation is the default worker weight.
        expect(internalsOf(name).roundWeights).toStrictEqual([
          internalsOf(name).defaultWorkerWeight
        ])
        break
      default:
        // The remaining strategies have no extra internals checked here.
        break
    }
  }
  await pool.destroy()
})
177
it('Verify strategies wait for worker node readiness in dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Leave the pool some time to finish starting before inspecting it.
  await sleep(600)
  expect(pool.starting).toBe(false)
  expect(pool.workerNodes.length).toBe(min)
  // Submit a large batch of tasks, then expect the pool to hold `max` nodes.
  const maxMultiplier = 10000
  const pendingTasks = Array.from({ length: max * maxMultiplier }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(pool.workerNodes.length).toBe(max)
  // Release the pool resources once the checks are done
  await pool.destroy()
})
197
it('Verify ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // The same policy is expected from a fixed pool and from a dynamic pool.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const policy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(policy).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
223
it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // ROUND_ROBIN is expected to require no measurement statistics at all.
  const nothingRequired = () => ({
    aggregate: false,
    average: false,
    median: false
  })
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual({
      runTime: nothingRequired(),
      waitTime: nothingRequired(),
      elu: nothingRequired()
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
279
it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
  const tasksPerWorker = 2
  await Promise.all(
    Array.from({ length: max * tasksPerWorker }, () => pool.execute())
  )
  // Measurement histories are expected to stay empty with this strategy.
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const node of pool.workerNodes) {
    // Every worker node must have executed exactly `tasksPerWorker` tasks.
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: tasksPerWorker,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
  }
  // Strategy instance currently selected by the context.
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().nextWorkerNodeKey).toBe(0)
  expect(activeStrategy().previousWorkerNodeKey).toBe(
    pool.workerNodes.length - 1
  )
  // Release the pool resources once the checks are done
  await pool.destroy()
})
334
it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
  const tasksPerWorker = 2
  await Promise.all(
    Array.from({ length: max * tasksPerWorker }, () => pool.execute())
  )
  // Measurement histories are expected to stay empty with this strategy.
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
    // The per-node count is only bounded, not fixed, in a dynamic pool.
    expect(node.usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(
      max * tasksPerWorker
    )
  }
  // Strategy instance currently selected by the context.
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().nextWorkerNodeKey).toBe(0)
  expect(activeStrategy().previousWorkerNodeKey).toBe(
    pool.workerNodes.length - 1
  )
  // Release the pool resources once the checks are done
  await pool.destroy()
})
394
it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // The cluster pool is exercised first, then the thread pool.
  const poolFactories = [
    () =>
      new FixedClusterPool(max, './tests/worker-files/cluster/testWorker.js', {
        workerChoiceStrategy
      }),
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // `max` consecutive choices must land on `max` distinct worker ids.
    const chosenIds = new Set()
    for (let round = 0; round < max; round++) {
      const { id } = pool.workerNodes[pool.chooseWorkerNode()].info
      chosenIds.add(id)
    }
    expect(chosenIds.size).toBe(max)
    await pool.destroy()
  }
})
420
it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Both pools start with WEIGHTED_ROUND_ROBIN and are then switched over.
  const initialOptions = () => ({
    workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  })
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, initialOptions()),
    () => new DynamicThreadPool(min, max, workerFile, initialOptions())
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Strategy instance currently selected by the context.
    const activeStrategy = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(activeStrategy().nextWorkerNodeKey).toBeDefined()
    expect(activeStrategy().previousWorkerNodeKey).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    // After the switch both node keys must be back at 0.
    expect(activeStrategy().nextWorkerNodeKey).toBe(0)
    expect(activeStrategy().previousWorkerNodeKey).toBe(0)
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
480
it('Verify LEAST_USED strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // The same policy is expected from a fixed pool and from a dynamic pool.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const policy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(policy).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
506
it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // LEAST_USED is expected to require no measurement statistics at all.
  const nothingRequired = () => ({
    aggregate: false,
    average: false,
    median: false
  })
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual({
      runTime: nothingRequired(),
      waitTime: nothingRequired(),
      elu: nothingRequired()
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
562
it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
  const tasksPerWorker = 2
  await Promise.all(
    Array.from({ length: max * tasksPerWorker }, () => pool.execute())
  )
  // Measurement histories are expected to stay empty with this strategy.
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
    // Only the bounds of the per-node executed count are checked.
    expect(node.usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(
      max * tasksPerWorker
    )
  }
  // Strategy instance currently selected by the context.
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(activeStrategy().previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the checks are done
  await pool.destroy()
})
620
it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
  const tasksPerWorker = 2
  await Promise.all(
    Array.from({ length: max * tasksPerWorker }, () => pool.execute())
  )
  // Measurement histories are expected to stay empty with this strategy.
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
    // Only the bounds of the per-node executed count are checked.
    expect(node.usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(
      max * tasksPerWorker
    )
  }
  // Strategy instance currently selected by the context.
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(activeStrategy().previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the checks are done
  await pool.destroy()
})
679
it('Verify LEAST_BUSY strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // The same policy is expected from a fixed pool and from a dynamic pool.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const policy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(policy).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
705
it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // LEAST_BUSY is expected to require aggregated run time and wait time only.
  const requirement = aggregate => ({
    aggregate,
    average: false,
    median: false
  })
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual({
      runTime: requirement(true),
      waitTime: requirement(true),
      elu: requirement(false)
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
761
it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
  const tasksPerWorker = 2
  await Promise.all(
    Array.from({ length: max * tasksPerWorker }, () => pool.execute())
  )
  // Run time and wait time may carry extra fields, so they are matched
  // loosely; the ELU histories are expected to stay empty.
  const anyHistory = () =>
    expect.objectContaining({ history: expect.any(CircularArray) })
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: anyHistory(),
      waitTime: anyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
    expect(node.usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(
      max * tasksPerWorker
    )
    // An aggregate is either absent or strictly positive.
    if (node.usage.runTime.aggregate != null) {
      expect(node.usage.runTime.aggregate).toBeGreaterThan(0)
    } else {
      expect(node.usage.runTime.aggregate).toBeUndefined()
    }
    if (node.usage.waitTime.aggregate != null) {
      expect(node.usage.waitTime.aggregate).toBeGreaterThan(0)
    } else {
      expect(node.usage.waitTime.aggregate).toBeUndefined()
    }
  }
  // Strategy instance currently selected by the context.
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(activeStrategy().previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the checks are done
  await pool.destroy()
})
829
it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
  const tasksPerWorker = 2
  await Promise.all(
    Array.from({ length: max * tasksPerWorker }, () => pool.execute())
  )
  // Run time and wait time may carry extra fields, so they are matched
  // loosely; the ELU histories are expected to stay empty.
  const anyHistory = () =>
    expect.objectContaining({ history: expect.any(CircularArray) })
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: anyHistory(),
      waitTime: anyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
    expect(node.usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(
      max * tasksPerWorker
    )
    // An aggregate is either absent or strictly positive.
    if (node.usage.runTime.aggregate != null) {
      expect(node.usage.runTime.aggregate).toBeGreaterThan(0)
    } else {
      expect(node.usage.runTime.aggregate).toBeUndefined()
    }
    if (node.usage.waitTime.aggregate != null) {
      expect(node.usage.waitTime.aggregate).toBeGreaterThan(0)
    } else {
      expect(node.usage.waitTime.aggregate).toBeUndefined()
    }
  }
  // Strategy instance currently selected by the context.
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(activeStrategy().previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the checks are done
  await pool.destroy()
})
898
it('Verify LEAST_ELU strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // The same policy is expected from a fixed pool and from a dynamic pool.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const policy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(policy).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
924
it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // LEAST_ELU is expected to require the aggregated ELU only.
  const requirement = aggregate => ({
    aggregate,
    average: false,
    median: false
  })
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual({
      runTime: requirement(false),
      waitTime: requirement(false),
      elu: requirement(true)
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
980
it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
  const tasksPerWorker = 2
  await Promise.all(
    Array.from({ length: max * tasksPerWorker }, () => pool.execute())
  )
  // The ELU measurements may carry extra fields, so they are matched loosely;
  // the run time and wait time histories are expected to stay empty.
  const anyHistory = () =>
    expect.objectContaining({ history: expect.any(CircularArray) })
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: expect.objectContaining({
        idle: anyHistory(),
        active: anyHistory()
      })
    })
    expect(node.usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(
      max * tasksPerWorker
    )
    // The active aggregate is either absent or strictly positive.
    if (node.usage.elu.active.aggregate != null) {
      expect(node.usage.elu.active.aggregate).toBeGreaterThan(0)
    } else {
      expect(node.usage.elu.active.aggregate).toBeUndefined()
    }
    // The idle aggregate is either absent or non-negative.
    if (node.usage.elu.idle.aggregate != null) {
      expect(node.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
    } else {
      expect(node.usage.elu.idle.aggregate).toBeUndefined()
    }
    // The utilization is either absent or within [0, 1].
    if (node.usage.elu.utilization != null) {
      expect(node.usage.elu.utilization).toBeGreaterThanOrEqual(0)
      expect(node.usage.elu.utilization).toBeLessThanOrEqual(1)
    } else {
      expect(node.usage.elu.utilization).toBeUndefined()
    }
  }
  // Strategy instance currently selected by the context.
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(activeStrategy().previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the checks are done
  await pool.destroy()
})
1054
it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
  const tasksPerWorker = 2
  await Promise.all(
    Array.from({ length: max * tasksPerWorker }, () => pool.execute())
  )
  // The ELU measurements may carry extra fields, so they are matched loosely;
  // the run time and wait time histories are expected to stay empty.
  const anyHistory = () =>
    expect.objectContaining({ history: expect.any(CircularArray) })
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: expect.objectContaining({
        idle: anyHistory(),
        active: anyHistory()
      })
    })
    expect(node.usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(
      max * tasksPerWorker
    )
    // The active aggregate is either absent or strictly positive.
    if (node.usage.elu.active.aggregate != null) {
      expect(node.usage.elu.active.aggregate).toBeGreaterThan(0)
    } else {
      expect(node.usage.elu.active.aggregate).toBeUndefined()
    }
    // The idle aggregate is either absent or non-negative.
    if (node.usage.elu.idle.aggregate != null) {
      expect(node.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
    } else {
      expect(node.usage.elu.idle.aggregate).toBeUndefined()
    }
    // The utilization is either absent or within [0, 1].
    if (node.usage.elu.utilization != null) {
      expect(node.usage.elu.utilization).toBeGreaterThanOrEqual(0)
      expect(node.usage.elu.utilization).toBeLessThanOrEqual(1)
    } else {
      expect(node.usage.elu.utilization).toBeUndefined()
    }
  }
  // Strategy instance currently selected by the context.
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(activeStrategy().previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the checks are done
  await pool.destroy()
})
1129
it('Verify FAIR_SHARE strategy default policy', async () => {
  // The FAIR_SHARE default policy is expected to be the same for a fixed and
  // a dynamic thread pool, so both are checked against one expectation.
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Release the pool's workers even when the assertion above throws.
      await pool.destroy()
    }
  }
})
1155
it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
  // FAIR_SHARE is expected to require aggregate and average statistics for
  // both run time and ELU, and nothing for wait time, in fixed and dynamic
  // thread pools alike.
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: true,
      median: false
    }
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Release the pool's workers even when the assertion above throws.
      await pool.destroy()
    }
  }
})
1211
it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
  // Runs a batch of tasks through a fixed thread pool configured with the
  // FAIR_SHARE strategy, then checks every worker node's usage statistics,
  // its virtual task end timestamp and the strategy's worker node keys.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const taskCount = max * maxMultiplier
    const tasks = Array.from({ length: taskCount }, () => pool.execute())
    await Promise.all(tasks)
    for (const workerNode of pool.workerNodes) {
      const { usage } = workerNode
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: expect.objectContaining({
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        })
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
      // Each statistic must be either undefined or within its valid range.
      const { runTime, elu } = usage
      if (runTime.aggregate == null) {
        expect(runTime.aggregate).toBeUndefined()
      } else {
        expect(runTime.aggregate).toBeGreaterThan(0)
      }
      if (runTime.average == null) {
        expect(runTime.average).toBeUndefined()
      } else {
        expect(runTime.average).toBeGreaterThan(0)
      }
      if (elu.active.aggregate == null) {
        expect(elu.active.aggregate).toBeUndefined()
      } else {
        expect(elu.active.aggregate).toBeGreaterThan(0)
      }
      if (elu.idle.aggregate == null) {
        expect(elu.idle.aggregate).toBeUndefined()
      } else {
        expect(elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (elu.utilization == null) {
        expect(elu.utilization).toBeUndefined()
      } else {
        expect(elu.utilization).toBeGreaterThanOrEqual(0)
        expect(elu.utilization).toBeLessThanOrEqual(1)
      }
      expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Release the pool's workers even when an assertion above throws.
    await pool.destroy()
  }
})
1296
it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
  // Runs a batch of tasks through a dynamic thread pool configured with the
  // FAIR_SHARE strategy, then checks every worker node's usage statistics,
  // its virtual task end timestamp and the strategy's worker node keys.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const taskCount = max * maxMultiplier
    const tasks = Array.from({ length: taskCount }, () => pool.execute())
    await Promise.all(tasks)
    for (const workerNode of pool.workerNodes) {
      const { usage } = workerNode
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: expect.objectContaining({
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        })
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
      // Each statistic must be either undefined or within its valid range.
      const { runTime, elu } = usage
      if (runTime.aggregate == null) {
        expect(runTime.aggregate).toBeUndefined()
      } else {
        expect(runTime.aggregate).toBeGreaterThan(0)
      }
      if (runTime.average == null) {
        expect(runTime.average).toBeUndefined()
      } else {
        expect(runTime.average).toBeGreaterThan(0)
      }
      if (elu.active.aggregate == null) {
        expect(elu.active.aggregate).toBeUndefined()
      } else {
        expect(elu.active.aggregate).toBeGreaterThan(0)
      }
      if (elu.idle.aggregate == null) {
        expect(elu.idle.aggregate).toBeUndefined()
      } else {
        expect(elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (elu.utilization == null) {
        expect(elu.utilization).toBeUndefined()
      } else {
        expect(elu.utilization).toBeGreaterThanOrEqual(0)
        expect(elu.utilization).toBeLessThanOrEqual(1)
      }
      expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Release the pool's workers even when an assertion above throws.
    await pool.destroy()
  }
})
1382
it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
  // Same scenario as the plain dynamic pool run, but with the run time median
  // enabled through the strategy options; the median is checked instead of
  // the average.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
      workerChoiceStrategyOptions: {
        runTime: { median: true }
      }
    }
  )
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const taskCount = max * maxMultiplier
    const tasks = Array.from({ length: taskCount }, () => pool.execute())
    await Promise.all(tasks)
    for (const workerNode of pool.workerNodes) {
      const { usage } = workerNode
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: expect.objectContaining({
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        })
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
      // Each statistic must be either undefined or within its valid range.
      const { runTime, elu } = usage
      if (runTime.aggregate == null) {
        expect(runTime.aggregate).toBeUndefined()
      } else {
        expect(runTime.aggregate).toBeGreaterThan(0)
      }
      if (runTime.median == null) {
        expect(runTime.median).toBeUndefined()
      } else {
        expect(runTime.median).toBeGreaterThan(0)
      }
      if (elu.active.aggregate == null) {
        expect(elu.active.aggregate).toBeUndefined()
      } else {
        expect(elu.active.aggregate).toBeGreaterThan(0)
      }
      if (elu.idle.aggregate == null) {
        expect(elu.idle.aggregate).toBeUndefined()
      } else {
        expect(elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (elu.utilization == null) {
        expect(elu.utilization).toBeUndefined()
      } else {
        expect(elu.utilization).toBeGreaterThanOrEqual(0)
        expect(elu.utilization).toBeLessThanOrEqual(1)
      }
      expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Release the pool's workers even when an assertion above throws.
    await pool.destroy()
  }
})
1473
it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
  // Seeds every worker node with a virtual task end timestamp, switches the
  // pool to FAIR_SHARE and checks that the timestamp has been cleared. The
  // pools are created without a strategy option; the strategy is only applied
  // through `setWorkerChoiceStrategy()`.
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      for (const workerNode of pool.workerNodes) {
        workerNode.strategyData = {
          virtualTaskEndTimestamp: performance.now()
        }
      }
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      for (const workerNode of pool.workerNodes) {
        expect(
          workerNode.strategyData.virtualTaskEndTimestamp
        ).toBeUndefined()
      }
    } finally {
      // Release the pool's workers even when an assertion above throws.
      await pool.destroy()
    }
  }
})
1507
it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  // The WEIGHTED_ROUND_ROBIN default policy is expected to be the same for a
  // fixed and a dynamic thread pool, so both are checked against one
  // expectation.
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Release the pool's workers even when the assertion above throws.
      await pool.destroy()
    }
  }
})
1533
it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  // WEIGHTED_ROUND_ROBIN is expected to require only the run time aggregate
  // and average statistics, in fixed and dynamic thread pools alike.
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Release the pool's workers even when the assertion above throws.
      await pool.destroy()
    }
  }
})
1589
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  // Runs a batch of tasks through a fixed thread pool configured with the
  // WEIGHTED_ROUND_ROBIN strategy, then checks every worker node's usage
  // statistics and the strategy's internal state.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  try {
    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const taskCount = max * maxMultiplier
    const tasks = Array.from({ length: taskCount }, () => pool.execute())
    await Promise.all(tasks)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
      // Each run time statistic must be either undefined or strictly positive.
      const { runTime } = usage
      if (runTime.aggregate == null) {
        expect(runTime.aggregate).toBeUndefined()
      } else {
        expect(runTime.aggregate).toBeGreaterThan(0)
      }
      if (runTime.average == null) {
        expect(runTime.average).toBeUndefined()
      } else {
        expect(runTime.average).toBeGreaterThan(0)
      }
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(0)
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  } finally {
    // Release the pool's workers even when an assertion above throws.
    await pool.destroy()
  }
})
1667
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  // Runs a batch of tasks through a dynamic thread pool configured with the
  // WEIGHTED_ROUND_ROBIN strategy, then checks every worker node's usage
  // statistics and the strategy's internal state.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  try {
    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const taskCount = max * maxMultiplier
    const tasks = Array.from({ length: taskCount }, () => pool.execute())
    await Promise.all(tasks)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
      // Each run time statistic must be either undefined or strictly positive.
      const { runTime } = usage
      if (runTime.aggregate == null) {
        expect(runTime.aggregate).toBeUndefined()
      } else {
        expect(runTime.aggregate).toBeGreaterThan(0)
      }
      if (runTime.average == null) {
        expect(runTime.average).toBeUndefined()
      } else {
        expect(runTime.average).toBeGreaterThan(0)
      }
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(0)
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  } finally {
    // Release the pool's workers even when an assertion above throws.
    await pool.destroy()
  }
})
1746
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
  // Same scenario as the plain dynamic pool run, but with the run time median
  // enabled through the strategy options; the median is checked instead of
  // the average.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
      workerChoiceStrategyOptions: {
        runTime: { median: true }
      }
    }
  )
  try {
    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const taskCount = max * maxMultiplier
    const tasks = Array.from({ length: taskCount }, () => pool.execute())
    await Promise.all(tasks)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
      // Each run time statistic must be either undefined or strictly positive.
      const { runTime } = usage
      if (runTime.aggregate == null) {
        expect(runTime.aggregate).toBeUndefined()
      } else {
        expect(runTime.aggregate).toBeGreaterThan(0)
      }
      if (runTime.median == null) {
        expect(runTime.median).toBeUndefined()
      } else {
        expect(runTime.median).toBeGreaterThan(0)
      }
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(0)
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  } finally {
    // Release the pool's workers even when an assertion above throws.
    await pool.destroy()
  }
})
1830
it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  // Checks that the WEIGHTED_ROUND_ROBIN strategy's internals are defined
  // before it is selected and hold their initial values once it has been set.
  // The pools are created without a strategy option; the strategy is only
  // applied through `setWorkerChoiceStrategy()`.
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      // Before selection the strategy is looked up by its own name.
      const strategyBefore =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        )
      expect(strategyBefore.nextWorkerNodeKey).toBeDefined()
      expect(strategyBefore.previousWorkerNodeKey).toBeDefined()
      expect(strategyBefore.defaultWorkerWeight).toBeDefined()
      expect(strategyBefore.workerNodeVirtualTaskRunTime).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      // After selection it is looked up through the context's current
      // strategy; the context is re-read rather than reusing earlier lookups.
      const context = pool.workerChoiceStrategyContext
      const strategyAfter = context.workerChoiceStrategies.get(
        context.workerChoiceStrategy
      )
      expect(strategyAfter.nextWorkerNodeKey).toBe(0)
      expect(strategyAfter.previousWorkerNodeKey).toBe(0)
      expect(strategyAfter.defaultWorkerWeight).toBeGreaterThan(0)
      expect(strategyAfter.workerNodeVirtualTaskRunTime).toBe(0)
    } finally {
      // Release the pool's workers even when an assertion above throws.
      await pool.destroy()
    }
  }
})
1928
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  // The INTERLEAVED_WEIGHTED_ROUND_ROBIN default policy is expected to be the
  // same for a fixed and a dynamic thread pool, so both are checked against
  // one expectation.
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Release the pool's workers even when the assertion above throws.
      await pool.destroy()
    }
  }
})
1955
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  // INTERLEAVED_WEIGHTED_ROUND_ROBIN is expected to require only the run time
  // aggregate and average statistics, in fixed and dynamic thread pools alike.
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Release the pool's workers even when the assertion above throws.
      await pool.destroy()
    }
  }
})
2012
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  // Runs a batch of tasks through a fixed thread pool configured with the
  // INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy, then checks every worker node's
  // usage statistics and the strategy's internal state.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  try {
    // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const taskCount = max * maxMultiplier
    const tasks = Array.from({ length: taskCount }, () => pool.execute())
    await Promise.all(tasks)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.roundId).toBe(0)
    expect(strategy.workerNodeId).toBe(0)
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
    // The round weights must hold exactly one entry: the default weight.
    expect(strategy.roundWeights).toStrictEqual([strategy.defaultWorkerWeight])
  } finally {
    // Release the pool's workers even when an assertion above throws.
    await pool.destroy()
  }
})
2097
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  // Submits a batch of tasks to a dynamic thread pool configured with the
  // INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy, then checks the usage statistics
  // of every worker node and the internal state of the strategy.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  try {
    // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      // A single worker node ran somewhere between none and all of the
      // submitted tasks.
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
    // Nothing below changes the active strategy, so it is looked up once.
    const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.roundId).toBe(0)
    expect(strategy.workerNodeId).toBe(0)
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.roundWeights).toStrictEqual([strategy.defaultWorkerWeight])
  } finally {
    // We need to clean up the resources after our test, even when an
    // assertion above fails; otherwise the pool workers are left running.
    await pool.destroy()
  }
})
2183
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  // For a fixed and then a dynamic thread pool created without an explicit
  // strategy: checks that the INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy
  // internals are defined, selects that strategy on the pool, and checks that
  // the internals of the now active strategy hold their reset values.
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  // Factories rather than instances: the second pool must only be created
  // once the first one has been destroyed.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs'),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      // Before selection the strategy is fetched by its explicit name.
      const strategyBeforeSet =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        )
      expect(strategyBeforeSet.roundId).toBeDefined()
      expect(strategyBeforeSet.workerNodeId).toBeDefined()
      expect(strategyBeforeSet.nextWorkerNodeKey).toBeDefined()
      expect(strategyBeforeSet.previousWorkerNodeKey).toBeDefined()
      expect(strategyBeforeSet.defaultWorkerWeight).toBeDefined()
      expect(strategyBeforeSet.roundWeights).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      // After selection the strategy is fetched through the strategy name
      // currently held by the context, as the original assertions did.
      const strategyAfterSet =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          pool.workerChoiceStrategyContext.workerChoiceStrategy
        )
      expect(strategyAfterSet.roundId).toBe(0)
      expect(strategyAfterSet.workerNodeId).toBe(0)
      expect(strategyAfterSet.nextWorkerNodeKey).toBe(0)
      expect(strategyAfterSet.previousWorkerNodeKey).toBe(0)
      expect(strategyAfterSet.defaultWorkerWeight).toBeGreaterThan(0)
      expect(strategyAfterSet.roundWeights).toStrictEqual([
        strategyAfterSet.defaultWorkerWeight
      ])
    } finally {
      // We need to clean up the resources after our test, even when an
      // assertion above fails; otherwise the pool workers are left running.
      await pool.destroy()
    }
  }
})
2330
it('Verify unknown strategy throw error', () => {
  // Constructing a pool with a strategy name that is not a known worker
  // choice strategy is expected to throw with the message asserted below.
  const createPoolWithUnknownStrategy = () => {
    const poolOptions = { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
    return new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.mjs',
      poolOptions
    )
  }
  expect(createPoolWithUnknownStrategy).toThrow(
    "Invalid worker choice strategy 'UNKNOWN_STRATEGY'"
  )
})
2342 })