test: remove unneeded sleep
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.mjs
1 import { expect } from 'expect'
2 import {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 WorkerChoiceStrategies
8 } from '../../../lib/index.js'
9 import { CircularArray } from '../../../lib/circular-array.js'
10
11 describe('Selection strategies test suite', () => {
12 const min = 0
13 const max = 3
14
it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
  // Each enumeration member is expected to hold a string equal to its own key.
  const expectedMembers = [
    'ROUND_ROBIN',
    'LEAST_USED',
    'LEAST_BUSY',
    'LEAST_ELU',
    'FAIR_SHARE',
    'WEIGHTED_ROUND_ROBIN',
    'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
  ]
  for (const member of expectedMembers) {
    expect(WorkerChoiceStrategies[member]).toBe(member)
  }
})
28
it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
  // No pool options are given, so the strategy read back is whatever the
  // pool selected on its own.
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const pool = new DynamicThreadPool(min, max, workerFile)
  const { workerChoiceStrategy } = pool.opts
  expect(workerChoiceStrategy).toBe(WorkerChoiceStrategies.ROUND_ROBIN)
  // Release the pool resources once the check is done
  await pool.destroy()
})
41
it('Verify available strategies are taken at pool creation', async () => {
  // Build one fixed thread pool per known strategy and check that the strategy
  // shows up both in the pool options and in the strategy context.
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  for (const strategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new FixedThreadPool(max, workerFile, {
      workerChoiceStrategy: strategy
    })
    const { opts, workerChoiceStrategyContext } = pool
    expect(opts.workerChoiceStrategy).toBe(strategy)
    expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(strategy)
    await pool.destroy()
  }
})
56
it('Verify available strategies can be set after pool creation', async () => {
  const strategies = Object.values(WorkerChoiceStrategies)
  // Strategy options expected on both the pool and its strategy context;
  // only the `retries` value differs between the two scenarios below.
  const expectedOptions = retries => ({
    retries,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  })

  // Dynamic thread pool: strategy set without explicit options
  for (const strategy of strategies) {
    const pool = new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.mjs'
    )
    pool.setWorkerChoiceStrategy(strategy)
    expect(pool.opts.workerChoiceStrategy).toBe(strategy)
    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
      strategy
    )
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions(6)
    )
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOptions(6)
    )
    await pool.destroy()
  }

  // Dynamic cluster pool: strategy set together with a custom `retries` value
  for (const strategy of strategies) {
    const pool = new DynamicClusterPool(
      min,
      max,
      './tests/worker-files/cluster/testWorker.js'
    )
    pool.setWorkerChoiceStrategy(strategy, { retries: 3 })
    expect(pool.opts.workerChoiceStrategy).toBe(strategy)
    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
      strategy
    )
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions(3)
    )
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOptions(3)
    )
    await pool.destroy()
  }
})
109
it('Verify available strategies default internals at pool creation', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const { workerChoiceStrategies } = pool.workerChoiceStrategyContext
  for (const strategyName of Object.values(WorkerChoiceStrategies)) {
    const strategy = workerChoiceStrategies.get(strategyName)
    // Checks shared by every strategy
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(0)
    // Extra checks for the two weighted strategies
    switch (strategyName) {
      case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
        expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
        expect(strategy.workerNodeVirtualTaskRunTime).toBe(0)
        break
      case WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN:
        expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
        expect(strategy.workerNodeVirtualTaskRunTime).toBe(0)
        expect(strategy.roundId).toBe(0)
        expect(strategy.workerNodeId).toBe(0)
        // The round weights are expected to hold only the default weight
        expect(strategy.roundWeights).toStrictEqual([
          strategy.defaultWorkerWeight
        ])
        break
      default:
        break
    }
  }
  await pool.destroy()
})
176
it('Verify strategies wait for worker node readiness in dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.starting).toBe(false)
  expect(pool.workerNodes.length).toBe(min)
  const maxMultiplier = 10000
  // Submit all tasks synchronously, then wait for every one of them to resolve
  const pendingTasks = Array.from({ length: max * maxMultiplier }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // After that load the pool is expected to hold `max` worker nodes
  expect(pool.workerNodes.length).toBe(max)
  // Release the pool resources once the checks are done
  await pool.destroy()
})
195
it('Verify ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // The same policy is expected from a fixed pool and from a dynamic pool.
  // Each factory builds its own options object.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const policy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(policy).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
221
it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Requirements of one measurement with every statistic switched off
  const nothingRequired = () => ({
    aggregate: false,
    average: false,
    median: false
  })
  // Fixed pool first, dynamic pool second; each gets its own options object
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual({
      runTime: nothingRequired(),
      waitTime: nothingRequired(),
      elu: nothingRequired()
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
277
it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to resolve
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  // Every worker node is expected to have executed exactly `maxMultiplier`
  // tasks and to carry freshly constructed histories.
  const expectedUsage = {
    tasks: {
      executed: maxMultiplier,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage)
  }
  // Looks up the strategy instance currently selected by the pool
  const selectedStrategy = () => {
    const context = pool.workerChoiceStrategyContext
    return context.workerChoiceStrategies.get(context.workerChoiceStrategy)
  }
  expect(selectedStrategy().nextWorkerNodeKey).toBe(0)
  expect(selectedStrategy().previousWorkerNodeKey).toBe(
    pool.workerNodes.length - 1
  )
  // Release the pool resources once the checks are done
  await pool.destroy()
})
332
it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to resolve
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  // The per-node executed count is only checked to be a number here; its
  // range is asserted separately in the loop below.
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage)
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
  }
  // Looks up the strategy instance currently selected by the pool
  const selectedStrategy = () => {
    const context = pool.workerChoiceStrategyContext
    return context.workerChoiceStrategies.get(context.workerChoiceStrategy)
  }
  expect(selectedStrategy().nextWorkerNodeKey).toBe(0)
  expect(selectedStrategy().previousWorkerNodeKey).toBe(
    pool.workerNodes.length - 1
  )
  // Release the pool resources once the checks are done
  await pool.destroy()
})
392
it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // Cluster pool first, thread pool second; each gets its own options object
  const poolFactories = [
    () =>
      new FixedClusterPool(max, './tests/worker-files/cluster/testWorker.js', {
        workerChoiceStrategy
      }),
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // `max` consecutive choices are expected to yield `max` distinct worker ids
    const chosenWorkerIds = new Set()
    for (let choice = 0; choice < max; choice++) {
      const workerNodeKey = pool.chooseWorkerNode()
      chosenWorkerIds.add(pool.workerNodes[workerNodeKey].info.id)
    }
    expect(chosenWorkerIds.size).toBe(max)
    await pool.destroy()
  }
})
418
it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Both pools start with WEIGHTED_ROUND_ROBIN and are switched afterwards.
  // Each factory builds its own options object.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, workerFile, {
        workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
      }),
    () =>
      new DynamicThreadPool(min, max, workerFile, {
        workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
      })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Re-reads the context on every call so the lookup always reflects the
    // strategy selected at that moment.
    const selectedStrategy = () => {
      const context = pool.workerChoiceStrategyContext
      return context.workerChoiceStrategies.get(context.workerChoiceStrategy)
    }
    expect(selectedStrategy().nextWorkerNodeKey).toBeDefined()
    expect(selectedStrategy().previousWorkerNodeKey).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(selectedStrategy().nextWorkerNodeKey).toBe(0)
    expect(selectedStrategy().previousWorkerNodeKey).toBe(0)
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
478
it('Verify LEAST_USED strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // The same policy is expected from a fixed pool and from a dynamic pool.
  // Each factory builds its own options object.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const policy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(policy).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
504
it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Requirements of one measurement with every statistic switched off
  const nothingRequired = () => ({
    aggregate: false,
    average: false,
    median: false
  })
  // Fixed pool first, dynamic pool second; each gets its own options object
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual({
      runTime: nothingRequired(),
      waitTime: nothingRequired(),
      elu: nothingRequired()
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
560
it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to resolve
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  // The per-node executed count is only checked to be a number here; its
  // range is asserted separately in the loop below.
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage)
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
  }
  // Looks up the strategy instance currently selected by the pool
  const selectedStrategy = () => {
    const context = pool.workerChoiceStrategyContext
    return context.workerChoiceStrategies.get(context.workerChoiceStrategy)
  }
  // Only the type of the node keys is checked for this strategy
  expect(selectedStrategy().nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(selectedStrategy().previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the checks are done
  await pool.destroy()
})
618
it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to resolve
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  // The per-node executed count is only checked to be a number here; its
  // range is asserted separately in the loop below.
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage)
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
  }
  // Looks up the strategy instance currently selected by the pool
  const selectedStrategy = () => {
    const context = pool.workerChoiceStrategyContext
    return context.workerChoiceStrategies.get(context.workerChoiceStrategy)
  }
  // Only the type of the node keys is checked for this strategy
  expect(selectedStrategy().nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(selectedStrategy().previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the checks are done
  await pool.destroy()
})
677
it('Verify LEAST_BUSY strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // The same policy is expected from a fixed pool and from a dynamic pool.
  // Each factory builds its own options object.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const policy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(policy).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
703
it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Requirements of one measurement: only `aggregate` varies, average and
  // median are always expected to be off.
  const requirement = aggregate => ({
    aggregate,
    average: false,
    median: false
  })
  // Fixed pool first, dynamic pool second; each gets its own options object
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    // Aggregated run time and wait time are expected, ELU is not
    expect(requirements).toStrictEqual({
      runTime: requirement(true),
      waitTime: requirement(true),
      elu: requirement(false)
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
759
it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to resolve
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  // An aggregate may be absent; when present it must be strictly positive
  const expectUnsetOrPositive = aggregate => {
    if (aggregate != null) {
      expect(aggregate).toBeGreaterThan(0)
    } else {
      expect(aggregate).toBeUndefined()
    }
  }
  // Run time and wait time may carry extra fields next to their history,
  // hence the partial matchers; ELU is matched exactly.
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: expect.objectContaining({
      history: expect.any(CircularArray)
    }),
    waitTime: expect.objectContaining({
      history: expect.any(CircularArray)
    }),
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage)
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    expectUnsetOrPositive(usage.runTime.aggregate)
    expectUnsetOrPositive(usage.waitTime.aggregate)
  }
  // Looks up the strategy instance currently selected by the pool
  const selectedStrategy = () => {
    const context = pool.workerChoiceStrategyContext
    return context.workerChoiceStrategies.get(context.workerChoiceStrategy)
  }
  // Only the type of the node keys is checked for this strategy
  expect(selectedStrategy().nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(selectedStrategy().previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the checks are done
  await pool.destroy()
})
827
it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to resolve
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  // An aggregate may be absent; when present it must be strictly positive
  const expectUnsetOrPositive = aggregate => {
    if (aggregate != null) {
      expect(aggregate).toBeGreaterThan(0)
    } else {
      expect(aggregate).toBeUndefined()
    }
  }
  // Run time and wait time may carry extra fields next to their history,
  // hence the partial matchers; ELU is matched exactly.
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: expect.objectContaining({
      history: expect.any(CircularArray)
    }),
    waitTime: expect.objectContaining({
      history: expect.any(CircularArray)
    }),
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage)
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    expectUnsetOrPositive(usage.runTime.aggregate)
    expectUnsetOrPositive(usage.waitTime.aggregate)
  }
  // Looks up the strategy instance currently selected by the pool
  const selectedStrategy = () => {
    const context = pool.workerChoiceStrategyContext
    return context.workerChoiceStrategies.get(context.workerChoiceStrategy)
  }
  // Only the type of the node keys is checked for this strategy
  expect(selectedStrategy().nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(selectedStrategy().previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the checks are done
  await pool.destroy()
})
896
it('Verify LEAST_ELU strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // The same policy is expected from a fixed pool and from a dynamic pool.
  // Each factory builds its own options object.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const policy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(policy).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
922
it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Requirements of one measurement: only `aggregate` varies, average and
  // median are always expected to be off.
  const requirement = aggregate => ({
    aggregate,
    average: false,
    median: false
  })
  // Fixed pool first, dynamic pool second; each gets its own options object
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    // Only the aggregated ELU is expected to be required
    expect(requirements).toStrictEqual({
      runTime: requirement(false),
      waitTime: requirement(false),
      elu: requirement(true)
    })
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
978
it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to resolve
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  // ELU entries may carry extra fields next to their history, hence the
  // partial matchers; run time and wait time are matched exactly.
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: expect.objectContaining({
      idle: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      active: expect.objectContaining({
        history: expect.any(CircularArray)
      })
    })
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage)
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    const { active, idle } = usage.elu
    // Each ELU figure may be absent; when present it must stay in range
    if (active.aggregate != null) {
      expect(active.aggregate).toBeGreaterThan(0)
    } else {
      expect(active.aggregate).toBeUndefined()
    }
    if (idle.aggregate != null) {
      expect(idle.aggregate).toBeGreaterThanOrEqual(0)
    } else {
      expect(idle.aggregate).toBeUndefined()
    }
    if (usage.elu.utilization != null) {
      expect(usage.elu.utilization).toBeGreaterThanOrEqual(0)
      expect(usage.elu.utilization).toBeLessThanOrEqual(1)
    } else {
      expect(usage.elu.utilization).toBeUndefined()
    }
  }
  // Looks up the strategy instance currently selected by the pool
  const selectedStrategy = () => {
    const context = pool.workerChoiceStrategyContext
    return context.workerChoiceStrategies.get(context.workerChoiceStrategy)
  }
  // Only the type of the node keys is checked for this strategy
  expect(selectedStrategy().nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(selectedStrategy().previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the checks are done
  await pool.destroy()
})
1052
it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to resolve
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  // ELU entries may carry extra fields next to their history, hence the
  // partial matchers; run time and wait time are matched exactly.
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: expect.objectContaining({
      idle: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      active: expect.objectContaining({
        history: expect.any(CircularArray)
      })
    })
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage)
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    const { active, idle } = usage.elu
    // Each ELU figure may be absent; when present it must stay in range
    if (active.aggregate != null) {
      expect(active.aggregate).toBeGreaterThan(0)
    } else {
      expect(active.aggregate).toBeUndefined()
    }
    if (idle.aggregate != null) {
      expect(idle.aggregate).toBeGreaterThanOrEqual(0)
    } else {
      expect(idle.aggregate).toBeUndefined()
    }
    if (usage.elu.utilization != null) {
      expect(usage.elu.utilization).toBeGreaterThanOrEqual(0)
      expect(usage.elu.utilization).toBeLessThanOrEqual(1)
    } else {
      expect(usage.elu.utilization).toBeUndefined()
    }
  }
  // Looks up the strategy instance currently selected by the pool
  const selectedStrategy = () => {
    const context = pool.workerChoiceStrategyContext
    return context.workerChoiceStrategies.get(context.workerChoiceStrategy)
  }
  // Only the type of the node keys is checked for this strategy
  expect(selectedStrategy().nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(selectedStrategy().previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the checks are done
  await pool.destroy()
})
1127
it('Verify FAIR_SHARE strategy default policy', async () => {
  // The same strategy policy is expected for a fixed and a dynamic pool.
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  // Factories (not instances) so each pool is only created once the previous
  // one has been destroyed.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // try/finally: the pool must be destroyed even if the assertion throws
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // We need to clean up the resources after our test
      await pool.destroy()
    }
  }
})
1153
it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
  // The same statistics requirements are expected for a fixed and a dynamic
  // pool.
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: true,
      median: false
    }
  }
  // Factories (not instances) so each pool is only created once the previous
  // one has been destroyed.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // try/finally: the pool must be destroyed even if the assertion throws
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // We need to clean up the resources after our test
      await pool.destroy()
    }
  }
})
1209
it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
  // Runs `max * maxMultiplier` tasks through a fixed thread pool configured
  // with FAIR_SHARE, then checks every worker node's usage statistics.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // try/finally: the pool must be destroyed even if an assertion throws
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: expect.objectContaining({
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        })
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Run time and ELU statistics may be absent on a worker node; when
      // present they must be within their valid range.
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.runTime.average == null) {
        expect(workerNode.usage.runTime.average).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
      }
      if (workerNode.usage.elu.active.aggregate == null) {
        expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.elu.idle.aggregate == null) {
        expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (workerNode.usage.elu.utilization == null) {
        expect(workerNode.usage.elu.utilization).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
      }
      expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
    }
    // Strategy instance currently selected by the pool
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1294
it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
  // Runs `max * maxMultiplier` tasks through a dynamic thread pool configured
  // with FAIR_SHARE, then checks every worker node's usage statistics.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // try/finally: the pool must be destroyed even if an assertion throws
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: expect.objectContaining({
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        })
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Run time and ELU statistics may be absent on a worker node; when
      // present they must be within their valid range.
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.runTime.average == null) {
        expect(workerNode.usage.runTime.average).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
      }
      if (workerNode.usage.elu.active.aggregate == null) {
        expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.elu.idle.aggregate == null) {
        expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (workerNode.usage.elu.utilization == null) {
        expect(workerNode.usage.elu.utilization).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
      }
      expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
    }
    // Strategy instance currently selected by the pool
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1380
it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
  // Same scenario as the plain dynamic pool run, but with the run time median
  // statistic enabled through `workerChoiceStrategyOptions`.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
      workerChoiceStrategyOptions: {
        runTime: { median: true }
      }
    }
  )
  // try/finally: the pool must be destroyed even if an assertion throws
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: expect.objectContaining({
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        })
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Run time and ELU statistics may be absent on a worker node; when
      // present they must be within their valid range.
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.runTime.median == null) {
        expect(workerNode.usage.runTime.median).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
      }
      if (workerNode.usage.elu.active.aggregate == null) {
        expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.elu.idle.aggregate == null) {
        expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (workerNode.usage.elu.utilization == null) {
        expect(workerNode.usage.elu.utilization).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
      }
      expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
    }
    // Strategy instance currently selected by the pool
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1471
it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
  // Seeds each worker node with a virtual task end timestamp, switches the
  // pool to FAIR_SHARE and checks that the timestamp has been cleared.
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  // Factories (not instances) so each pool is only created once the previous
  // one has been destroyed. Both pools start with the default strategy.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs'),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // try/finally: the pool must be destroyed even if an assertion throws
    try {
      // NOTE(review): with `min = 0` the dynamic pool presumably has no
      // worker node yet, which would make both loops no-ops — TODO confirm
      for (const workerNode of pool.workerNodes) {
        workerNode.strategyData = {
          virtualTaskEndTimestamp: performance.now()
        }
      }
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      for (const workerNode of pool.workerNodes) {
        expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
      }
    } finally {
      // We need to clean up the resources after our test
      await pool.destroy()
    }
  }
})
1505
it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  // The same strategy policy is expected for a fixed and a dynamic pool.
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  // Factories (not instances) so each pool is only created once the previous
  // one has been destroyed.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // try/finally: the pool must be destroyed even if the assertion throws
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // We need to clean up the resources after our test
      await pool.destroy()
    }
  }
})
1531
it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  // The same statistics requirements are expected for a fixed and a dynamic
  // pool.
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  // Factories (not instances) so each pool is only created once the previous
  // one has been destroyed.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // try/finally: the pool must be destroyed even if the assertion throws
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // We need to clean up the resources after our test
      await pool.destroy()
    }
  }
})
1587
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  // Runs `max * maxMultiplier` tasks through a fixed thread pool configured
  // with WEIGHTED_ROUND_ROBIN, then checks worker node usage statistics and
  // the strategy's internal state.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // try/finally: the pool must be destroyed even if an assertion throws
  try {
    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Run time statistics may be absent on a worker node; when present
      // they must be strictly positive.
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.runTime.average == null) {
        expect(workerNode.usage.runTime.average).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
      }
    }
    // Strategy instance currently selected by the pool
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(0)
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  } finally {
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1665
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  // Runs `max * maxMultiplier` tasks through a dynamic thread pool configured
  // with WEIGHTED_ROUND_ROBIN, then checks worker node usage statistics and
  // the strategy's internal state.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // try/finally: the pool must be destroyed even if an assertion throws
  try {
    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Run time statistics may be absent on a worker node; when present
      // they must be strictly positive.
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.runTime.average == null) {
        expect(workerNode.usage.runTime.average).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
      }
    }
    // Strategy instance currently selected by the pool
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(0)
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  } finally {
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1744
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
  // Same scenario as the plain dynamic pool run, but with the run time median
  // statistic enabled through `workerChoiceStrategyOptions`.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
      workerChoiceStrategyOptions: {
        runTime: { median: true }
      }
    }
  )
  // try/finally: the pool must be destroyed even if an assertion throws
  try {
    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Run time statistics may be absent on a worker node; when present
      // they must be strictly positive.
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.runTime.median == null) {
        expect(workerNode.usage.runTime.median).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
      }
    }
    // Strategy instance currently selected by the pool
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(0)
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  } finally {
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1828
it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  // Checks that the strategy's internal fields exist before it is selected
  // and hold their initial values once it has been set on the pool.
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  // Factories (not instances) so each pool is only created once the previous
  // one has been destroyed. Both pools start with the default strategy.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs'),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // try/finally: the pool must be destroyed even if an assertion throws
    try {
      const { workerChoiceStrategyContext } = pool
      const strategyBeforeSet =
        workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        )
      expect(strategyBeforeSet.nextWorkerNodeKey).toBeDefined()
      expect(strategyBeforeSet.previousWorkerNodeKey).toBeDefined()
      expect(strategyBeforeSet.defaultWorkerWeight).toBeDefined()
      expect(strategyBeforeSet.workerNodeVirtualTaskRunTime).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      // Look the strategy up again, keyed by the pool's current strategy,
      // rather than reusing the reference obtained before the switch.
      const strategyAfterSet =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          pool.workerChoiceStrategyContext.workerChoiceStrategy
        )
      expect(strategyAfterSet.nextWorkerNodeKey).toBe(0)
      expect(strategyAfterSet.previousWorkerNodeKey).toBe(0)
      expect(strategyAfterSet.defaultWorkerWeight).toBeGreaterThan(0)
      expect(strategyAfterSet.workerNodeVirtualTaskRunTime).toBe(0)
    } finally {
      // We need to clean up the resources after our test
      await pool.destroy()
    }
  }
})
1926
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  // The same strategy policy is expected for a fixed and a dynamic pool.
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  // Factories (not instances) so each pool is only created once the previous
  // one has been destroyed.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // try/finally: the pool must be destroyed even if the assertion throws
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // We need to clean up the resources after our test
      await pool.destroy()
    }
  }
})
1953
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  // The same statistics requirements are expected for a fixed and a dynamic
  // pool.
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  // Factories (not instances) so each pool is only created once the previous
  // one has been destroyed.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // try/finally: the pool must be destroyed even if the assertion throws
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // We need to clean up the resources after our test
      await pool.destroy()
    }
  }
})
2010
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  // Runs `max * maxMultiplier` tasks through a fixed thread pool configured
  // with INTERLEAVED_WEIGHTED_ROUND_ROBIN, then checks worker node usage
  // statistics and the strategy's internal state.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  // try/finally: the pool must be destroyed even if an assertion throws
  try {
    // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
    // Strategy instance currently selected by the pool
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.roundId).toBe(0)
    expect(strategy.workerNodeId).toBe(0)
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
    // A single round weight is expected, equal to the default worker weight
    expect(strategy.roundWeights).toStrictEqual([strategy.defaultWorkerWeight])
  } finally {
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
2095
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  // `min` and `max` are the pool size bounds declared at the top of the suite.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  try {
    // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
    // Submit max * maxMultiplier tasks at once and wait for all of them.
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // Once every task has settled nothing may be executing or queued, and
      // the wait time / ELU histories must equal a fresh CircularArray.
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      // A single worker node can have run anything from no task to all of them.
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
    // Resolve the strategy instance currently selected by the pool once; the
    // assertions below are synchronous, so it cannot change between them.
    const { workerChoiceStrategy, workerChoiceStrategies } =
      pool.workerChoiceStrategyContext
    const strategy = workerChoiceStrategies.get(workerChoiceStrategy)
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.roundId).toBe(0)
    expect(strategy.workerNodeId).toBe(0)
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.roundWeights).toStrictEqual([strategy.defaultWorkerWeight])
  } finally {
    // We need to clean up the resources after our test, including when an
    // assertion above throws: otherwise the pool workers are never terminated.
    await pool.destroy()
  }
})
2181
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN

  /**
   * Checks that the strategy internals exist on a pool created without an
   * explicit strategy, then that they hold their reset values once the
   * strategy has been set on the pool. The pool is always destroyed, even when
   * an assertion throws, so its workers are never left running.
   *
   * @param pool - Freshly created pool to verify; destroyed before returning.
   */
  const verifyInternalsReset = async pool => {
    try {
      // Before setting the strategy: its instance is looked up by name in the
      // context and only the presence of its internals is checked.
      const initialStrategy =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        )
      expect(initialStrategy.roundId).toBeDefined()
      expect(initialStrategy.workerNodeId).toBeDefined()
      expect(initialStrategy.nextWorkerNodeKey).toBeDefined()
      expect(initialStrategy.previousWorkerNodeKey).toBeDefined()
      expect(initialStrategy.defaultWorkerWeight).toBeDefined()
      expect(initialStrategy.roundWeights).toBeDefined()

      pool.setWorkerChoiceStrategy(workerChoiceStrategy)

      // After setting it: look the instance up again through the strategy the
      // context now reports as current, and check the exact reset values.
      const context = pool.workerChoiceStrategyContext
      const currentStrategy = context.workerChoiceStrategies.get(
        context.workerChoiceStrategy
      )
      expect(currentStrategy.roundId).toBe(0)
      expect(currentStrategy.workerNodeId).toBe(0)
      expect(currentStrategy.nextWorkerNodeKey).toBe(0)
      expect(currentStrategy.previousWorkerNodeKey).toBe(0)
      expect(currentStrategy.defaultWorkerWeight).toBeGreaterThan(0)
      expect(currentStrategy.roundWeights).toStrictEqual([
        currentStrategy.defaultWorkerWeight
      ])
    } finally {
      // We need to clean up the resources after our test
      await pool.destroy()
    }
  }

  // Same scenario for a fixed pool, then for a dynamic pool. The second pool
  // is only created once the first one has been destroyed.
  await verifyInternalsReset(
    new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs')
  )
  await verifyInternalsReset(
    new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.mjs'
    )
  )
})
2328
it('Verify unknown strategy throw error', () => {
  // 'UNKNOWN_STRATEGY' is not a WorkerChoiceStrategies value, so constructing
  // the pool is expected to throw synchronously with the message below.
  const createPoolWithUnknownStrategy = () =>
    new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.mjs',
      { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
    )
  expect(createPoolWithUnknownStrategy).toThrow(
    "Invalid worker choice strategy 'UNKNOWN_STRATEGY'"
  )
})
2340 })