8dc06262d2d35418b1ff050bff843ce03b1efe97
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.mjs
1 import { expect } from 'expect'
2 import {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 WorkerChoiceStrategies
8 } from '../../../lib/index.cjs'
9 import { CircularArray } from '../../../lib/circular-array.cjs'
10
11 describe('Selection strategies test suite', () => {
12 const min = 0
13 const max = 3
14
15 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
16 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
17 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
18 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
19 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
20 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
21 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
22 'WEIGHTED_ROUND_ROBIN'
23 )
24 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
25 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
26 )
27 })
28
29 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
30 const pool = new DynamicThreadPool(
31 min,
32 max,
33 './tests/worker-files/thread/testWorker.mjs'
34 )
35 expect(pool.opts.workerChoiceStrategy).toBe(
36 WorkerChoiceStrategies.ROUND_ROBIN
37 )
38 // We need to clean up the resources after our test
39 await pool.destroy()
40 })
41
42 it('Verify available strategies are taken at pool creation', async () => {
43 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
44 const pool = new FixedThreadPool(
45 max,
46 './tests/worker-files/thread/testWorker.mjs',
47 { workerChoiceStrategy }
48 )
49 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
50 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
51 workerChoiceStrategy
52 )
53 await pool.destroy()
54 }
55 })
56
57 it('Verify available strategies can be set after pool creation', async () => {
58 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
59 const pool = new DynamicThreadPool(
60 min,
61 max,
62 './tests/worker-files/thread/testWorker.mjs'
63 )
64 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
65 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
66 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
67 workerChoiceStrategy
68 )
69 await pool.destroy()
70 }
71 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
72 const pool = new DynamicClusterPool(
73 min,
74 max,
75 './tests/worker-files/cluster/testWorker.cjs'
76 )
77 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
78 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
79 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
80 workerChoiceStrategy
81 )
82 await pool.destroy()
83 }
84 })
85
86 it('Verify available strategies default internals at pool creation', async () => {
87 const pool = new FixedThreadPool(
88 max,
89 './tests/worker-files/thread/testWorker.mjs'
90 )
91 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
92 expect(
93 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
94 workerChoiceStrategy
95 ).nextWorkerNodeKey
96 ).toBe(0)
97 expect(
98 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
99 workerChoiceStrategy
100 ).previousWorkerNodeKey
101 ).toBe(0)
102 if (
103 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
104 ) {
105 expect(
106 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
107 workerChoiceStrategy
108 ).workerNodeVirtualTaskRunTime
109 ).toBe(0)
110 } else if (
111 workerChoiceStrategy ===
112 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
113 ) {
114 expect(
115 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
116 workerChoiceStrategy
117 ).workerNodeVirtualTaskRunTime
118 ).toBe(0)
119 expect(
120 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
121 workerChoiceStrategy
122 ).roundId
123 ).toBe(0)
124 expect(
125 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
126 workerChoiceStrategy
127 ).workerNodeId
128 ).toBe(0)
129 expect(
130 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
131 workerChoiceStrategy
132 ).roundWeights.length
133 ).toBe(1)
134 expect(
135 Number.isSafeInteger(
136 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
137 workerChoiceStrategy
138 ).roundWeights[0]
139 )
140 ).toBe(true)
141 }
142 }
143 await pool.destroy()
144 })
145
146 it('Verify strategies wait for worker node readiness in dynamic pool', async () => {
147 const pool = new DynamicThreadPool(
148 min,
149 max,
150 './tests/worker-files/thread/testWorker.mjs'
151 )
152 expect(pool.starting).toBe(false)
153 expect(pool.workerNodes.length).toBe(min)
154 const maxMultiplier = 10000
155 const promises = new Set()
156 for (let i = 0; i < max * maxMultiplier; i++) {
157 promises.add(pool.execute())
158 }
159 await Promise.all(promises)
160 expect(pool.workerNodes.length).toBe(max)
161 // We need to clean up the resources after our test
162 await pool.destroy()
163 })
164
165 it('Verify ROUND_ROBIN strategy default policy', async () => {
166 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
167 let pool = new FixedThreadPool(
168 max,
169 './tests/worker-files/thread/testWorker.mjs',
170 { workerChoiceStrategy }
171 )
172 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
173 dynamicWorkerUsage: false,
174 dynamicWorkerReady: true
175 })
176 await pool.destroy()
177 pool = new DynamicThreadPool(
178 min,
179 max,
180 './tests/worker-files/thread/testWorker.mjs',
181 { workerChoiceStrategy }
182 )
183 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
184 dynamicWorkerUsage: false,
185 dynamicWorkerReady: true
186 })
187 // We need to clean up the resources after our test
188 await pool.destroy()
189 })
190
191 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
192 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
193 let pool = new FixedThreadPool(
194 max,
195 './tests/worker-files/thread/testWorker.mjs',
196 { workerChoiceStrategy }
197 )
198 expect(
199 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
200 ).toStrictEqual({
201 runTime: {
202 aggregate: false,
203 average: false,
204 median: false
205 },
206 waitTime: {
207 aggregate: false,
208 average: false,
209 median: false
210 },
211 elu: {
212 aggregate: false,
213 average: false,
214 median: false
215 }
216 })
217 await pool.destroy()
218 pool = new DynamicThreadPool(
219 min,
220 max,
221 './tests/worker-files/thread/testWorker.mjs',
222 { workerChoiceStrategy }
223 )
224 expect(
225 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
226 ).toStrictEqual({
227 runTime: {
228 aggregate: false,
229 average: false,
230 median: false
231 },
232 waitTime: {
233 aggregate: false,
234 average: false,
235 median: false
236 },
237 elu: {
238 aggregate: false,
239 average: false,
240 median: false
241 }
242 })
243 // We need to clean up the resources after our test
244 await pool.destroy()
245 })
246
247 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
248 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
249 const pool = new FixedThreadPool(
250 max,
251 './tests/worker-files/thread/testWorker.mjs',
252 { workerChoiceStrategy }
253 )
254 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
255 const promises = new Set()
256 const maxMultiplier = 2
257 for (let i = 0; i < max * maxMultiplier; i++) {
258 promises.add(pool.execute())
259 }
260 await Promise.all(promises)
261 for (const workerNode of pool.workerNodes) {
262 expect(workerNode.usage).toStrictEqual({
263 tasks: {
264 executed: maxMultiplier,
265 executing: 0,
266 queued: 0,
267 maxQueued: 0,
268 sequentiallyStolen: 0,
269 stolen: 0,
270 failed: 0
271 },
272 runTime: {
273 history: new CircularArray()
274 },
275 waitTime: {
276 history: new CircularArray()
277 },
278 elu: {
279 idle: {
280 history: new CircularArray()
281 },
282 active: {
283 history: new CircularArray()
284 }
285 }
286 })
287 }
288 expect(
289 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
290 pool.workerChoiceStrategyContext.workerChoiceStrategy
291 ).nextWorkerNodeKey
292 ).toBe(0)
293 expect(
294 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
295 pool.workerChoiceStrategyContext.workerChoiceStrategy
296 ).previousWorkerNodeKey
297 ).toBe(pool.workerNodes.length - 1)
298 // We need to clean up the resources after our test
299 await pool.destroy()
300 })
301
302 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
303 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
304 const pool = new DynamicThreadPool(
305 min,
306 max,
307 './tests/worker-files/thread/testWorker.mjs',
308 { workerChoiceStrategy }
309 )
310 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
311 const promises = new Set()
312 const maxMultiplier = 2
313 for (let i = 0; i < max * maxMultiplier; i++) {
314 promises.add(pool.execute())
315 }
316 await Promise.all(promises)
317 for (const workerNode of pool.workerNodes) {
318 expect(workerNode.usage).toStrictEqual({
319 tasks: {
320 executed: expect.any(Number),
321 executing: 0,
322 queued: 0,
323 maxQueued: 0,
324 sequentiallyStolen: 0,
325 stolen: 0,
326 failed: 0
327 },
328 runTime: {
329 history: new CircularArray()
330 },
331 waitTime: {
332 history: new CircularArray()
333 },
334 elu: {
335 idle: {
336 history: new CircularArray()
337 },
338 active: {
339 history: new CircularArray()
340 }
341 }
342 })
343 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
344 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
345 max * maxMultiplier
346 )
347 }
348 expect(
349 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
350 pool.workerChoiceStrategyContext.workerChoiceStrategy
351 ).nextWorkerNodeKey
352 ).toBe(0)
353 expect(
354 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
355 pool.workerChoiceStrategyContext.workerChoiceStrategy
356 ).previousWorkerNodeKey
357 ).toBe(pool.workerNodes.length - 1)
358 // We need to clean up the resources after our test
359 await pool.destroy()
360 })
361
362 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
363 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
364 let pool = new FixedClusterPool(
365 max,
366 './tests/worker-files/cluster/testWorker.cjs',
367 { workerChoiceStrategy }
368 )
369 let results = new Set()
370 for (let i = 0; i < max; i++) {
371 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
372 }
373 expect(results.size).toBe(max)
374 await pool.destroy()
375 pool = new FixedThreadPool(
376 max,
377 './tests/worker-files/thread/testWorker.mjs',
378 { workerChoiceStrategy }
379 )
380 results = new Set()
381 for (let i = 0; i < max; i++) {
382 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
383 }
384 expect(results.size).toBe(max)
385 await pool.destroy()
386 })
387
388 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
389 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
390 let pool = new FixedThreadPool(
391 max,
392 './tests/worker-files/thread/testWorker.mjs',
393 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
394 )
395 expect(
396 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
397 pool.workerChoiceStrategyContext.workerChoiceStrategy
398 ).nextWorkerNodeKey
399 ).toBeDefined()
400 expect(
401 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
402 pool.workerChoiceStrategyContext.workerChoiceStrategy
403 ).previousWorkerNodeKey
404 ).toBeDefined()
405 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
406 expect(
407 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
408 pool.workerChoiceStrategyContext.workerChoiceStrategy
409 ).nextWorkerNodeKey
410 ).toBe(0)
411 expect(
412 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
413 pool.workerChoiceStrategyContext.workerChoiceStrategy
414 ).previousWorkerNodeKey
415 ).toBe(0)
416 await pool.destroy()
417 pool = new DynamicThreadPool(
418 min,
419 max,
420 './tests/worker-files/thread/testWorker.mjs',
421 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
422 )
423 expect(
424 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
425 pool.workerChoiceStrategyContext.workerChoiceStrategy
426 ).nextWorkerNodeKey
427 ).toBeDefined()
428 expect(
429 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
430 pool.workerChoiceStrategyContext.workerChoiceStrategy
431 ).previousWorkerNodeKey
432 ).toBeDefined()
433 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
434 expect(
435 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
436 pool.workerChoiceStrategyContext.workerChoiceStrategy
437 ).nextWorkerNodeKey
438 ).toBe(0)
439 expect(
440 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
441 pool.workerChoiceStrategyContext.workerChoiceStrategy
442 ).previousWorkerNodeKey
443 ).toBe(0)
444 // We need to clean up the resources after our test
445 await pool.destroy()
446 })
447
448 it('Verify LEAST_USED strategy default policy', async () => {
449 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
450 let pool = new FixedThreadPool(
451 max,
452 './tests/worker-files/thread/testWorker.mjs',
453 { workerChoiceStrategy }
454 )
455 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
456 dynamicWorkerUsage: false,
457 dynamicWorkerReady: true
458 })
459 await pool.destroy()
460 pool = new DynamicThreadPool(
461 min,
462 max,
463 './tests/worker-files/thread/testWorker.mjs',
464 { workerChoiceStrategy }
465 )
466 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
467 dynamicWorkerUsage: false,
468 dynamicWorkerReady: true
469 })
470 // We need to clean up the resources after our test
471 await pool.destroy()
472 })
473
474 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
475 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
476 let pool = new FixedThreadPool(
477 max,
478 './tests/worker-files/thread/testWorker.mjs',
479 { workerChoiceStrategy }
480 )
481 expect(
482 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
483 ).toStrictEqual({
484 runTime: {
485 aggregate: false,
486 average: false,
487 median: false
488 },
489 waitTime: {
490 aggregate: false,
491 average: false,
492 median: false
493 },
494 elu: {
495 aggregate: false,
496 average: false,
497 median: false
498 }
499 })
500 await pool.destroy()
501 pool = new DynamicThreadPool(
502 min,
503 max,
504 './tests/worker-files/thread/testWorker.mjs',
505 { workerChoiceStrategy }
506 )
507 expect(
508 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
509 ).toStrictEqual({
510 runTime: {
511 aggregate: false,
512 average: false,
513 median: false
514 },
515 waitTime: {
516 aggregate: false,
517 average: false,
518 median: false
519 },
520 elu: {
521 aggregate: false,
522 average: false,
523 median: false
524 }
525 })
526 // We need to clean up the resources after our test
527 await pool.destroy()
528 })
529
530 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
531 const pool = new FixedThreadPool(
532 max,
533 './tests/worker-files/thread/testWorker.mjs',
534 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
535 )
536 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
537 const promises = new Set()
538 const maxMultiplier = 2
539 for (let i = 0; i < max * maxMultiplier; i++) {
540 promises.add(pool.execute())
541 }
542 await Promise.all(promises)
543 for (const workerNode of pool.workerNodes) {
544 expect(workerNode.usage).toStrictEqual({
545 tasks: {
546 executed: expect.any(Number),
547 executing: 0,
548 queued: 0,
549 maxQueued: 0,
550 sequentiallyStolen: 0,
551 stolen: 0,
552 failed: 0
553 },
554 runTime: {
555 history: new CircularArray()
556 },
557 waitTime: {
558 history: new CircularArray()
559 },
560 elu: {
561 idle: {
562 history: new CircularArray()
563 },
564 active: {
565 history: new CircularArray()
566 }
567 }
568 })
569 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
570 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
571 max * maxMultiplier
572 )
573 }
574 expect(
575 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
576 pool.workerChoiceStrategyContext.workerChoiceStrategy
577 ).nextWorkerNodeKey
578 ).toEqual(expect.any(Number))
579 expect(
580 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
581 pool.workerChoiceStrategyContext.workerChoiceStrategy
582 ).previousWorkerNodeKey
583 ).toEqual(expect.any(Number))
584 // We need to clean up the resources after our test
585 await pool.destroy()
586 })
587
588 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
589 const pool = new DynamicThreadPool(
590 min,
591 max,
592 './tests/worker-files/thread/testWorker.mjs',
593 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
594 )
595 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
596 const promises = new Set()
597 const maxMultiplier = 2
598 for (let i = 0; i < max * maxMultiplier; i++) {
599 promises.add(pool.execute())
600 }
601 await Promise.all(promises)
602 for (const workerNode of pool.workerNodes) {
603 expect(workerNode.usage).toStrictEqual({
604 tasks: {
605 executed: expect.any(Number),
606 executing: 0,
607 queued: 0,
608 maxQueued: 0,
609 sequentiallyStolen: 0,
610 stolen: 0,
611 failed: 0
612 },
613 runTime: {
614 history: new CircularArray()
615 },
616 waitTime: {
617 history: new CircularArray()
618 },
619 elu: {
620 idle: {
621 history: new CircularArray()
622 },
623 active: {
624 history: new CircularArray()
625 }
626 }
627 })
628 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
629 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
630 max * maxMultiplier
631 )
632 }
633 expect(
634 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
635 pool.workerChoiceStrategyContext.workerChoiceStrategy
636 ).nextWorkerNodeKey
637 ).toEqual(expect.any(Number))
638 expect(
639 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
640 pool.workerChoiceStrategyContext.workerChoiceStrategy
641 ).previousWorkerNodeKey
642 ).toEqual(expect.any(Number))
643 // We need to clean up the resources after our test
644 await pool.destroy()
645 })
646
647 it('Verify LEAST_BUSY strategy default policy', async () => {
648 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
649 let pool = new FixedThreadPool(
650 max,
651 './tests/worker-files/thread/testWorker.mjs',
652 { workerChoiceStrategy }
653 )
654 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
655 dynamicWorkerUsage: false,
656 dynamicWorkerReady: true
657 })
658 await pool.destroy()
659 pool = new DynamicThreadPool(
660 min,
661 max,
662 './tests/worker-files/thread/testWorker.mjs',
663 { workerChoiceStrategy }
664 )
665 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
666 dynamicWorkerUsage: false,
667 dynamicWorkerReady: true
668 })
669 // We need to clean up the resources after our test
670 await pool.destroy()
671 })
672
673 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
674 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
675 let pool = new FixedThreadPool(
676 max,
677 './tests/worker-files/thread/testWorker.mjs',
678 { workerChoiceStrategy }
679 )
680 expect(
681 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
682 ).toStrictEqual({
683 runTime: {
684 aggregate: true,
685 average: false,
686 median: false
687 },
688 waitTime: {
689 aggregate: true,
690 average: false,
691 median: false
692 },
693 elu: {
694 aggregate: false,
695 average: false,
696 median: false
697 }
698 })
699 await pool.destroy()
700 pool = new DynamicThreadPool(
701 min,
702 max,
703 './tests/worker-files/thread/testWorker.mjs',
704 { workerChoiceStrategy }
705 )
706 expect(
707 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
708 ).toStrictEqual({
709 runTime: {
710 aggregate: true,
711 average: false,
712 median: false
713 },
714 waitTime: {
715 aggregate: true,
716 average: false,
717 median: false
718 },
719 elu: {
720 aggregate: false,
721 average: false,
722 median: false
723 }
724 })
725 // We need to clean up the resources after our test
726 await pool.destroy()
727 })
728
729 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
730 const pool = new FixedThreadPool(
731 max,
732 './tests/worker-files/thread/testWorker.mjs',
733 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
734 )
735 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
736 const promises = new Set()
737 const maxMultiplier = 2
738 for (let i = 0; i < max * maxMultiplier; i++) {
739 promises.add(pool.execute())
740 }
741 await Promise.all(promises)
742 for (const workerNode of pool.workerNodes) {
743 expect(workerNode.usage).toStrictEqual({
744 tasks: {
745 executed: expect.any(Number),
746 executing: 0,
747 queued: 0,
748 maxQueued: 0,
749 sequentiallyStolen: 0,
750 stolen: 0,
751 failed: 0
752 },
753 runTime: expect.objectContaining({
754 history: expect.any(CircularArray)
755 }),
756 waitTime: expect.objectContaining({
757 history: expect.any(CircularArray)
758 }),
759 elu: {
760 idle: {
761 history: new CircularArray()
762 },
763 active: {
764 history: new CircularArray()
765 }
766 }
767 })
768 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
769 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
770 max * maxMultiplier
771 )
772 if (workerNode.usage.runTime.aggregate == null) {
773 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
774 } else {
775 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
776 }
777 if (workerNode.usage.waitTime.aggregate == null) {
778 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
779 } else {
780 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
781 }
782 }
783 expect(
784 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
785 pool.workerChoiceStrategyContext.workerChoiceStrategy
786 ).nextWorkerNodeKey
787 ).toEqual(expect.any(Number))
788 expect(
789 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
790 pool.workerChoiceStrategyContext.workerChoiceStrategy
791 ).previousWorkerNodeKey
792 ).toEqual(expect.any(Number))
793 // We need to clean up the resources after our test
794 await pool.destroy()
795 })
796
797 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
798 const pool = new DynamicThreadPool(
799 min,
800 max,
801 './tests/worker-files/thread/testWorker.mjs',
802 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
803 )
804 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
805 const promises = new Set()
806 const maxMultiplier = 2
807 for (let i = 0; i < max * maxMultiplier; i++) {
808 promises.add(pool.execute())
809 }
810 await Promise.all(promises)
811 for (const workerNode of pool.workerNodes) {
812 expect(workerNode.usage).toStrictEqual({
813 tasks: {
814 executed: expect.any(Number),
815 executing: 0,
816 queued: 0,
817 maxQueued: 0,
818 sequentiallyStolen: 0,
819 stolen: 0,
820 failed: 0
821 },
822 runTime: expect.objectContaining({
823 history: expect.any(CircularArray)
824 }),
825 waitTime: expect.objectContaining({
826 history: expect.any(CircularArray)
827 }),
828 elu: {
829 idle: {
830 history: new CircularArray()
831 },
832 active: {
833 history: new CircularArray()
834 }
835 }
836 })
837 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
838 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
839 max * maxMultiplier
840 )
841 if (workerNode.usage.runTime.aggregate == null) {
842 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
843 } else {
844 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
845 }
846 if (workerNode.usage.waitTime.aggregate == null) {
847 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
848 } else {
849 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
850 }
851 }
852 expect(
853 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
854 pool.workerChoiceStrategyContext.workerChoiceStrategy
855 ).nextWorkerNodeKey
856 ).toEqual(expect.any(Number))
857 expect(
858 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
859 pool.workerChoiceStrategyContext.workerChoiceStrategy
860 ).previousWorkerNodeKey
861 ).toEqual(expect.any(Number))
862 // We need to clean up the resources after our test
863 await pool.destroy()
864 })
865
866 it('Verify LEAST_ELU strategy default policy', async () => {
867 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
868 let pool = new FixedThreadPool(
869 max,
870 './tests/worker-files/thread/testWorker.mjs',
871 { workerChoiceStrategy }
872 )
873 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
874 dynamicWorkerUsage: false,
875 dynamicWorkerReady: true
876 })
877 await pool.destroy()
878 pool = new DynamicThreadPool(
879 min,
880 max,
881 './tests/worker-files/thread/testWorker.mjs',
882 { workerChoiceStrategy }
883 )
884 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
885 dynamicWorkerUsage: false,
886 dynamicWorkerReady: true
887 })
888 // We need to clean up the resources after our test
889 await pool.destroy()
890 })
891
892 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
893 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
894 let pool = new FixedThreadPool(
895 max,
896 './tests/worker-files/thread/testWorker.mjs',
897 { workerChoiceStrategy }
898 )
899 expect(
900 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
901 ).toStrictEqual({
902 runTime: {
903 aggregate: false,
904 average: false,
905 median: false
906 },
907 waitTime: {
908 aggregate: false,
909 average: false,
910 median: false
911 },
912 elu: {
913 aggregate: true,
914 average: false,
915 median: false
916 }
917 })
918 await pool.destroy()
919 pool = new DynamicThreadPool(
920 min,
921 max,
922 './tests/worker-files/thread/testWorker.mjs',
923 { workerChoiceStrategy }
924 )
925 expect(
926 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
927 ).toStrictEqual({
928 runTime: {
929 aggregate: false,
930 average: false,
931 median: false
932 },
933 waitTime: {
934 aggregate: false,
935 average: false,
936 median: false
937 },
938 elu: {
939 aggregate: true,
940 average: false,
941 median: false
942 }
943 })
944 // We need to clean up the resources after our test
945 await pool.destroy()
946 })
947
948 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
949 const pool = new FixedThreadPool(
950 max,
951 './tests/worker-files/thread/testWorker.mjs',
952 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
953 )
954 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
955 const promises = new Set()
956 const maxMultiplier = 2
957 for (let i = 0; i < max * maxMultiplier; i++) {
958 promises.add(pool.execute())
959 }
960 await Promise.all(promises)
961 for (const workerNode of pool.workerNodes) {
962 expect(workerNode.usage).toStrictEqual({
963 tasks: {
964 executed: expect.any(Number),
965 executing: 0,
966 queued: 0,
967 maxQueued: 0,
968 sequentiallyStolen: 0,
969 stolen: 0,
970 failed: 0
971 },
972 runTime: {
973 history: new CircularArray()
974 },
975 waitTime: {
976 history: new CircularArray()
977 },
978 elu: expect.objectContaining({
979 idle: expect.objectContaining({
980 history: expect.any(CircularArray)
981 }),
982 active: expect.objectContaining({
983 history: expect.any(CircularArray)
984 })
985 })
986 })
987 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
988 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
989 max * maxMultiplier
990 )
991 if (workerNode.usage.elu.active.aggregate == null) {
992 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
993 } else {
994 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
995 }
996 if (workerNode.usage.elu.idle.aggregate == null) {
997 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
998 } else {
999 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1000 }
1001 if (workerNode.usage.elu.utilization == null) {
1002 expect(workerNode.usage.elu.utilization).toBeUndefined()
1003 } else {
1004 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1005 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1006 }
1007 }
1008 expect(
1009 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1010 pool.workerChoiceStrategyContext.workerChoiceStrategy
1011 ).nextWorkerNodeKey
1012 ).toEqual(expect.any(Number))
1013 expect(
1014 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1015 pool.workerChoiceStrategyContext.workerChoiceStrategy
1016 ).previousWorkerNodeKey
1017 ).toEqual(expect.any(Number))
1018 // We need to clean up the resources after our test
1019 await pool.destroy()
1020 })
1021
1022 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
1023 const pool = new DynamicThreadPool(
1024 min,
1025 max,
1026 './tests/worker-files/thread/testWorker.mjs',
1027 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
1028 )
1029 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
1030 const promises = new Set()
1031 const maxMultiplier = 2
1032 for (let i = 0; i < max * maxMultiplier; i++) {
1033 promises.add(pool.execute())
1034 }
1035 await Promise.all(promises)
1036 for (const workerNode of pool.workerNodes) {
1037 expect(workerNode.usage).toStrictEqual({
1038 tasks: {
1039 executed: expect.any(Number),
1040 executing: 0,
1041 queued: 0,
1042 maxQueued: 0,
1043 sequentiallyStolen: 0,
1044 stolen: 0,
1045 failed: 0
1046 },
1047 runTime: {
1048 history: new CircularArray()
1049 },
1050 waitTime: {
1051 history: new CircularArray()
1052 },
1053 elu: expect.objectContaining({
1054 idle: expect.objectContaining({
1055 history: expect.any(CircularArray)
1056 }),
1057 active: expect.objectContaining({
1058 history: expect.any(CircularArray)
1059 })
1060 })
1061 })
1062 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1063 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1064 max * maxMultiplier
1065 )
1066 if (workerNode.usage.elu.active.aggregate == null) {
1067 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1068 } else {
1069 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1070 }
1071 if (workerNode.usage.elu.idle.aggregate == null) {
1072 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1073 } else {
1074 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1075 }
1076 if (workerNode.usage.elu.utilization == null) {
1077 expect(workerNode.usage.elu.utilization).toBeUndefined()
1078 } else {
1079 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1080 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1081 }
1082 }
1083 expect(
1084 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1085 pool.workerChoiceStrategyContext.workerChoiceStrategy
1086 ).nextWorkerNodeKey
1087 ).toEqual(expect.any(Number))
1088 expect(
1089 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1090 pool.workerChoiceStrategyContext.workerChoiceStrategy
1091 ).previousWorkerNodeKey
1092 ).toEqual(expect.any(Number))
1093 // We need to clean up the resources after our test
1094 await pool.destroy()
1095 })
1096
1097 it('Verify FAIR_SHARE strategy default policy', async () => {
1098 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1099 let pool = new FixedThreadPool(
1100 max,
1101 './tests/worker-files/thread/testWorker.mjs',
1102 { workerChoiceStrategy }
1103 )
1104 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1105 dynamicWorkerUsage: false,
1106 dynamicWorkerReady: true
1107 })
1108 await pool.destroy()
1109 pool = new DynamicThreadPool(
1110 min,
1111 max,
1112 './tests/worker-files/thread/testWorker.mjs',
1113 { workerChoiceStrategy }
1114 )
1115 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1116 dynamicWorkerUsage: false,
1117 dynamicWorkerReady: true
1118 })
1119 // We need to clean up the resources after our test
1120 await pool.destroy()
1121 })
1122
1123 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1124 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1125 let pool = new FixedThreadPool(
1126 max,
1127 './tests/worker-files/thread/testWorker.mjs',
1128 { workerChoiceStrategy }
1129 )
1130 expect(
1131 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1132 ).toStrictEqual({
1133 runTime: {
1134 aggregate: true,
1135 average: true,
1136 median: false
1137 },
1138 waitTime: {
1139 aggregate: false,
1140 average: false,
1141 median: false
1142 },
1143 elu: {
1144 aggregate: true,
1145 average: true,
1146 median: false
1147 }
1148 })
1149 await pool.destroy()
1150 pool = new DynamicThreadPool(
1151 min,
1152 max,
1153 './tests/worker-files/thread/testWorker.mjs',
1154 { workerChoiceStrategy }
1155 )
1156 expect(
1157 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1158 ).toStrictEqual({
1159 runTime: {
1160 aggregate: true,
1161 average: true,
1162 median: false
1163 },
1164 waitTime: {
1165 aggregate: false,
1166 average: false,
1167 median: false
1168 },
1169 elu: {
1170 aggregate: true,
1171 average: true,
1172 median: false
1173 }
1174 })
1175 // We need to clean up the resources after our test
1176 await pool.destroy()
1177 })
1178
1179 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1180 const pool = new FixedThreadPool(
1181 max,
1182 './tests/worker-files/thread/testWorker.mjs',
1183 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1184 )
1185 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1186 const promises = new Set()
1187 const maxMultiplier = 2
1188 for (let i = 0; i < max * maxMultiplier; i++) {
1189 promises.add(pool.execute())
1190 }
1191 await Promise.all(promises)
1192 for (const workerNode of pool.workerNodes) {
1193 expect(workerNode.usage).toStrictEqual({
1194 tasks: {
1195 executed: expect.any(Number),
1196 executing: 0,
1197 queued: 0,
1198 maxQueued: 0,
1199 sequentiallyStolen: 0,
1200 stolen: 0,
1201 failed: 0
1202 },
1203 runTime: expect.objectContaining({
1204 history: expect.any(CircularArray)
1205 }),
1206 waitTime: {
1207 history: new CircularArray()
1208 },
1209 elu: expect.objectContaining({
1210 idle: expect.objectContaining({
1211 history: expect.any(CircularArray)
1212 }),
1213 active: expect.objectContaining({
1214 history: expect.any(CircularArray)
1215 })
1216 })
1217 })
1218 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1219 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1220 max * maxMultiplier
1221 )
1222 if (workerNode.usage.runTime.aggregate == null) {
1223 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1224 } else {
1225 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1226 }
1227 if (workerNode.usage.runTime.average == null) {
1228 expect(workerNode.usage.runTime.average).toBeUndefined()
1229 } else {
1230 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1231 }
1232 if (workerNode.usage.elu.active.aggregate == null) {
1233 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1234 } else {
1235 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1236 }
1237 if (workerNode.usage.elu.idle.aggregate == null) {
1238 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1239 } else {
1240 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1241 }
1242 if (workerNode.usage.elu.utilization == null) {
1243 expect(workerNode.usage.elu.utilization).toBeUndefined()
1244 } else {
1245 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1246 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1247 }
1248 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1249 }
1250 expect(
1251 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1252 pool.workerChoiceStrategyContext.workerChoiceStrategy
1253 ).nextWorkerNodeKey
1254 ).toEqual(expect.any(Number))
1255 expect(
1256 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1257 pool.workerChoiceStrategyContext.workerChoiceStrategy
1258 ).previousWorkerNodeKey
1259 ).toEqual(expect.any(Number))
1260 // We need to clean up the resources after our test
1261 await pool.destroy()
1262 })
1263
1264 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1265 const pool = new DynamicThreadPool(
1266 min,
1267 max,
1268 './tests/worker-files/thread/testWorker.mjs',
1269 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1270 )
1271 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1272 const promises = new Set()
1273 const maxMultiplier = 2
1274 for (let i = 0; i < max * maxMultiplier; i++) {
1275 promises.add(pool.execute())
1276 }
1277 await Promise.all(promises)
1278 for (const workerNode of pool.workerNodes) {
1279 expect(workerNode.usage).toStrictEqual({
1280 tasks: {
1281 executed: expect.any(Number),
1282 executing: 0,
1283 queued: 0,
1284 maxQueued: 0,
1285 sequentiallyStolen: 0,
1286 stolen: 0,
1287 failed: 0
1288 },
1289 runTime: expect.objectContaining({
1290 history: expect.any(CircularArray)
1291 }),
1292 waitTime: {
1293 history: new CircularArray()
1294 },
1295 elu: expect.objectContaining({
1296 idle: expect.objectContaining({
1297 history: expect.any(CircularArray)
1298 }),
1299 active: expect.objectContaining({
1300 history: expect.any(CircularArray)
1301 })
1302 })
1303 })
1304 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1305 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1306 max * maxMultiplier
1307 )
1308 if (workerNode.usage.runTime.aggregate == null) {
1309 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1310 } else {
1311 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1312 }
1313 if (workerNode.usage.runTime.average == null) {
1314 expect(workerNode.usage.runTime.average).toBeUndefined()
1315 } else {
1316 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1317 }
1318 if (workerNode.usage.elu.active.aggregate == null) {
1319 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1320 } else {
1321 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1322 }
1323 if (workerNode.usage.elu.idle.aggregate == null) {
1324 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1325 } else {
1326 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1327 }
1328 if (workerNode.usage.elu.utilization == null) {
1329 expect(workerNode.usage.elu.utilization).toBeUndefined()
1330 } else {
1331 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1332 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1333 }
1334 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1335 }
1336 expect(
1337 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1338 pool.workerChoiceStrategyContext.workerChoiceStrategy
1339 ).nextWorkerNodeKey
1340 ).toEqual(expect.any(Number))
1341 expect(
1342 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1343 pool.workerChoiceStrategyContext.workerChoiceStrategy
1344 ).previousWorkerNodeKey
1345 ).toEqual(expect.any(Number))
1346 // We need to clean up the resources after our test
1347 await pool.destroy()
1348 })
1349
1350 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1351 const pool = new DynamicThreadPool(
1352 min,
1353 max,
1354 './tests/worker-files/thread/testWorker.mjs',
1355 {
1356 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1357 workerChoiceStrategyOptions: {
1358 runTime: { median: true }
1359 }
1360 }
1361 )
1362 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1363 const promises = new Set()
1364 const maxMultiplier = 2
1365 for (let i = 0; i < max * maxMultiplier; i++) {
1366 promises.add(pool.execute())
1367 }
1368 await Promise.all(promises)
1369 for (const workerNode of pool.workerNodes) {
1370 expect(workerNode.usage).toStrictEqual({
1371 tasks: {
1372 executed: expect.any(Number),
1373 executing: 0,
1374 queued: 0,
1375 maxQueued: 0,
1376 sequentiallyStolen: 0,
1377 stolen: 0,
1378 failed: 0
1379 },
1380 runTime: expect.objectContaining({
1381 history: expect.any(CircularArray)
1382 }),
1383 waitTime: {
1384 history: new CircularArray()
1385 },
1386 elu: expect.objectContaining({
1387 idle: expect.objectContaining({
1388 history: expect.any(CircularArray)
1389 }),
1390 active: expect.objectContaining({
1391 history: expect.any(CircularArray)
1392 })
1393 })
1394 })
1395 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1396 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1397 max * maxMultiplier
1398 )
1399 if (workerNode.usage.runTime.aggregate == null) {
1400 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1401 } else {
1402 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1403 }
1404 if (workerNode.usage.runTime.median == null) {
1405 expect(workerNode.usage.runTime.median).toBeUndefined()
1406 } else {
1407 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1408 }
1409 if (workerNode.usage.elu.active.aggregate == null) {
1410 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1411 } else {
1412 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1413 }
1414 if (workerNode.usage.elu.idle.aggregate == null) {
1415 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1416 } else {
1417 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1418 }
1419 if (workerNode.usage.elu.utilization == null) {
1420 expect(workerNode.usage.elu.utilization).toBeUndefined()
1421 } else {
1422 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1423 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1424 }
1425 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1426 }
1427 expect(
1428 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1429 pool.workerChoiceStrategyContext.workerChoiceStrategy
1430 ).nextWorkerNodeKey
1431 ).toEqual(expect.any(Number))
1432 expect(
1433 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1434 pool.workerChoiceStrategyContext.workerChoiceStrategy
1435 ).previousWorkerNodeKey
1436 ).toEqual(expect.any(Number))
1437 // We need to clean up the resources after our test
1438 await pool.destroy()
1439 })
1440
1441 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1442 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1443 let pool = new FixedThreadPool(
1444 max,
1445 './tests/worker-files/thread/testWorker.mjs'
1446 )
1447 for (const workerNode of pool.workerNodes) {
1448 workerNode.strategyData = {
1449 virtualTaskEndTimestamp: performance.now()
1450 }
1451 }
1452 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1453 for (const workerNode of pool.workerNodes) {
1454 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1455 }
1456 await pool.destroy()
1457 pool = new DynamicThreadPool(
1458 min,
1459 max,
1460 './tests/worker-files/thread/testWorker.mjs'
1461 )
1462 for (const workerNode of pool.workerNodes) {
1463 workerNode.strategyData = {
1464 virtualTaskEndTimestamp: performance.now()
1465 }
1466 }
1467 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1468 for (const workerNode of pool.workerNodes) {
1469 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1470 }
1471 // We need to clean up the resources after our test
1472 await pool.destroy()
1473 })
1474
1475 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1476 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1477 let pool = new FixedThreadPool(
1478 max,
1479 './tests/worker-files/thread/testWorker.mjs',
1480 { workerChoiceStrategy }
1481 )
1482 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1483 dynamicWorkerUsage: false,
1484 dynamicWorkerReady: true
1485 })
1486 await pool.destroy()
1487 pool = new DynamicThreadPool(
1488 min,
1489 max,
1490 './tests/worker-files/thread/testWorker.mjs',
1491 { workerChoiceStrategy }
1492 )
1493 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1494 dynamicWorkerUsage: false,
1495 dynamicWorkerReady: true
1496 })
1497 // We need to clean up the resources after our test
1498 await pool.destroy()
1499 })
1500
1501 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1502 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1503 let pool = new FixedThreadPool(
1504 max,
1505 './tests/worker-files/thread/testWorker.mjs',
1506 { workerChoiceStrategy }
1507 )
1508 expect(
1509 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1510 ).toStrictEqual({
1511 runTime: {
1512 aggregate: true,
1513 average: true,
1514 median: false
1515 },
1516 waitTime: {
1517 aggregate: false,
1518 average: false,
1519 median: false
1520 },
1521 elu: {
1522 aggregate: false,
1523 average: false,
1524 median: false
1525 }
1526 })
1527 await pool.destroy()
1528 pool = new DynamicThreadPool(
1529 min,
1530 max,
1531 './tests/worker-files/thread/testWorker.mjs',
1532 { workerChoiceStrategy }
1533 )
1534 expect(
1535 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1536 ).toStrictEqual({
1537 runTime: {
1538 aggregate: true,
1539 average: true,
1540 median: false
1541 },
1542 waitTime: {
1543 aggregate: false,
1544 average: false,
1545 median: false
1546 },
1547 elu: {
1548 aggregate: false,
1549 average: false,
1550 median: false
1551 }
1552 })
1553 // We need to clean up the resources after our test
1554 await pool.destroy()
1555 })
1556
1557 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1558 const pool = new FixedThreadPool(
1559 max,
1560 './tests/worker-files/thread/testWorker.mjs',
1561 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1562 )
1563 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1564 const promises = new Set()
1565 const maxMultiplier = 2
1566 for (let i = 0; i < max * maxMultiplier; i++) {
1567 promises.add(pool.execute())
1568 }
1569 await Promise.all(promises)
1570 for (const workerNode of pool.workerNodes) {
1571 expect(workerNode.usage).toStrictEqual({
1572 tasks: {
1573 executed: expect.any(Number),
1574 executing: 0,
1575 queued: 0,
1576 maxQueued: 0,
1577 sequentiallyStolen: 0,
1578 stolen: 0,
1579 failed: 0
1580 },
1581 runTime: expect.objectContaining({
1582 history: expect.any(CircularArray)
1583 }),
1584 waitTime: {
1585 history: new CircularArray()
1586 },
1587 elu: {
1588 idle: {
1589 history: new CircularArray()
1590 },
1591 active: {
1592 history: new CircularArray()
1593 }
1594 }
1595 })
1596 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1597 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1598 max * maxMultiplier
1599 )
1600 if (workerNode.usage.runTime.aggregate == null) {
1601 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1602 } else {
1603 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1604 }
1605 if (workerNode.usage.runTime.average == null) {
1606 expect(workerNode.usage.runTime.average).toBeUndefined()
1607 } else {
1608 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1609 }
1610 }
1611 expect(
1612 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1613 pool.workerChoiceStrategyContext.workerChoiceStrategy
1614 ).nextWorkerNodeKey
1615 ).toBe(0)
1616 expect(
1617 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1618 pool.workerChoiceStrategyContext.workerChoiceStrategy
1619 ).previousWorkerNodeKey
1620 ).toEqual(0)
1621 expect(
1622 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1623 pool.workerChoiceStrategyContext.workerChoiceStrategy
1624 ).workerNodeVirtualTaskRunTime
1625 ).toBeGreaterThanOrEqual(0)
1626 // We need to clean up the resources after our test
1627 await pool.destroy()
1628 })
1629
1630 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1631 const pool = new DynamicThreadPool(
1632 min,
1633 max,
1634 './tests/worker-files/thread/testWorker.mjs',
1635 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1636 )
1637 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1638 const promises = new Set()
1639 const maxMultiplier = 2
1640 for (let i = 0; i < max * maxMultiplier; i++) {
1641 promises.add(pool.execute())
1642 }
1643 await Promise.all(promises)
1644 for (const workerNode of pool.workerNodes) {
1645 expect(workerNode.usage).toStrictEqual({
1646 tasks: {
1647 executed: expect.any(Number),
1648 executing: 0,
1649 queued: 0,
1650 maxQueued: 0,
1651 sequentiallyStolen: 0,
1652 stolen: 0,
1653 failed: 0
1654 },
1655 runTime: expect.objectContaining({
1656 history: expect.any(CircularArray)
1657 }),
1658 waitTime: {
1659 history: new CircularArray()
1660 },
1661 elu: {
1662 idle: {
1663 history: new CircularArray()
1664 },
1665 active: {
1666 history: new CircularArray()
1667 }
1668 }
1669 })
1670 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1671 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1672 max * maxMultiplier
1673 )
1674 if (workerNode.usage.runTime.aggregate == null) {
1675 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1676 } else {
1677 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1678 }
1679 if (workerNode.usage.runTime.average == null) {
1680 expect(workerNode.usage.runTime.average).toBeUndefined()
1681 } else {
1682 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1683 }
1684 }
1685 expect(
1686 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1687 pool.workerChoiceStrategyContext.workerChoiceStrategy
1688 ).nextWorkerNodeKey
1689 ).toEqual(0)
1690 expect(
1691 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1692 pool.workerChoiceStrategyContext.workerChoiceStrategy
1693 ).previousWorkerNodeKey
1694 ).toEqual(0)
1695 expect(
1696 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1697 pool.workerChoiceStrategyContext.workerChoiceStrategy
1698 ).workerNodeVirtualTaskRunTime
1699 ).toBeGreaterThanOrEqual(0)
1700 // We need to clean up the resources after our test
1701 await pool.destroy()
1702 })
1703
1704 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1705 const pool = new DynamicThreadPool(
1706 min,
1707 max,
1708 './tests/worker-files/thread/testWorker.mjs',
1709 {
1710 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1711 workerChoiceStrategyOptions: {
1712 runTime: { median: true }
1713 }
1714 }
1715 )
1716 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1717 const promises = new Set()
1718 const maxMultiplier = 2
1719 for (let i = 0; i < max * maxMultiplier; i++) {
1720 promises.add(pool.execute())
1721 }
1722 await Promise.all(promises)
1723 for (const workerNode of pool.workerNodes) {
1724 expect(workerNode.usage).toStrictEqual({
1725 tasks: {
1726 executed: expect.any(Number),
1727 executing: 0,
1728 queued: 0,
1729 maxQueued: 0,
1730 sequentiallyStolen: 0,
1731 stolen: 0,
1732 failed: 0
1733 },
1734 runTime: expect.objectContaining({
1735 history: expect.any(CircularArray)
1736 }),
1737 waitTime: {
1738 history: new CircularArray()
1739 },
1740 elu: {
1741 idle: {
1742 history: new CircularArray()
1743 },
1744 active: {
1745 history: new CircularArray()
1746 }
1747 }
1748 })
1749 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1750 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1751 max * maxMultiplier
1752 )
1753 if (workerNode.usage.runTime.aggregate == null) {
1754 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1755 } else {
1756 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1757 }
1758 if (workerNode.usage.runTime.median == null) {
1759 expect(workerNode.usage.runTime.median).toBeUndefined()
1760 } else {
1761 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1762 }
1763 }
1764 expect(
1765 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1766 pool.workerChoiceStrategyContext.workerChoiceStrategy
1767 ).nextWorkerNodeKey
1768 ).toEqual(0)
1769 expect(
1770 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1771 pool.workerChoiceStrategyContext.workerChoiceStrategy
1772 ).previousWorkerNodeKey
1773 ).toEqual(0)
1774 expect(
1775 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1776 pool.workerChoiceStrategyContext.workerChoiceStrategy
1777 ).workerNodeVirtualTaskRunTime
1778 ).toBeGreaterThanOrEqual(0)
1779 // We need to clean up the resources after our test
1780 await pool.destroy()
1781 })
1782
1783 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1784 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1785 let pool = new FixedThreadPool(
1786 max,
1787 './tests/worker-files/thread/testWorker.mjs'
1788 )
1789 expect(
1790 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1791 workerChoiceStrategy
1792 ).nextWorkerNodeKey
1793 ).toBeDefined()
1794 expect(
1795 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1796 workerChoiceStrategy
1797 ).previousWorkerNodeKey
1798 ).toBeDefined()
1799 expect(
1800 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1801 workerChoiceStrategy
1802 ).workerNodeVirtualTaskRunTime
1803 ).toBeDefined()
1804 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1805 expect(
1806 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1807 pool.workerChoiceStrategyContext.workerChoiceStrategy
1808 ).nextWorkerNodeKey
1809 ).toBe(0)
1810 expect(
1811 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1812 pool.workerChoiceStrategyContext.workerChoiceStrategy
1813 ).previousWorkerNodeKey
1814 ).toBe(0)
1815 expect(
1816 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1817 pool.workerChoiceStrategyContext.workerChoiceStrategy
1818 ).workerNodeVirtualTaskRunTime
1819 ).toBe(0)
1820 await pool.destroy()
1821 pool = new DynamicThreadPool(
1822 min,
1823 max,
1824 './tests/worker-files/thread/testWorker.mjs'
1825 )
1826 expect(
1827 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1828 workerChoiceStrategy
1829 ).nextWorkerNodeKey
1830 ).toBeDefined()
1831 expect(
1832 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1833 workerChoiceStrategy
1834 ).previousWorkerNodeKey
1835 ).toBeDefined()
1836 expect(
1837 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1838 workerChoiceStrategy
1839 ).workerNodeVirtualTaskRunTime
1840 ).toBeDefined()
1841 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1842 expect(
1843 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1844 pool.workerChoiceStrategyContext.workerChoiceStrategy
1845 ).nextWorkerNodeKey
1846 ).toBe(0)
1847 expect(
1848 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1849 pool.workerChoiceStrategyContext.workerChoiceStrategy
1850 ).previousWorkerNodeKey
1851 ).toBe(0)
1852 expect(
1853 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1854 pool.workerChoiceStrategyContext.workerChoiceStrategy
1855 ).workerNodeVirtualTaskRunTime
1856 ).toBe(0)
1857 // We need to clean up the resources after our test
1858 await pool.destroy()
1859 })
1860
1861 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1862 const workerChoiceStrategy =
1863 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1864 let pool = new FixedThreadPool(
1865 max,
1866 './tests/worker-files/thread/testWorker.mjs',
1867 { workerChoiceStrategy }
1868 )
1869 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1870 dynamicWorkerUsage: false,
1871 dynamicWorkerReady: true
1872 })
1873 await pool.destroy()
1874 pool = new DynamicThreadPool(
1875 min,
1876 max,
1877 './tests/worker-files/thread/testWorker.mjs',
1878 { workerChoiceStrategy }
1879 )
1880 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1881 dynamicWorkerUsage: false,
1882 dynamicWorkerReady: true
1883 })
1884 // We need to clean up the resources after our test
1885 await pool.destroy()
1886 })
1887
1888 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1889 const workerChoiceStrategy =
1890 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1891 let pool = new FixedThreadPool(
1892 max,
1893 './tests/worker-files/thread/testWorker.mjs',
1894 { workerChoiceStrategy }
1895 )
1896 expect(
1897 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1898 ).toStrictEqual({
1899 runTime: {
1900 aggregate: true,
1901 average: true,
1902 median: false
1903 },
1904 waitTime: {
1905 aggregate: false,
1906 average: false,
1907 median: false
1908 },
1909 elu: {
1910 aggregate: false,
1911 average: false,
1912 median: false
1913 }
1914 })
1915 await pool.destroy()
1916 pool = new DynamicThreadPool(
1917 min,
1918 max,
1919 './tests/worker-files/thread/testWorker.mjs',
1920 { workerChoiceStrategy }
1921 )
1922 expect(
1923 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1924 ).toStrictEqual({
1925 runTime: {
1926 aggregate: true,
1927 average: true,
1928 median: false
1929 },
1930 waitTime: {
1931 aggregate: false,
1932 average: false,
1933 median: false
1934 },
1935 elu: {
1936 aggregate: false,
1937 average: false,
1938 median: false
1939 }
1940 })
1941 // We need to clean up the resources after our test
1942 await pool.destroy()
1943 })
1944
1945 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1946 const pool = new FixedThreadPool(
1947 max,
1948 './tests/worker-files/thread/testWorker.mjs',
1949 {
1950 workerChoiceStrategy:
1951 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1952 }
1953 )
1954 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1955 const promises = new Set()
1956 const maxMultiplier = 2
1957 for (let i = 0; i < max * maxMultiplier; i++) {
1958 promises.add(pool.execute())
1959 }
1960 await Promise.all(promises)
1961 for (const workerNode of pool.workerNodes) {
1962 expect(workerNode.usage).toStrictEqual({
1963 tasks: {
1964 executed: expect.any(Number),
1965 executing: 0,
1966 queued: 0,
1967 maxQueued: 0,
1968 sequentiallyStolen: 0,
1969 stolen: 0,
1970 failed: 0
1971 },
1972 runTime: expect.objectContaining({
1973 history: expect.any(CircularArray)
1974 }),
1975 waitTime: {
1976 history: new CircularArray()
1977 },
1978 elu: {
1979 idle: {
1980 history: new CircularArray()
1981 },
1982 active: {
1983 history: new CircularArray()
1984 }
1985 }
1986 })
1987 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1988 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1989 max * maxMultiplier
1990 )
1991 }
1992 expect(
1993 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1994 pool.workerChoiceStrategyContext.workerChoiceStrategy
1995 ).roundId
1996 ).toBe(0)
1997 expect(
1998 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1999 pool.workerChoiceStrategyContext.workerChoiceStrategy
2000 ).workerNodeId
2001 ).toBe(0)
2002 expect(
2003 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2004 pool.workerChoiceStrategyContext.workerChoiceStrategy
2005 ).nextWorkerNodeKey
2006 ).toBe(0)
2007 expect(
2008 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2009 pool.workerChoiceStrategyContext.workerChoiceStrategy
2010 ).previousWorkerNodeKey
2011 ).toEqual(0)
2012 expect(
2013 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2014 pool.workerChoiceStrategyContext.workerChoiceStrategy
2015 ).roundWeights.length
2016 ).toBe(1)
2017 expect(
2018 Number.isSafeInteger(
2019 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2020 pool.workerChoiceStrategyContext.workerChoiceStrategy
2021 ).roundWeights[0]
2022 )
2023 ).toBe(true)
2024 // We need to clean up the resources after our test
2025 await pool.destroy()
2026 })
2027
2028 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
2029 const pool = new DynamicThreadPool(
2030 min,
2031 max,
2032 './tests/worker-files/thread/testWorker.mjs',
2033 {
2034 workerChoiceStrategy:
2035 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2036 }
2037 )
2038 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
2039 const promises = new Set()
2040 const maxMultiplier = 2
2041 for (let i = 0; i < max * maxMultiplier; i++) {
2042 promises.add(pool.execute())
2043 }
2044 await Promise.all(promises)
2045 for (const workerNode of pool.workerNodes) {
2046 expect(workerNode.usage).toStrictEqual({
2047 tasks: {
2048 executed: expect.any(Number),
2049 executing: 0,
2050 queued: 0,
2051 maxQueued: 0,
2052 sequentiallyStolen: 0,
2053 stolen: 0,
2054 failed: 0
2055 },
2056 runTime: expect.objectContaining({
2057 history: expect.any(CircularArray)
2058 }),
2059 waitTime: {
2060 history: new CircularArray()
2061 },
2062 elu: {
2063 idle: {
2064 history: new CircularArray()
2065 },
2066 active: {
2067 history: new CircularArray()
2068 }
2069 }
2070 })
2071 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2072 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2073 max * maxMultiplier
2074 )
2075 }
2076 expect(
2077 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2078 pool.workerChoiceStrategyContext.workerChoiceStrategy
2079 ).roundId
2080 ).toBe(0)
2081 expect(
2082 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2083 pool.workerChoiceStrategyContext.workerChoiceStrategy
2084 ).workerNodeId
2085 ).toBe(0)
2086 expect(
2087 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2088 pool.workerChoiceStrategyContext.workerChoiceStrategy
2089 ).nextWorkerNodeKey
2090 ).toBe(0)
2091 expect(
2092 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2093 pool.workerChoiceStrategyContext.workerChoiceStrategy
2094 ).previousWorkerNodeKey
2095 ).toEqual(0)
2096 expect(
2097 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2098 pool.workerChoiceStrategyContext.workerChoiceStrategy
2099 ).roundWeights.length
2100 ).toBe(1)
2101 expect(
2102 Number.isSafeInteger(
2103 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2104 pool.workerChoiceStrategyContext.workerChoiceStrategy
2105 ).roundWeights[0]
2106 )
2107 ).toBe(true)
2108 // We need to clean up the resources after our test
2109 await pool.destroy()
2110 })
2111
2112 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
2113 const workerChoiceStrategy =
2114 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2115 let pool = new FixedThreadPool(
2116 max,
2117 './tests/worker-files/thread/testWorker.mjs'
2118 )
2119 expect(
2120 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2121 workerChoiceStrategy
2122 ).roundId
2123 ).toBeDefined()
2124 expect(
2125 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2126 workerChoiceStrategy
2127 ).workerNodeId
2128 ).toBeDefined()
2129 expect(
2130 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2131 workerChoiceStrategy
2132 ).nextWorkerNodeKey
2133 ).toBeDefined()
2134 expect(
2135 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2136 workerChoiceStrategy
2137 ).previousWorkerNodeKey
2138 ).toBeDefined()
2139 expect(
2140 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2141 workerChoiceStrategy
2142 ).roundWeights
2143 ).toBeDefined()
2144 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2145 expect(
2146 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2147 pool.workerChoiceStrategyContext.workerChoiceStrategy
2148 ).roundId
2149 ).toBe(0)
2150 expect(
2151 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2152 pool.workerChoiceStrategyContext.workerChoiceStrategy
2153 ).workerNodeId
2154 ).toBe(0)
2155 expect(
2156 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2157 pool.workerChoiceStrategyContext.workerChoiceStrategy
2158 ).nextWorkerNodeKey
2159 ).toBe(0)
2160 expect(
2161 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2162 pool.workerChoiceStrategyContext.workerChoiceStrategy
2163 ).previousWorkerNodeKey
2164 ).toBe(0)
2165 expect(
2166 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2167 pool.workerChoiceStrategyContext.workerChoiceStrategy
2168 ).roundWeights.length
2169 ).toBe(1)
2170 expect(
2171 Number.isSafeInteger(
2172 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2173 pool.workerChoiceStrategyContext.workerChoiceStrategy
2174 ).roundWeights[0]
2175 )
2176 ).toBe(true)
2177 await pool.destroy()
2178 pool = new DynamicThreadPool(
2179 min,
2180 max,
2181 './tests/worker-files/thread/testWorker.mjs'
2182 )
2183 expect(
2184 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2185 workerChoiceStrategy
2186 ).roundId
2187 ).toBeDefined()
2188 expect(
2189 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2190 workerChoiceStrategy
2191 ).workerNodeId
2192 ).toBeDefined()
2193 expect(
2194 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2195 workerChoiceStrategy
2196 ).nextWorkerNodeKey
2197 ).toBeDefined()
2198 expect(
2199 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2200 workerChoiceStrategy
2201 ).previousWorkerNodeKey
2202 ).toBeDefined()
2203 expect(
2204 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2205 workerChoiceStrategy
2206 ).roundWeights
2207 ).toBeDefined()
2208 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2209 expect(
2210 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2211 pool.workerChoiceStrategyContext.workerChoiceStrategy
2212 ).roundId
2213 ).toBe(0)
2214 expect(
2215 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2216 pool.workerChoiceStrategyContext.workerChoiceStrategy
2217 ).workerNodeId
2218 ).toBe(0)
2219 expect(
2220 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2221 pool.workerChoiceStrategyContext.workerChoiceStrategy
2222 ).nextWorkerNodeKey
2223 ).toBe(0)
2224 expect(
2225 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2226 pool.workerChoiceStrategyContext.workerChoiceStrategy
2227 ).previousWorkerNodeKey
2228 ).toBe(0)
2229 expect(
2230 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2231 pool.workerChoiceStrategyContext.workerChoiceStrategy
2232 ).roundWeights.length
2233 ).toBe(1)
2234 expect(
2235 Number.isSafeInteger(
2236 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2237 pool.workerChoiceStrategyContext.workerChoiceStrategy
2238 ).roundWeights[0]
2239 )
2240 ).toBe(true)
2241 // We need to clean up the resources after our test
2242 await pool.destroy()
2243 })
2244
2245 it('Verify unknown strategy throw error', () => {
2246 expect(
2247 () =>
2248 new DynamicThreadPool(
2249 min,
2250 max,
2251 './tests/worker-files/thread/testWorker.mjs',
2252 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2253 )
2254 ).toThrow("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
2255 })
2256 })