refactor: cleanup eslint configuration
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.mjs
1 import { expect } from 'expect'
2
3 import { CircularArray } from '../../../lib/circular-array.cjs'
4 import {
5 DynamicClusterPool,
6 DynamicThreadPool,
7 FixedClusterPool,
8 FixedThreadPool,
9 WorkerChoiceStrategies
10 } from '../../../lib/index.cjs'
11
12 describe('Selection strategies test suite', () => {
13 const min = 0
14 const max = 3
15
16 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
17 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
18 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
19 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
20 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
21 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
22 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
23 'WEIGHTED_ROUND_ROBIN'
24 )
25 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
26 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
27 )
28 })
29
30 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
31 const pool = new DynamicThreadPool(
32 min,
33 max,
34 './tests/worker-files/thread/testWorker.mjs'
35 )
36 expect(pool.opts.workerChoiceStrategy).toBe(
37 WorkerChoiceStrategies.ROUND_ROBIN
38 )
39 // We need to clean up the resources after our test
40 await pool.destroy()
41 })
42
43 it('Verify available strategies are taken at pool creation', async () => {
44 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
45 const pool = new FixedThreadPool(
46 max,
47 './tests/worker-files/thread/testWorker.mjs',
48 { workerChoiceStrategy }
49 )
50 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
51 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
52 workerChoiceStrategy
53 )
54 await pool.destroy()
55 }
56 })
57
58 it('Verify available strategies can be set after pool creation', async () => {
59 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
60 const pool = new DynamicThreadPool(
61 min,
62 max,
63 './tests/worker-files/thread/testWorker.mjs'
64 )
65 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
66 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
67 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
68 workerChoiceStrategy
69 )
70 await pool.destroy()
71 }
72 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
73 const pool = new DynamicClusterPool(
74 min,
75 max,
76 './tests/worker-files/cluster/testWorker.cjs'
77 )
78 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
79 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
80 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
81 workerChoiceStrategy
82 )
83 await pool.destroy()
84 }
85 })
86
87 it('Verify available strategies default internals at pool creation', async () => {
88 const pool = new FixedThreadPool(
89 max,
90 './tests/worker-files/thread/testWorker.mjs'
91 )
92 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
93 expect(
94 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
95 workerChoiceStrategy
96 ).nextWorkerNodeKey
97 ).toBe(0)
98 expect(
99 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
100 workerChoiceStrategy
101 ).previousWorkerNodeKey
102 ).toBe(0)
103 if (
104 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
105 ) {
106 expect(
107 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
108 workerChoiceStrategy
109 ).workerNodeVirtualTaskRunTime
110 ).toBe(0)
111 } else if (
112 workerChoiceStrategy ===
113 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
114 ) {
115 expect(
116 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
117 workerChoiceStrategy
118 ).workerNodeVirtualTaskRunTime
119 ).toBe(0)
120 expect(
121 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
122 workerChoiceStrategy
123 ).roundId
124 ).toBe(0)
125 expect(
126 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
127 workerChoiceStrategy
128 ).workerNodeId
129 ).toBe(0)
130 expect(
131 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
132 workerChoiceStrategy
133 ).roundWeights.length
134 ).toBe(1)
135 expect(
136 Number.isSafeInteger(
137 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
138 workerChoiceStrategy
139 ).roundWeights[0]
140 )
141 ).toBe(true)
142 }
143 }
144 await pool.destroy()
145 })
146
147 it('Verify ROUND_ROBIN strategy default policy', async () => {
148 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
149 let pool = new FixedThreadPool(
150 max,
151 './tests/worker-files/thread/testWorker.mjs',
152 { workerChoiceStrategy }
153 )
154 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
155 dynamicWorkerUsage: false,
156 dynamicWorkerReady: true
157 })
158 await pool.destroy()
159 pool = new DynamicThreadPool(
160 min,
161 max,
162 './tests/worker-files/thread/testWorker.mjs',
163 { workerChoiceStrategy }
164 )
165 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
166 dynamicWorkerUsage: false,
167 dynamicWorkerReady: true
168 })
169 // We need to clean up the resources after our test
170 await pool.destroy()
171 })
172
173 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
174 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
175 let pool = new FixedThreadPool(
176 max,
177 './tests/worker-files/thread/testWorker.mjs',
178 { workerChoiceStrategy }
179 )
180 expect(
181 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
182 ).toStrictEqual({
183 runTime: {
184 aggregate: false,
185 average: false,
186 median: false
187 },
188 waitTime: {
189 aggregate: false,
190 average: false,
191 median: false
192 },
193 elu: {
194 aggregate: false,
195 average: false,
196 median: false
197 }
198 })
199 await pool.destroy()
200 pool = new DynamicThreadPool(
201 min,
202 max,
203 './tests/worker-files/thread/testWorker.mjs',
204 { workerChoiceStrategy }
205 )
206 expect(
207 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
208 ).toStrictEqual({
209 runTime: {
210 aggregate: false,
211 average: false,
212 median: false
213 },
214 waitTime: {
215 aggregate: false,
216 average: false,
217 median: false
218 },
219 elu: {
220 aggregate: false,
221 average: false,
222 median: false
223 }
224 })
225 // We need to clean up the resources after our test
226 await pool.destroy()
227 })
228
229 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
230 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
231 const pool = new FixedThreadPool(
232 max,
233 './tests/worker-files/thread/testWorker.mjs',
234 { workerChoiceStrategy }
235 )
236 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
237 const promises = new Set()
238 const maxMultiplier = 2
239 for (let i = 0; i < max * maxMultiplier; i++) {
240 promises.add(pool.execute())
241 }
242 await Promise.all(promises)
243 for (const workerNode of pool.workerNodes) {
244 expect(workerNode.usage).toStrictEqual({
245 tasks: {
246 executed: maxMultiplier,
247 executing: 0,
248 queued: 0,
249 maxQueued: 0,
250 sequentiallyStolen: 0,
251 stolen: 0,
252 failed: 0
253 },
254 runTime: {
255 history: new CircularArray()
256 },
257 waitTime: {
258 history: new CircularArray()
259 },
260 elu: {
261 idle: {
262 history: new CircularArray()
263 },
264 active: {
265 history: new CircularArray()
266 }
267 }
268 })
269 }
270 expect(
271 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
272 pool.workerChoiceStrategyContext.workerChoiceStrategy
273 ).nextWorkerNodeKey
274 ).toBe(0)
275 expect(
276 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
277 pool.workerChoiceStrategyContext.workerChoiceStrategy
278 ).previousWorkerNodeKey
279 ).toBe(pool.workerNodes.length - 1)
280 // We need to clean up the resources after our test
281 await pool.destroy()
282 })
283
284 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
285 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
286 const pool = new DynamicThreadPool(
287 min,
288 max,
289 './tests/worker-files/thread/testWorker.mjs',
290 { workerChoiceStrategy }
291 )
292 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
293 const promises = new Set()
294 const maxMultiplier = 2
295 for (let i = 0; i < max * maxMultiplier; i++) {
296 promises.add(pool.execute())
297 }
298 await Promise.all(promises)
299 for (const workerNode of pool.workerNodes) {
300 expect(workerNode.usage).toStrictEqual({
301 tasks: {
302 executed: expect.any(Number),
303 executing: 0,
304 queued: 0,
305 maxQueued: 0,
306 sequentiallyStolen: 0,
307 stolen: 0,
308 failed: 0
309 },
310 runTime: {
311 history: new CircularArray()
312 },
313 waitTime: {
314 history: new CircularArray()
315 },
316 elu: {
317 idle: {
318 history: new CircularArray()
319 },
320 active: {
321 history: new CircularArray()
322 }
323 }
324 })
325 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
326 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
327 max * maxMultiplier
328 )
329 }
330 expect(
331 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
332 pool.workerChoiceStrategyContext.workerChoiceStrategy
333 ).nextWorkerNodeKey
334 ).toBe(0)
335 expect(
336 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
337 pool.workerChoiceStrategyContext.workerChoiceStrategy
338 ).previousWorkerNodeKey
339 ).toBe(pool.workerNodes.length - 1)
340 // We need to clean up the resources after our test
341 await pool.destroy()
342 })
343
344 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
345 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
346 let pool = new FixedClusterPool(
347 max,
348 './tests/worker-files/cluster/testWorker.cjs',
349 { workerChoiceStrategy }
350 )
351 let results = new Set()
352 for (let i = 0; i < max; i++) {
353 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
354 }
355 expect(results.size).toBe(max)
356 await pool.destroy()
357 pool = new FixedThreadPool(
358 max,
359 './tests/worker-files/thread/testWorker.mjs',
360 { workerChoiceStrategy }
361 )
362 results = new Set()
363 for (let i = 0; i < max; i++) {
364 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
365 }
366 expect(results.size).toBe(max)
367 await pool.destroy()
368 })
369
370 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
371 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
372 let pool = new FixedThreadPool(
373 max,
374 './tests/worker-files/thread/testWorker.mjs',
375 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
376 )
377 expect(
378 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
379 pool.workerChoiceStrategyContext.workerChoiceStrategy
380 ).nextWorkerNodeKey
381 ).toBeDefined()
382 expect(
383 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
384 pool.workerChoiceStrategyContext.workerChoiceStrategy
385 ).previousWorkerNodeKey
386 ).toBeDefined()
387 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
388 expect(
389 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
390 pool.workerChoiceStrategyContext.workerChoiceStrategy
391 ).nextWorkerNodeKey
392 ).toBe(0)
393 expect(
394 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
395 pool.workerChoiceStrategyContext.workerChoiceStrategy
396 ).previousWorkerNodeKey
397 ).toBe(0)
398 await pool.destroy()
399 pool = new DynamicThreadPool(
400 min,
401 max,
402 './tests/worker-files/thread/testWorker.mjs',
403 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
404 )
405 expect(
406 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
407 pool.workerChoiceStrategyContext.workerChoiceStrategy
408 ).nextWorkerNodeKey
409 ).toBeDefined()
410 expect(
411 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
412 pool.workerChoiceStrategyContext.workerChoiceStrategy
413 ).previousWorkerNodeKey
414 ).toBeDefined()
415 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
416 expect(
417 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
418 pool.workerChoiceStrategyContext.workerChoiceStrategy
419 ).nextWorkerNodeKey
420 ).toBe(0)
421 expect(
422 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
423 pool.workerChoiceStrategyContext.workerChoiceStrategy
424 ).previousWorkerNodeKey
425 ).toBe(0)
426 // We need to clean up the resources after our test
427 await pool.destroy()
428 })
429
430 it('Verify LEAST_USED strategy default policy', async () => {
431 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
432 let pool = new FixedThreadPool(
433 max,
434 './tests/worker-files/thread/testWorker.mjs',
435 { workerChoiceStrategy }
436 )
437 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
438 dynamicWorkerUsage: false,
439 dynamicWorkerReady: true
440 })
441 await pool.destroy()
442 pool = new DynamicThreadPool(
443 min,
444 max,
445 './tests/worker-files/thread/testWorker.mjs',
446 { workerChoiceStrategy }
447 )
448 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
449 dynamicWorkerUsage: false,
450 dynamicWorkerReady: true
451 })
452 // We need to clean up the resources after our test
453 await pool.destroy()
454 })
455
456 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
457 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
458 let pool = new FixedThreadPool(
459 max,
460 './tests/worker-files/thread/testWorker.mjs',
461 { workerChoiceStrategy }
462 )
463 expect(
464 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
465 ).toStrictEqual({
466 runTime: {
467 aggregate: false,
468 average: false,
469 median: false
470 },
471 waitTime: {
472 aggregate: false,
473 average: false,
474 median: false
475 },
476 elu: {
477 aggregate: false,
478 average: false,
479 median: false
480 }
481 })
482 await pool.destroy()
483 pool = new DynamicThreadPool(
484 min,
485 max,
486 './tests/worker-files/thread/testWorker.mjs',
487 { workerChoiceStrategy }
488 )
489 expect(
490 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
491 ).toStrictEqual({
492 runTime: {
493 aggregate: false,
494 average: false,
495 median: false
496 },
497 waitTime: {
498 aggregate: false,
499 average: false,
500 median: false
501 },
502 elu: {
503 aggregate: false,
504 average: false,
505 median: false
506 }
507 })
508 // We need to clean up the resources after our test
509 await pool.destroy()
510 })
511
512 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
513 const pool = new FixedThreadPool(
514 max,
515 './tests/worker-files/thread/testWorker.mjs',
516 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
517 )
518 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
519 const promises = new Set()
520 const maxMultiplier = 2
521 for (let i = 0; i < max * maxMultiplier; i++) {
522 promises.add(pool.execute())
523 }
524 await Promise.all(promises)
525 for (const workerNode of pool.workerNodes) {
526 expect(workerNode.usage).toStrictEqual({
527 tasks: {
528 executed: expect.any(Number),
529 executing: 0,
530 queued: 0,
531 maxQueued: 0,
532 sequentiallyStolen: 0,
533 stolen: 0,
534 failed: 0
535 },
536 runTime: {
537 history: new CircularArray()
538 },
539 waitTime: {
540 history: new CircularArray()
541 },
542 elu: {
543 idle: {
544 history: new CircularArray()
545 },
546 active: {
547 history: new CircularArray()
548 }
549 }
550 })
551 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
552 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
553 max * maxMultiplier
554 )
555 }
556 expect(
557 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
558 pool.workerChoiceStrategyContext.workerChoiceStrategy
559 ).nextWorkerNodeKey
560 ).toEqual(expect.any(Number))
561 expect(
562 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
563 pool.workerChoiceStrategyContext.workerChoiceStrategy
564 ).previousWorkerNodeKey
565 ).toEqual(expect.any(Number))
566 // We need to clean up the resources after our test
567 await pool.destroy()
568 })
569
570 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
571 const pool = new DynamicThreadPool(
572 min,
573 max,
574 './tests/worker-files/thread/testWorker.mjs',
575 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
576 )
577 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
578 const promises = new Set()
579 const maxMultiplier = 2
580 for (let i = 0; i < max * maxMultiplier; i++) {
581 promises.add(pool.execute())
582 }
583 await Promise.all(promises)
584 for (const workerNode of pool.workerNodes) {
585 expect(workerNode.usage).toStrictEqual({
586 tasks: {
587 executed: expect.any(Number),
588 executing: 0,
589 queued: 0,
590 maxQueued: 0,
591 sequentiallyStolen: 0,
592 stolen: 0,
593 failed: 0
594 },
595 runTime: {
596 history: new CircularArray()
597 },
598 waitTime: {
599 history: new CircularArray()
600 },
601 elu: {
602 idle: {
603 history: new CircularArray()
604 },
605 active: {
606 history: new CircularArray()
607 }
608 }
609 })
610 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
611 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
612 max * maxMultiplier
613 )
614 }
615 expect(
616 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
617 pool.workerChoiceStrategyContext.workerChoiceStrategy
618 ).nextWorkerNodeKey
619 ).toEqual(expect.any(Number))
620 expect(
621 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
622 pool.workerChoiceStrategyContext.workerChoiceStrategy
623 ).previousWorkerNodeKey
624 ).toEqual(expect.any(Number))
625 // We need to clean up the resources after our test
626 await pool.destroy()
627 })
628
629 it('Verify LEAST_BUSY strategy default policy', async () => {
630 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
631 let pool = new FixedThreadPool(
632 max,
633 './tests/worker-files/thread/testWorker.mjs',
634 { workerChoiceStrategy }
635 )
636 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
637 dynamicWorkerUsage: false,
638 dynamicWorkerReady: true
639 })
640 await pool.destroy()
641 pool = new DynamicThreadPool(
642 min,
643 max,
644 './tests/worker-files/thread/testWorker.mjs',
645 { workerChoiceStrategy }
646 )
647 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
648 dynamicWorkerUsage: false,
649 dynamicWorkerReady: true
650 })
651 // We need to clean up the resources after our test
652 await pool.destroy()
653 })
654
655 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
656 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
657 let pool = new FixedThreadPool(
658 max,
659 './tests/worker-files/thread/testWorker.mjs',
660 { workerChoiceStrategy }
661 )
662 expect(
663 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
664 ).toStrictEqual({
665 runTime: {
666 aggregate: true,
667 average: false,
668 median: false
669 },
670 waitTime: {
671 aggregate: true,
672 average: false,
673 median: false
674 },
675 elu: {
676 aggregate: false,
677 average: false,
678 median: false
679 }
680 })
681 await pool.destroy()
682 pool = new DynamicThreadPool(
683 min,
684 max,
685 './tests/worker-files/thread/testWorker.mjs',
686 { workerChoiceStrategy }
687 )
688 expect(
689 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
690 ).toStrictEqual({
691 runTime: {
692 aggregate: true,
693 average: false,
694 median: false
695 },
696 waitTime: {
697 aggregate: true,
698 average: false,
699 median: false
700 },
701 elu: {
702 aggregate: false,
703 average: false,
704 median: false
705 }
706 })
707 // We need to clean up the resources after our test
708 await pool.destroy()
709 })
710
711 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
712 const pool = new FixedThreadPool(
713 max,
714 './tests/worker-files/thread/testWorker.mjs',
715 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
716 )
717 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
718 const promises = new Set()
719 const maxMultiplier = 2
720 for (let i = 0; i < max * maxMultiplier; i++) {
721 promises.add(pool.execute())
722 }
723 await Promise.all(promises)
724 for (const workerNode of pool.workerNodes) {
725 expect(workerNode.usage).toStrictEqual({
726 tasks: {
727 executed: expect.any(Number),
728 executing: 0,
729 queued: 0,
730 maxQueued: 0,
731 sequentiallyStolen: 0,
732 stolen: 0,
733 failed: 0
734 },
735 runTime: expect.objectContaining({
736 history: expect.any(CircularArray)
737 }),
738 waitTime: expect.objectContaining({
739 history: expect.any(CircularArray)
740 }),
741 elu: {
742 idle: {
743 history: new CircularArray()
744 },
745 active: {
746 history: new CircularArray()
747 }
748 }
749 })
750 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
751 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
752 max * maxMultiplier
753 )
754 if (workerNode.usage.runTime.aggregate == null) {
755 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
756 } else {
757 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
758 }
759 if (workerNode.usage.waitTime.aggregate == null) {
760 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
761 } else {
762 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
763 }
764 }
765 expect(
766 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
767 pool.workerChoiceStrategyContext.workerChoiceStrategy
768 ).nextWorkerNodeKey
769 ).toEqual(expect.any(Number))
770 expect(
771 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
772 pool.workerChoiceStrategyContext.workerChoiceStrategy
773 ).previousWorkerNodeKey
774 ).toEqual(expect.any(Number))
775 // We need to clean up the resources after our test
776 await pool.destroy()
777 })
778
779 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
780 const pool = new DynamicThreadPool(
781 min,
782 max,
783 './tests/worker-files/thread/testWorker.mjs',
784 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
785 )
786 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
787 const promises = new Set()
788 const maxMultiplier = 2
789 for (let i = 0; i < max * maxMultiplier; i++) {
790 promises.add(pool.execute())
791 }
792 await Promise.all(promises)
793 for (const workerNode of pool.workerNodes) {
794 expect(workerNode.usage).toStrictEqual({
795 tasks: {
796 executed: expect.any(Number),
797 executing: 0,
798 queued: 0,
799 maxQueued: 0,
800 sequentiallyStolen: 0,
801 stolen: 0,
802 failed: 0
803 },
804 runTime: expect.objectContaining({
805 history: expect.any(CircularArray)
806 }),
807 waitTime: expect.objectContaining({
808 history: expect.any(CircularArray)
809 }),
810 elu: {
811 idle: {
812 history: new CircularArray()
813 },
814 active: {
815 history: new CircularArray()
816 }
817 }
818 })
819 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
820 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
821 max * maxMultiplier
822 )
823 if (workerNode.usage.runTime.aggregate == null) {
824 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
825 } else {
826 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
827 }
828 if (workerNode.usage.waitTime.aggregate == null) {
829 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
830 } else {
831 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
832 }
833 }
834 expect(
835 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
836 pool.workerChoiceStrategyContext.workerChoiceStrategy
837 ).nextWorkerNodeKey
838 ).toEqual(expect.any(Number))
839 expect(
840 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
841 pool.workerChoiceStrategyContext.workerChoiceStrategy
842 ).previousWorkerNodeKey
843 ).toEqual(expect.any(Number))
844 // We need to clean up the resources after our test
845 await pool.destroy()
846 })
847
848 it('Verify LEAST_ELU strategy default policy', async () => {
849 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
850 let pool = new FixedThreadPool(
851 max,
852 './tests/worker-files/thread/testWorker.mjs',
853 { workerChoiceStrategy }
854 )
855 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
856 dynamicWorkerUsage: false,
857 dynamicWorkerReady: true
858 })
859 await pool.destroy()
860 pool = new DynamicThreadPool(
861 min,
862 max,
863 './tests/worker-files/thread/testWorker.mjs',
864 { workerChoiceStrategy }
865 )
866 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
867 dynamicWorkerUsage: false,
868 dynamicWorkerReady: true
869 })
870 // We need to clean up the resources after our test
871 await pool.destroy()
872 })
873
874 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
875 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
876 let pool = new FixedThreadPool(
877 max,
878 './tests/worker-files/thread/testWorker.mjs',
879 { workerChoiceStrategy }
880 )
881 expect(
882 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
883 ).toStrictEqual({
884 runTime: {
885 aggregate: false,
886 average: false,
887 median: false
888 },
889 waitTime: {
890 aggregate: false,
891 average: false,
892 median: false
893 },
894 elu: {
895 aggregate: true,
896 average: false,
897 median: false
898 }
899 })
900 await pool.destroy()
901 pool = new DynamicThreadPool(
902 min,
903 max,
904 './tests/worker-files/thread/testWorker.mjs',
905 { workerChoiceStrategy }
906 )
907 expect(
908 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
909 ).toStrictEqual({
910 runTime: {
911 aggregate: false,
912 average: false,
913 median: false
914 },
915 waitTime: {
916 aggregate: false,
917 average: false,
918 median: false
919 },
920 elu: {
921 aggregate: true,
922 average: false,
923 median: false
924 }
925 })
926 // We need to clean up the resources after our test
927 await pool.destroy()
928 })
929
930 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
931 const pool = new FixedThreadPool(
932 max,
933 './tests/worker-files/thread/testWorker.mjs',
934 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
935 )
936 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
937 const promises = new Set()
938 const maxMultiplier = 2
939 for (let i = 0; i < max * maxMultiplier; i++) {
940 promises.add(pool.execute())
941 }
942 await Promise.all(promises)
943 for (const workerNode of pool.workerNodes) {
944 expect(workerNode.usage).toStrictEqual({
945 tasks: {
946 executed: expect.any(Number),
947 executing: 0,
948 queued: 0,
949 maxQueued: 0,
950 sequentiallyStolen: 0,
951 stolen: 0,
952 failed: 0
953 },
954 runTime: {
955 history: new CircularArray()
956 },
957 waitTime: {
958 history: new CircularArray()
959 },
960 elu: expect.objectContaining({
961 idle: expect.objectContaining({
962 history: expect.any(CircularArray)
963 }),
964 active: expect.objectContaining({
965 history: expect.any(CircularArray)
966 })
967 })
968 })
969 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
970 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
971 max * maxMultiplier
972 )
973 if (workerNode.usage.elu.active.aggregate == null) {
974 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
975 } else {
976 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
977 }
978 if (workerNode.usage.elu.idle.aggregate == null) {
979 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
980 } else {
981 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
982 }
983 if (workerNode.usage.elu.utilization == null) {
984 expect(workerNode.usage.elu.utilization).toBeUndefined()
985 } else {
986 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
987 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
988 }
989 }
990 expect(
991 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
992 pool.workerChoiceStrategyContext.workerChoiceStrategy
993 ).nextWorkerNodeKey
994 ).toEqual(expect.any(Number))
995 expect(
996 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
997 pool.workerChoiceStrategyContext.workerChoiceStrategy
998 ).previousWorkerNodeKey
999 ).toEqual(expect.any(Number))
1000 // We need to clean up the resources after our test
1001 await pool.destroy()
1002 })
1003
1004 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
1005 const pool = new DynamicThreadPool(
1006 min,
1007 max,
1008 './tests/worker-files/thread/testWorker.mjs',
1009 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
1010 )
1011 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
1012 const promises = new Set()
1013 const maxMultiplier = 2
1014 for (let i = 0; i < max * maxMultiplier; i++) {
1015 promises.add(pool.execute())
1016 }
1017 await Promise.all(promises)
1018 for (const workerNode of pool.workerNodes) {
1019 expect(workerNode.usage).toStrictEqual({
1020 tasks: {
1021 executed: expect.any(Number),
1022 executing: 0,
1023 queued: 0,
1024 maxQueued: 0,
1025 sequentiallyStolen: 0,
1026 stolen: 0,
1027 failed: 0
1028 },
1029 runTime: {
1030 history: new CircularArray()
1031 },
1032 waitTime: {
1033 history: new CircularArray()
1034 },
1035 elu: expect.objectContaining({
1036 idle: expect.objectContaining({
1037 history: expect.any(CircularArray)
1038 }),
1039 active: expect.objectContaining({
1040 history: expect.any(CircularArray)
1041 })
1042 })
1043 })
1044 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1045 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1046 max * maxMultiplier
1047 )
1048 if (workerNode.usage.elu.active.aggregate == null) {
1049 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1050 } else {
1051 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1052 }
1053 if (workerNode.usage.elu.idle.aggregate == null) {
1054 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1055 } else {
1056 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1057 }
1058 if (workerNode.usage.elu.utilization == null) {
1059 expect(workerNode.usage.elu.utilization).toBeUndefined()
1060 } else {
1061 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1062 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1063 }
1064 }
1065 expect(
1066 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1067 pool.workerChoiceStrategyContext.workerChoiceStrategy
1068 ).nextWorkerNodeKey
1069 ).toEqual(expect.any(Number))
1070 expect(
1071 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1072 pool.workerChoiceStrategyContext.workerChoiceStrategy
1073 ).previousWorkerNodeKey
1074 ).toEqual(expect.any(Number))
1075 // We need to clean up the resources after our test
1076 await pool.destroy()
1077 })
1078
1079 it('Verify FAIR_SHARE strategy default policy', async () => {
1080 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1081 let pool = new FixedThreadPool(
1082 max,
1083 './tests/worker-files/thread/testWorker.mjs',
1084 { workerChoiceStrategy }
1085 )
1086 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1087 dynamicWorkerUsage: false,
1088 dynamicWorkerReady: true
1089 })
1090 await pool.destroy()
1091 pool = new DynamicThreadPool(
1092 min,
1093 max,
1094 './tests/worker-files/thread/testWorker.mjs',
1095 { workerChoiceStrategy }
1096 )
1097 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1098 dynamicWorkerUsage: false,
1099 dynamicWorkerReady: true
1100 })
1101 // We need to clean up the resources after our test
1102 await pool.destroy()
1103 })
1104
1105 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1106 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1107 let pool = new FixedThreadPool(
1108 max,
1109 './tests/worker-files/thread/testWorker.mjs',
1110 { workerChoiceStrategy }
1111 )
1112 expect(
1113 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1114 ).toStrictEqual({
1115 runTime: {
1116 aggregate: true,
1117 average: true,
1118 median: false
1119 },
1120 waitTime: {
1121 aggregate: false,
1122 average: false,
1123 median: false
1124 },
1125 elu: {
1126 aggregate: true,
1127 average: true,
1128 median: false
1129 }
1130 })
1131 await pool.destroy()
1132 pool = new DynamicThreadPool(
1133 min,
1134 max,
1135 './tests/worker-files/thread/testWorker.mjs',
1136 { workerChoiceStrategy }
1137 )
1138 expect(
1139 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1140 ).toStrictEqual({
1141 runTime: {
1142 aggregate: true,
1143 average: true,
1144 median: false
1145 },
1146 waitTime: {
1147 aggregate: false,
1148 average: false,
1149 median: false
1150 },
1151 elu: {
1152 aggregate: true,
1153 average: true,
1154 median: false
1155 }
1156 })
1157 // We need to clean up the resources after our test
1158 await pool.destroy()
1159 })
1160
1161 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1162 const pool = new FixedThreadPool(
1163 max,
1164 './tests/worker-files/thread/testWorker.mjs',
1165 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1166 )
1167 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1168 const promises = new Set()
1169 const maxMultiplier = 2
1170 for (let i = 0; i < max * maxMultiplier; i++) {
1171 promises.add(pool.execute())
1172 }
1173 await Promise.all(promises)
1174 for (const workerNode of pool.workerNodes) {
1175 expect(workerNode.usage).toStrictEqual({
1176 tasks: {
1177 executed: expect.any(Number),
1178 executing: 0,
1179 queued: 0,
1180 maxQueued: 0,
1181 sequentiallyStolen: 0,
1182 stolen: 0,
1183 failed: 0
1184 },
1185 runTime: expect.objectContaining({
1186 history: expect.any(CircularArray)
1187 }),
1188 waitTime: {
1189 history: new CircularArray()
1190 },
1191 elu: expect.objectContaining({
1192 idle: expect.objectContaining({
1193 history: expect.any(CircularArray)
1194 }),
1195 active: expect.objectContaining({
1196 history: expect.any(CircularArray)
1197 })
1198 })
1199 })
1200 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1201 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1202 max * maxMultiplier
1203 )
1204 if (workerNode.usage.runTime.aggregate == null) {
1205 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1206 } else {
1207 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1208 }
1209 if (workerNode.usage.runTime.average == null) {
1210 expect(workerNode.usage.runTime.average).toBeUndefined()
1211 } else {
1212 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1213 }
1214 if (workerNode.usage.elu.active.aggregate == null) {
1215 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1216 } else {
1217 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1218 }
1219 if (workerNode.usage.elu.idle.aggregate == null) {
1220 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1221 } else {
1222 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1223 }
1224 if (workerNode.usage.elu.utilization == null) {
1225 expect(workerNode.usage.elu.utilization).toBeUndefined()
1226 } else {
1227 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1228 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1229 }
1230 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1231 }
1232 expect(
1233 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1234 pool.workerChoiceStrategyContext.workerChoiceStrategy
1235 ).nextWorkerNodeKey
1236 ).toEqual(expect.any(Number))
1237 expect(
1238 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1239 pool.workerChoiceStrategyContext.workerChoiceStrategy
1240 ).previousWorkerNodeKey
1241 ).toEqual(expect.any(Number))
1242 // We need to clean up the resources after our test
1243 await pool.destroy()
1244 })
1245
1246 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1247 const pool = new DynamicThreadPool(
1248 min,
1249 max,
1250 './tests/worker-files/thread/testWorker.mjs',
1251 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1252 )
1253 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1254 const promises = new Set()
1255 const maxMultiplier = 2
1256 for (let i = 0; i < max * maxMultiplier; i++) {
1257 promises.add(pool.execute())
1258 }
1259 await Promise.all(promises)
1260 for (const workerNode of pool.workerNodes) {
1261 expect(workerNode.usage).toStrictEqual({
1262 tasks: {
1263 executed: expect.any(Number),
1264 executing: 0,
1265 queued: 0,
1266 maxQueued: 0,
1267 sequentiallyStolen: 0,
1268 stolen: 0,
1269 failed: 0
1270 },
1271 runTime: expect.objectContaining({
1272 history: expect.any(CircularArray)
1273 }),
1274 waitTime: {
1275 history: new CircularArray()
1276 },
1277 elu: expect.objectContaining({
1278 idle: expect.objectContaining({
1279 history: expect.any(CircularArray)
1280 }),
1281 active: expect.objectContaining({
1282 history: expect.any(CircularArray)
1283 })
1284 })
1285 })
1286 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1287 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1288 max * maxMultiplier
1289 )
1290 if (workerNode.usage.runTime.aggregate == null) {
1291 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1292 } else {
1293 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1294 }
1295 if (workerNode.usage.runTime.average == null) {
1296 expect(workerNode.usage.runTime.average).toBeUndefined()
1297 } else {
1298 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1299 }
1300 if (workerNode.usage.elu.active.aggregate == null) {
1301 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1302 } else {
1303 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1304 }
1305 if (workerNode.usage.elu.idle.aggregate == null) {
1306 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1307 } else {
1308 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1309 }
1310 if (workerNode.usage.elu.utilization == null) {
1311 expect(workerNode.usage.elu.utilization).toBeUndefined()
1312 } else {
1313 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1314 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1315 }
1316 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1317 }
1318 expect(
1319 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1320 pool.workerChoiceStrategyContext.workerChoiceStrategy
1321 ).nextWorkerNodeKey
1322 ).toEqual(expect.any(Number))
1323 expect(
1324 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1325 pool.workerChoiceStrategyContext.workerChoiceStrategy
1326 ).previousWorkerNodeKey
1327 ).toEqual(expect.any(Number))
1328 // We need to clean up the resources after our test
1329 await pool.destroy()
1330 })
1331
1332 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1333 const pool = new DynamicThreadPool(
1334 min,
1335 max,
1336 './tests/worker-files/thread/testWorker.mjs',
1337 {
1338 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1339 workerChoiceStrategyOptions: {
1340 runTime: { median: true }
1341 }
1342 }
1343 )
1344 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1345 const promises = new Set()
1346 const maxMultiplier = 2
1347 for (let i = 0; i < max * maxMultiplier; i++) {
1348 promises.add(pool.execute())
1349 }
1350 await Promise.all(promises)
1351 for (const workerNode of pool.workerNodes) {
1352 expect(workerNode.usage).toStrictEqual({
1353 tasks: {
1354 executed: expect.any(Number),
1355 executing: 0,
1356 queued: 0,
1357 maxQueued: 0,
1358 sequentiallyStolen: 0,
1359 stolen: 0,
1360 failed: 0
1361 },
1362 runTime: expect.objectContaining({
1363 history: expect.any(CircularArray)
1364 }),
1365 waitTime: {
1366 history: new CircularArray()
1367 },
1368 elu: expect.objectContaining({
1369 idle: expect.objectContaining({
1370 history: expect.any(CircularArray)
1371 }),
1372 active: expect.objectContaining({
1373 history: expect.any(CircularArray)
1374 })
1375 })
1376 })
1377 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1378 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1379 max * maxMultiplier
1380 )
1381 if (workerNode.usage.runTime.aggregate == null) {
1382 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1383 } else {
1384 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1385 }
1386 if (workerNode.usage.runTime.median == null) {
1387 expect(workerNode.usage.runTime.median).toBeUndefined()
1388 } else {
1389 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1390 }
1391 if (workerNode.usage.elu.active.aggregate == null) {
1392 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1393 } else {
1394 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1395 }
1396 if (workerNode.usage.elu.idle.aggregate == null) {
1397 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1398 } else {
1399 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1400 }
1401 if (workerNode.usage.elu.utilization == null) {
1402 expect(workerNode.usage.elu.utilization).toBeUndefined()
1403 } else {
1404 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1405 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1406 }
1407 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1408 }
1409 expect(
1410 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1411 pool.workerChoiceStrategyContext.workerChoiceStrategy
1412 ).nextWorkerNodeKey
1413 ).toEqual(expect.any(Number))
1414 expect(
1415 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1416 pool.workerChoiceStrategyContext.workerChoiceStrategy
1417 ).previousWorkerNodeKey
1418 ).toEqual(expect.any(Number))
1419 // We need to clean up the resources after our test
1420 await pool.destroy()
1421 })
1422
1423 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1424 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1425 let pool = new FixedThreadPool(
1426 max,
1427 './tests/worker-files/thread/testWorker.mjs'
1428 )
1429 for (const workerNode of pool.workerNodes) {
1430 workerNode.strategyData = {
1431 virtualTaskEndTimestamp: performance.now()
1432 }
1433 }
1434 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1435 for (const workerNode of pool.workerNodes) {
1436 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1437 }
1438 await pool.destroy()
1439 pool = new DynamicThreadPool(
1440 min,
1441 max,
1442 './tests/worker-files/thread/testWorker.mjs'
1443 )
1444 for (const workerNode of pool.workerNodes) {
1445 workerNode.strategyData = {
1446 virtualTaskEndTimestamp: performance.now()
1447 }
1448 }
1449 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1450 for (const workerNode of pool.workerNodes) {
1451 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1452 }
1453 // We need to clean up the resources after our test
1454 await pool.destroy()
1455 })
1456
1457 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1458 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1459 let pool = new FixedThreadPool(
1460 max,
1461 './tests/worker-files/thread/testWorker.mjs',
1462 { workerChoiceStrategy }
1463 )
1464 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1465 dynamicWorkerUsage: false,
1466 dynamicWorkerReady: true
1467 })
1468 await pool.destroy()
1469 pool = new DynamicThreadPool(
1470 min,
1471 max,
1472 './tests/worker-files/thread/testWorker.mjs',
1473 { workerChoiceStrategy }
1474 )
1475 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1476 dynamicWorkerUsage: false,
1477 dynamicWorkerReady: true
1478 })
1479 // We need to clean up the resources after our test
1480 await pool.destroy()
1481 })
1482
1483 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1484 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1485 let pool = new FixedThreadPool(
1486 max,
1487 './tests/worker-files/thread/testWorker.mjs',
1488 { workerChoiceStrategy }
1489 )
1490 expect(
1491 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1492 ).toStrictEqual({
1493 runTime: {
1494 aggregate: true,
1495 average: true,
1496 median: false
1497 },
1498 waitTime: {
1499 aggregate: false,
1500 average: false,
1501 median: false
1502 },
1503 elu: {
1504 aggregate: false,
1505 average: false,
1506 median: false
1507 }
1508 })
1509 await pool.destroy()
1510 pool = new DynamicThreadPool(
1511 min,
1512 max,
1513 './tests/worker-files/thread/testWorker.mjs',
1514 { workerChoiceStrategy }
1515 )
1516 expect(
1517 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1518 ).toStrictEqual({
1519 runTime: {
1520 aggregate: true,
1521 average: true,
1522 median: false
1523 },
1524 waitTime: {
1525 aggregate: false,
1526 average: false,
1527 median: false
1528 },
1529 elu: {
1530 aggregate: false,
1531 average: false,
1532 median: false
1533 }
1534 })
1535 // We need to clean up the resources after our test
1536 await pool.destroy()
1537 })
1538
1539 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1540 const pool = new FixedThreadPool(
1541 max,
1542 './tests/worker-files/thread/testWorker.mjs',
1543 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1544 )
1545 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1546 const promises = new Set()
1547 const maxMultiplier = 2
1548 for (let i = 0; i < max * maxMultiplier; i++) {
1549 promises.add(pool.execute())
1550 }
1551 await Promise.all(promises)
1552 for (const workerNode of pool.workerNodes) {
1553 expect(workerNode.usage).toStrictEqual({
1554 tasks: {
1555 executed: expect.any(Number),
1556 executing: 0,
1557 queued: 0,
1558 maxQueued: 0,
1559 sequentiallyStolen: 0,
1560 stolen: 0,
1561 failed: 0
1562 },
1563 runTime: expect.objectContaining({
1564 history: expect.any(CircularArray)
1565 }),
1566 waitTime: {
1567 history: new CircularArray()
1568 },
1569 elu: {
1570 idle: {
1571 history: new CircularArray()
1572 },
1573 active: {
1574 history: new CircularArray()
1575 }
1576 }
1577 })
1578 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1579 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1580 max * maxMultiplier
1581 )
1582 if (workerNode.usage.runTime.aggregate == null) {
1583 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1584 } else {
1585 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1586 }
1587 if (workerNode.usage.runTime.average == null) {
1588 expect(workerNode.usage.runTime.average).toBeUndefined()
1589 } else {
1590 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1591 }
1592 }
1593 expect(
1594 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1595 pool.workerChoiceStrategyContext.workerChoiceStrategy
1596 ).nextWorkerNodeKey
1597 ).toBe(0)
1598 expect(
1599 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1600 pool.workerChoiceStrategyContext.workerChoiceStrategy
1601 ).previousWorkerNodeKey
1602 ).toEqual(0)
1603 expect(
1604 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1605 pool.workerChoiceStrategyContext.workerChoiceStrategy
1606 ).workerNodeVirtualTaskRunTime
1607 ).toBeGreaterThanOrEqual(0)
1608 // We need to clean up the resources after our test
1609 await pool.destroy()
1610 })
1611
1612 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1613 const pool = new DynamicThreadPool(
1614 min,
1615 max,
1616 './tests/worker-files/thread/testWorker.mjs',
1617 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1618 )
1619 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1620 const promises = new Set()
1621 const maxMultiplier = 2
1622 for (let i = 0; i < max * maxMultiplier; i++) {
1623 promises.add(pool.execute())
1624 }
1625 await Promise.all(promises)
1626 for (const workerNode of pool.workerNodes) {
1627 expect(workerNode.usage).toStrictEqual({
1628 tasks: {
1629 executed: expect.any(Number),
1630 executing: 0,
1631 queued: 0,
1632 maxQueued: 0,
1633 sequentiallyStolen: 0,
1634 stolen: 0,
1635 failed: 0
1636 },
1637 runTime: expect.objectContaining({
1638 history: expect.any(CircularArray)
1639 }),
1640 waitTime: {
1641 history: new CircularArray()
1642 },
1643 elu: {
1644 idle: {
1645 history: new CircularArray()
1646 },
1647 active: {
1648 history: new CircularArray()
1649 }
1650 }
1651 })
1652 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1653 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1654 max * maxMultiplier
1655 )
1656 if (workerNode.usage.runTime.aggregate == null) {
1657 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1658 } else {
1659 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1660 }
1661 if (workerNode.usage.runTime.average == null) {
1662 expect(workerNode.usage.runTime.average).toBeUndefined()
1663 } else {
1664 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1665 }
1666 }
1667 expect(
1668 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1669 pool.workerChoiceStrategyContext.workerChoiceStrategy
1670 ).nextWorkerNodeKey
1671 ).toEqual(0)
1672 expect(
1673 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1674 pool.workerChoiceStrategyContext.workerChoiceStrategy
1675 ).previousWorkerNodeKey
1676 ).toEqual(0)
1677 expect(
1678 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1679 pool.workerChoiceStrategyContext.workerChoiceStrategy
1680 ).workerNodeVirtualTaskRunTime
1681 ).toBeGreaterThanOrEqual(0)
1682 // We need to clean up the resources after our test
1683 await pool.destroy()
1684 })
1685
1686 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1687 const pool = new DynamicThreadPool(
1688 min,
1689 max,
1690 './tests/worker-files/thread/testWorker.mjs',
1691 {
1692 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1693 workerChoiceStrategyOptions: {
1694 runTime: { median: true }
1695 }
1696 }
1697 )
1698 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1699 const promises = new Set()
1700 const maxMultiplier = 2
1701 for (let i = 0; i < max * maxMultiplier; i++) {
1702 promises.add(pool.execute())
1703 }
1704 await Promise.all(promises)
1705 for (const workerNode of pool.workerNodes) {
1706 expect(workerNode.usage).toStrictEqual({
1707 tasks: {
1708 executed: expect.any(Number),
1709 executing: 0,
1710 queued: 0,
1711 maxQueued: 0,
1712 sequentiallyStolen: 0,
1713 stolen: 0,
1714 failed: 0
1715 },
1716 runTime: expect.objectContaining({
1717 history: expect.any(CircularArray)
1718 }),
1719 waitTime: {
1720 history: new CircularArray()
1721 },
1722 elu: {
1723 idle: {
1724 history: new CircularArray()
1725 },
1726 active: {
1727 history: new CircularArray()
1728 }
1729 }
1730 })
1731 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1732 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1733 max * maxMultiplier
1734 )
1735 if (workerNode.usage.runTime.aggregate == null) {
1736 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1737 } else {
1738 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1739 }
1740 if (workerNode.usage.runTime.median == null) {
1741 expect(workerNode.usage.runTime.median).toBeUndefined()
1742 } else {
1743 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1744 }
1745 }
1746 expect(
1747 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1748 pool.workerChoiceStrategyContext.workerChoiceStrategy
1749 ).nextWorkerNodeKey
1750 ).toEqual(0)
1751 expect(
1752 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1753 pool.workerChoiceStrategyContext.workerChoiceStrategy
1754 ).previousWorkerNodeKey
1755 ).toEqual(0)
1756 expect(
1757 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1758 pool.workerChoiceStrategyContext.workerChoiceStrategy
1759 ).workerNodeVirtualTaskRunTime
1760 ).toBeGreaterThanOrEqual(0)
1761 // We need to clean up the resources after our test
1762 await pool.destroy()
1763 })
1764
1765 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1766 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1767 let pool = new FixedThreadPool(
1768 max,
1769 './tests/worker-files/thread/testWorker.mjs'
1770 )
1771 expect(
1772 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1773 workerChoiceStrategy
1774 ).nextWorkerNodeKey
1775 ).toBeDefined()
1776 expect(
1777 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1778 workerChoiceStrategy
1779 ).previousWorkerNodeKey
1780 ).toBeDefined()
1781 expect(
1782 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1783 workerChoiceStrategy
1784 ).workerNodeVirtualTaskRunTime
1785 ).toBeDefined()
1786 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1787 expect(
1788 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1789 pool.workerChoiceStrategyContext.workerChoiceStrategy
1790 ).nextWorkerNodeKey
1791 ).toBe(0)
1792 expect(
1793 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1794 pool.workerChoiceStrategyContext.workerChoiceStrategy
1795 ).previousWorkerNodeKey
1796 ).toBe(0)
1797 expect(
1798 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1799 pool.workerChoiceStrategyContext.workerChoiceStrategy
1800 ).workerNodeVirtualTaskRunTime
1801 ).toBe(0)
1802 await pool.destroy()
1803 pool = new DynamicThreadPool(
1804 min,
1805 max,
1806 './tests/worker-files/thread/testWorker.mjs'
1807 )
1808 expect(
1809 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1810 workerChoiceStrategy
1811 ).nextWorkerNodeKey
1812 ).toBeDefined()
1813 expect(
1814 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1815 workerChoiceStrategy
1816 ).previousWorkerNodeKey
1817 ).toBeDefined()
1818 expect(
1819 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1820 workerChoiceStrategy
1821 ).workerNodeVirtualTaskRunTime
1822 ).toBeDefined()
1823 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1824 expect(
1825 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1826 pool.workerChoiceStrategyContext.workerChoiceStrategy
1827 ).nextWorkerNodeKey
1828 ).toBe(0)
1829 expect(
1830 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1831 pool.workerChoiceStrategyContext.workerChoiceStrategy
1832 ).previousWorkerNodeKey
1833 ).toBe(0)
1834 expect(
1835 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1836 pool.workerChoiceStrategyContext.workerChoiceStrategy
1837 ).workerNodeVirtualTaskRunTime
1838 ).toBe(0)
1839 // We need to clean up the resources after our test
1840 await pool.destroy()
1841 })
1842
1843 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1844 const workerChoiceStrategy =
1845 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1846 let pool = new FixedThreadPool(
1847 max,
1848 './tests/worker-files/thread/testWorker.mjs',
1849 { workerChoiceStrategy }
1850 )
1851 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1852 dynamicWorkerUsage: false,
1853 dynamicWorkerReady: true
1854 })
1855 await pool.destroy()
1856 pool = new DynamicThreadPool(
1857 min,
1858 max,
1859 './tests/worker-files/thread/testWorker.mjs',
1860 { workerChoiceStrategy }
1861 )
1862 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1863 dynamicWorkerUsage: false,
1864 dynamicWorkerReady: true
1865 })
1866 // We need to clean up the resources after our test
1867 await pool.destroy()
1868 })
1869
1870 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1871 const workerChoiceStrategy =
1872 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1873 let pool = new FixedThreadPool(
1874 max,
1875 './tests/worker-files/thread/testWorker.mjs',
1876 { workerChoiceStrategy }
1877 )
1878 expect(
1879 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1880 ).toStrictEqual({
1881 runTime: {
1882 aggregate: true,
1883 average: true,
1884 median: false
1885 },
1886 waitTime: {
1887 aggregate: false,
1888 average: false,
1889 median: false
1890 },
1891 elu: {
1892 aggregate: false,
1893 average: false,
1894 median: false
1895 }
1896 })
1897 await pool.destroy()
1898 pool = new DynamicThreadPool(
1899 min,
1900 max,
1901 './tests/worker-files/thread/testWorker.mjs',
1902 { workerChoiceStrategy }
1903 )
1904 expect(
1905 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1906 ).toStrictEqual({
1907 runTime: {
1908 aggregate: true,
1909 average: true,
1910 median: false
1911 },
1912 waitTime: {
1913 aggregate: false,
1914 average: false,
1915 median: false
1916 },
1917 elu: {
1918 aggregate: false,
1919 average: false,
1920 median: false
1921 }
1922 })
1923 // We need to clean up the resources after our test
1924 await pool.destroy()
1925 })
1926
1927 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1928 const pool = new FixedThreadPool(
1929 max,
1930 './tests/worker-files/thread/testWorker.mjs',
1931 {
1932 workerChoiceStrategy:
1933 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1934 }
1935 )
1936 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1937 const promises = new Set()
1938 const maxMultiplier = 2
1939 for (let i = 0; i < max * maxMultiplier; i++) {
1940 promises.add(pool.execute())
1941 }
1942 await Promise.all(promises)
1943 for (const workerNode of pool.workerNodes) {
1944 expect(workerNode.usage).toStrictEqual({
1945 tasks: {
1946 executed: expect.any(Number),
1947 executing: 0,
1948 queued: 0,
1949 maxQueued: 0,
1950 sequentiallyStolen: 0,
1951 stolen: 0,
1952 failed: 0
1953 },
1954 runTime: expect.objectContaining({
1955 history: expect.any(CircularArray)
1956 }),
1957 waitTime: {
1958 history: new CircularArray()
1959 },
1960 elu: {
1961 idle: {
1962 history: new CircularArray()
1963 },
1964 active: {
1965 history: new CircularArray()
1966 }
1967 }
1968 })
1969 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1970 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1971 max * maxMultiplier
1972 )
1973 }
1974 expect(
1975 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1976 pool.workerChoiceStrategyContext.workerChoiceStrategy
1977 ).roundId
1978 ).toBe(0)
1979 expect(
1980 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1981 pool.workerChoiceStrategyContext.workerChoiceStrategy
1982 ).workerNodeId
1983 ).toBe(0)
1984 expect(
1985 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1986 pool.workerChoiceStrategyContext.workerChoiceStrategy
1987 ).nextWorkerNodeKey
1988 ).toBe(0)
1989 expect(
1990 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1991 pool.workerChoiceStrategyContext.workerChoiceStrategy
1992 ).previousWorkerNodeKey
1993 ).toEqual(0)
1994 expect(
1995 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1996 pool.workerChoiceStrategyContext.workerChoiceStrategy
1997 ).roundWeights.length
1998 ).toBe(1)
1999 expect(
2000 Number.isSafeInteger(
2001 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2002 pool.workerChoiceStrategyContext.workerChoiceStrategy
2003 ).roundWeights[0]
2004 )
2005 ).toBe(true)
2006 // We need to clean up the resources after our test
2007 await pool.destroy()
2008 })
2009
2010 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
2011 const pool = new DynamicThreadPool(
2012 min,
2013 max,
2014 './tests/worker-files/thread/testWorker.mjs',
2015 {
2016 workerChoiceStrategy:
2017 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2018 }
2019 )
2020 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
2021 const promises = new Set()
2022 const maxMultiplier = 2
2023 for (let i = 0; i < max * maxMultiplier; i++) {
2024 promises.add(pool.execute())
2025 }
2026 await Promise.all(promises)
2027 for (const workerNode of pool.workerNodes) {
2028 expect(workerNode.usage).toStrictEqual({
2029 tasks: {
2030 executed: expect.any(Number),
2031 executing: 0,
2032 queued: 0,
2033 maxQueued: 0,
2034 sequentiallyStolen: 0,
2035 stolen: 0,
2036 failed: 0
2037 },
2038 runTime: expect.objectContaining({
2039 history: expect.any(CircularArray)
2040 }),
2041 waitTime: {
2042 history: new CircularArray()
2043 },
2044 elu: {
2045 idle: {
2046 history: new CircularArray()
2047 },
2048 active: {
2049 history: new CircularArray()
2050 }
2051 }
2052 })
2053 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2054 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2055 max * maxMultiplier
2056 )
2057 }
2058 expect(
2059 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2060 pool.workerChoiceStrategyContext.workerChoiceStrategy
2061 ).roundId
2062 ).toBe(0)
2063 expect(
2064 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2065 pool.workerChoiceStrategyContext.workerChoiceStrategy
2066 ).workerNodeId
2067 ).toBe(0)
2068 expect(
2069 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2070 pool.workerChoiceStrategyContext.workerChoiceStrategy
2071 ).nextWorkerNodeKey
2072 ).toBe(0)
2073 expect(
2074 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2075 pool.workerChoiceStrategyContext.workerChoiceStrategy
2076 ).previousWorkerNodeKey
2077 ).toEqual(0)
2078 expect(
2079 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2080 pool.workerChoiceStrategyContext.workerChoiceStrategy
2081 ).roundWeights.length
2082 ).toBe(1)
2083 expect(
2084 Number.isSafeInteger(
2085 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2086 pool.workerChoiceStrategyContext.workerChoiceStrategy
2087 ).roundWeights[0]
2088 )
2089 ).toBe(true)
2090 // We need to clean up the resources after our test
2091 await pool.destroy()
2092 })
2093
2094 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
2095 const workerChoiceStrategy =
2096 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2097 let pool = new FixedThreadPool(
2098 max,
2099 './tests/worker-files/thread/testWorker.mjs'
2100 )
2101 expect(
2102 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2103 workerChoiceStrategy
2104 ).roundId
2105 ).toBeDefined()
2106 expect(
2107 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2108 workerChoiceStrategy
2109 ).workerNodeId
2110 ).toBeDefined()
2111 expect(
2112 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2113 workerChoiceStrategy
2114 ).nextWorkerNodeKey
2115 ).toBeDefined()
2116 expect(
2117 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2118 workerChoiceStrategy
2119 ).previousWorkerNodeKey
2120 ).toBeDefined()
2121 expect(
2122 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2123 workerChoiceStrategy
2124 ).roundWeights
2125 ).toBeDefined()
2126 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2127 expect(
2128 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2129 pool.workerChoiceStrategyContext.workerChoiceStrategy
2130 ).roundId
2131 ).toBe(0)
2132 expect(
2133 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2134 pool.workerChoiceStrategyContext.workerChoiceStrategy
2135 ).workerNodeId
2136 ).toBe(0)
2137 expect(
2138 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2139 pool.workerChoiceStrategyContext.workerChoiceStrategy
2140 ).nextWorkerNodeKey
2141 ).toBe(0)
2142 expect(
2143 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2144 pool.workerChoiceStrategyContext.workerChoiceStrategy
2145 ).previousWorkerNodeKey
2146 ).toBe(0)
2147 expect(
2148 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2149 pool.workerChoiceStrategyContext.workerChoiceStrategy
2150 ).roundWeights.length
2151 ).toBe(1)
2152 expect(
2153 Number.isSafeInteger(
2154 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2155 pool.workerChoiceStrategyContext.workerChoiceStrategy
2156 ).roundWeights[0]
2157 )
2158 ).toBe(true)
2159 await pool.destroy()
2160 pool = new DynamicThreadPool(
2161 min,
2162 max,
2163 './tests/worker-files/thread/testWorker.mjs'
2164 )
2165 expect(
2166 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2167 workerChoiceStrategy
2168 ).roundId
2169 ).toBeDefined()
2170 expect(
2171 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2172 workerChoiceStrategy
2173 ).workerNodeId
2174 ).toBeDefined()
2175 expect(
2176 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2177 workerChoiceStrategy
2178 ).nextWorkerNodeKey
2179 ).toBeDefined()
2180 expect(
2181 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2182 workerChoiceStrategy
2183 ).previousWorkerNodeKey
2184 ).toBeDefined()
2185 expect(
2186 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2187 workerChoiceStrategy
2188 ).roundWeights
2189 ).toBeDefined()
2190 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2191 expect(
2192 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2193 pool.workerChoiceStrategyContext.workerChoiceStrategy
2194 ).roundId
2195 ).toBe(0)
2196 expect(
2197 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2198 pool.workerChoiceStrategyContext.workerChoiceStrategy
2199 ).workerNodeId
2200 ).toBe(0)
2201 expect(
2202 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2203 pool.workerChoiceStrategyContext.workerChoiceStrategy
2204 ).nextWorkerNodeKey
2205 ).toBe(0)
2206 expect(
2207 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2208 pool.workerChoiceStrategyContext.workerChoiceStrategy
2209 ).previousWorkerNodeKey
2210 ).toBe(0)
2211 expect(
2212 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2213 pool.workerChoiceStrategyContext.workerChoiceStrategy
2214 ).roundWeights.length
2215 ).toBe(1)
2216 expect(
2217 Number.isSafeInteger(
2218 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2219 pool.workerChoiceStrategyContext.workerChoiceStrategy
2220 ).roundWeights[0]
2221 )
2222 ).toBe(true)
2223 // We need to clean up the resources after our test
2224 await pool.destroy()
2225 })
2226
2227 it('Verify unknown strategy throw error', () => {
2228 expect(
2229 () =>
2230 new DynamicThreadPool(
2231 min,
2232 max,
2233 './tests/worker-files/thread/testWorker.mjs',
2234 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2235 )
2236 ).toThrow("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
2237 })
2238 })