fix: ensure worker choice is retried at least the pool max size
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.mjs
1 import { expect } from 'expect'
2 import {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 WorkerChoiceStrategies
8 } from '../../../lib/index.js'
9 import { CircularArray } from '../../../lib/circular-array.js'
10
11 describe('Selection strategies test suite', () => {
12 const min = 0
13 const max = 3
14
it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
  // Each enumeration member is expected to hold a string equal to its own key.
  const strategyNames = [
    'ROUND_ROBIN',
    'LEAST_USED',
    'LEAST_BUSY',
    'LEAST_ELU',
    'FAIR_SHARE',
    'WEIGHTED_ROUND_ROBIN',
    'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
  ]
  for (const strategyName of strategyNames) {
    expect(WorkerChoiceStrategies[strategyName]).toBe(strategyName)
  }
})
28
it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
  // A pool created without options must report ROUND_ROBIN as its strategy.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(pool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.ROUND_ROBIN
    )
  } finally {
    // Clean up the pool resources even when the assertion above throws
    await pool.destroy()
  }
})
41
it('Verify available strategies are taken at pool creation', async () => {
  // Every strategy passed as a creation option must be visible both in the
  // pool options and in the worker choice strategy context.
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new FixedThreadPool(
      max,
      './tests/worker-files/thread/testWorker.mjs',
      { workerChoiceStrategy }
    )
    try {
      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        workerChoiceStrategy
      )
    } finally {
      // Clean up this iteration's pool even when an assertion throws
      await pool.destroy()
    }
  }
})
56
it('Verify available strategies can be set after pool creation', async () => {
  // The same checks run against a dynamic thread pool first, then against a
  // dynamic cluster pool; a fresh pool is created for every strategy.
  const poolFactories = [
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs'
      ),
    () =>
      new DynamicClusterPool(
        min,
        max,
        './tests/worker-files/cluster/testWorker.js'
      )
  ]
  for (const createPool of poolFactories) {
    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
      const pool = createPool()
      try {
        pool.setWorkerChoiceStrategy(workerChoiceStrategy)
        expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
        expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
          workerChoiceStrategy
        )
        // No strategy options were supplied, so the pool options stay unset
        // while the context exposes its own option values.
        expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
        expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
          retries: pool.info.maxSize,
          runTime: { median: false },
          waitTime: { median: false },
          elu: { median: false }
        })
      } finally {
        // Clean up this iteration's pool even when an assertion throws
        await pool.destroy()
      }
    }
  }
})
99
it('Verify available strategies default internals at pool creation', async () => {
  // Inspects the internal state of every registered strategy right after the
  // pool has been created, before any task is executed.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
      const strategy =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        )
      expect(strategy.nextWorkerNodeKey).toBe(0)
      expect(strategy.previousWorkerNodeKey).toBe(0)
      if (
        workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
      ) {
        expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
        expect(strategy.workerNodeVirtualTaskRunTime).toBe(0)
      } else if (
        workerChoiceStrategy ===
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
      ) {
        expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
        expect(strategy.workerNodeVirtualTaskRunTime).toBe(0)
        expect(strategy.roundId).toBe(0)
        expect(strategy.workerNodeId).toBe(0)
        expect(strategy.roundWeights).toStrictEqual([
          strategy.defaultWorkerWeight
        ])
      }
    }
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
166
it('Verify strategies wait for worker node readiness in dynamic pool', async () => {
  // Submits a large burst of tasks to a dynamic pool and checks that the
  // number of worker nodes goes from `min` to `max`.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(pool.starting).toBe(false)
    expect(pool.workerNodes.length).toBe(min)
    const maxMultiplier = 10000
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    expect(pool.workerNodes.length).toBe(max)
  } finally {
    // Clean up the pool resources even when an assertion or a task fails
    await pool.destroy()
  }
})
185
it('Verify ROUND_ROBIN strategy default policy', async () => {
  // The same policy is expected for a fixed pool and then for a dynamic pool.
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const expectedPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: true
  }
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual(expectedPolicy)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
211
it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  // The same requirements are expected for a fixed pool and then for a dynamic
  // pool: no runtime, wait time or ELU statistics are requested.
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const expectedRequirements = {
    runTime: {
      aggregate: false,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
267
it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  // Runs `max * maxMultiplier` tasks and checks that every worker node executed
  // exactly `maxMultiplier` of them.
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: maxMultiplier,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
    }
    const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(pool.workerNodes.length - 1)
  } finally {
    // Clean up the pool resources even when an assertion or a task fails
    await pool.destroy()
  }
})
322
it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  // Runs `max * maxMultiplier` tasks; the per-worker executed count is only
  // bounded (not exact) in this test.
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
    const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(pool.workerNodes.length - 1)
  } finally {
    // Clean up the pool resources even when an assertion or a task fails
    await pool.destroy()
  }
})
382
it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
  // Choosing a worker node `max` times must yield `max` distinct worker ids,
  // first on a fixed cluster pool, then on a fixed thread pool.
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const poolFactories = [
    () =>
      new FixedClusterPool(max, './tests/worker-files/cluster/testWorker.js', {
        workerChoiceStrategy
      }),
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      const results = new Set()
      for (let i = 0; i < max; i++) {
        results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
      }
      expect(results.size).toBe(max)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
408
it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
  // Pools start with WEIGHTED_ROUND_ROBIN, are switched to ROUND_ROBIN, and the
  // node keys of the newly active strategy must then both be 0. Checked on a
  // fixed pool first, then on a dynamic pool.
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const initialOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  }
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        ...initialOptions
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { ...initialOptions }
      )
  ]
  // Looks up the strategy instance currently selected by the pool's context.
  const getActiveStrategy = pool =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      const strategyBeforeSet = getActiveStrategy(pool)
      expect(strategyBeforeSet.nextWorkerNodeKey).toBeDefined()
      expect(strategyBeforeSet.previousWorkerNodeKey).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      // The active strategy changed, so it has to be looked up again
      const strategyAfterSet = getActiveStrategy(pool)
      expect(strategyAfterSet.nextWorkerNodeKey).toBe(0)
      expect(strategyAfterSet.previousWorkerNodeKey).toBe(0)
    } finally {
      // Clean up the pool resources even when an assertion above throws
      await pool.destroy()
    }
  }
})
468
it('Verify LEAST_USED strategy default policy', async () => {
  // The same policy is expected for a fixed pool and then for a dynamic pool.
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const expectedPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: true
  }
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual(expectedPolicy)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
494
it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
  // The same requirements are expected for a fixed pool and then for a dynamic
  // pool: no runtime, wait time or ELU statistics are requested.
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const expectedRequirements = {
    runTime: {
      aggregate: false,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
550
it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
  // Runs `max * maxMultiplier` tasks and checks the resulting usage of every
  // worker node plus the strategy's node key bookkeeping.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
    const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the pool resources even when an assertion or a task fails
    await pool.destroy()
  }
})
608
it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
  // Runs `max * maxMultiplier` tasks and checks the resulting usage of every
  // worker node plus the strategy's node key bookkeeping.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
    const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the pool resources even when an assertion or a task fails
    await pool.destroy()
  }
})
667
it('Verify LEAST_BUSY strategy default policy', async () => {
  // The same policy is expected for a fixed pool and then for a dynamic pool.
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const expectedPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: true
  }
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual(expectedPolicy)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
693
it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
  // The same requirements are expected for a fixed pool and then for a dynamic
  // pool: only the runtime and wait time aggregates are requested.
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: true,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
749
it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
  // Runs `max * maxMultiplier` tasks and checks the resulting usage of every
  // worker node, including the runtime and wait time aggregates when present.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // An aggregate may be absent; when present it must be strictly positive
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.waitTime.aggregate == null) {
        expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
      }
    }
    const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the pool resources even when an assertion or a task fails
    await pool.destroy()
  }
})
817
it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
  // Runs `max * maxMultiplier` tasks and checks the resulting usage of every
  // worker node, including the runtime and wait time aggregates when present.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // An aggregate may be absent; when present it must be strictly positive
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.waitTime.aggregate == null) {
        expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
      }
    }
    const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the pool resources even when an assertion or a task fails
    await pool.destroy()
  }
})
886
it('Verify LEAST_ELU strategy default policy', async () => {
  // The same policy is expected for a fixed pool and then for a dynamic pool.
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const expectedPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: true
  }
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual(expectedPolicy)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
912
it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
  // The same requirements are expected for a fixed pool and then for a dynamic
  // pool: only the ELU aggregate is requested.
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const expectedRequirements = {
    runTime: {
      aggregate: false,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
968
it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
  // Runs `max * maxMultiplier` tasks and checks the resulting usage of every
  // worker node, including the ELU aggregates and utilization when present.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: expect.objectContaining({
        idle: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        active: expect.objectContaining({
          history: expect.any(CircularArray)
        })
      })
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // ELU figures may be absent; when present they must be in range
      if (workerNode.usage.elu.active.aggregate == null) {
        expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.elu.idle.aggregate == null) {
        expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (workerNode.usage.elu.utilization == null) {
        expect(workerNode.usage.elu.utilization).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
      }
    }
    const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the pool resources even when an assertion or a task fails
    await pool.destroy()
  }
})
1042
it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
  // Runs `max * maxMultiplier` tasks and checks the resulting usage of every
  // worker node, including the ELU aggregates and utilization when present.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = Array.from({ length: max * maxMultiplier }, () =>
      pool.execute()
    )
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: expect.objectContaining({
        idle: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        active: expect.objectContaining({
          history: expect.any(CircularArray)
        })
      })
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // ELU figures may be absent; when present they must be in range
      if (workerNode.usage.elu.active.aggregate == null) {
        expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.elu.idle.aggregate == null) {
        expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (workerNode.usage.elu.utilization == null) {
        expect(workerNode.usage.elu.utilization).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
      }
    }
    const strategy = pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the pool resources even when an assertion or a task fails
    await pool.destroy()
  }
})
1117
// Checks the strategy policy reported for FAIR_SHARE, first on a fixed thread
// pool and then on a dynamic thread pool.
it('Verify FAIR_SHARE strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const policy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(policy).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // Release the pool resources before moving to the next pool type
    await pool.destroy()
  }
})
1143
// Checks the task statistics requirements reported for FAIR_SHARE, first on a
// fixed thread pool and then on a dynamic thread pool.
it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual({
      runTime: { aggregate: true, average: true, median: false },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: true, median: false }
    })
    // Release the pool resources before moving to the next pool type
    await pool.destroy()
  }
})
1199
// Smoke test: push tasks through a fixed thread pool configured with the
// FAIR_SHARE strategy, then inspect every worker node usage, its strategy data
// and the strategy node keys.
it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  for (const workerNode of pool.workerNodes) {
    const usage = workerNode.usage
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: { history: new CircularArray() },
      elu: expect.objectContaining({
        idle: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        active: expect.objectContaining({
          history: expect.any(CircularArray)
        })
      })
    }
    expect(usage).toStrictEqual(expectedUsage)
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    // Each statistic is either absent (undefined) or within its valid range
    if (usage.runTime.aggregate != null) {
      expect(usage.runTime.aggregate).toBeGreaterThan(0)
    } else {
      expect(usage.runTime.aggregate).toBeUndefined()
    }
    if (usage.runTime.average != null) {
      expect(usage.runTime.average).toBeGreaterThan(0)
    } else {
      expect(usage.runTime.average).toBeUndefined()
    }
    if (usage.elu.active.aggregate != null) {
      expect(usage.elu.active.aggregate).toBeGreaterThan(0)
    } else {
      expect(usage.elu.active.aggregate).toBeUndefined()
    }
    if (usage.elu.idle.aggregate != null) {
      expect(usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
    } else {
      expect(usage.elu.idle.aggregate).toBeUndefined()
    }
    if (usage.elu.utilization != null) {
      expect(usage.elu.utilization).toBeGreaterThanOrEqual(0)
      expect(usage.elu.utilization).toBeLessThanOrEqual(1)
    } else {
      expect(usage.elu.utilization).toBeUndefined()
    }
    expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
  }
  const strategyContext = pool.workerChoiceStrategyContext
  const activeStrategy = strategyContext.workerChoiceStrategies.get(
    strategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(activeStrategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the assertions are done
  await pool.destroy()
})
1284
// Smoke test: push tasks through a dynamic thread pool configured with the
// FAIR_SHARE strategy, then inspect every worker node usage, its strategy data
// and the strategy node keys.
it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  for (const workerNode of pool.workerNodes) {
    const usage = workerNode.usage
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: { history: new CircularArray() },
      elu: expect.objectContaining({
        idle: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        active: expect.objectContaining({
          history: expect.any(CircularArray)
        })
      })
    }
    expect(usage).toStrictEqual(expectedUsage)
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    // Each statistic is either absent (undefined) or within its valid range
    if (usage.runTime.aggregate != null) {
      expect(usage.runTime.aggregate).toBeGreaterThan(0)
    } else {
      expect(usage.runTime.aggregate).toBeUndefined()
    }
    if (usage.runTime.average != null) {
      expect(usage.runTime.average).toBeGreaterThan(0)
    } else {
      expect(usage.runTime.average).toBeUndefined()
    }
    if (usage.elu.active.aggregate != null) {
      expect(usage.elu.active.aggregate).toBeGreaterThan(0)
    } else {
      expect(usage.elu.active.aggregate).toBeUndefined()
    }
    if (usage.elu.idle.aggregate != null) {
      expect(usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
    } else {
      expect(usage.elu.idle.aggregate).toBeUndefined()
    }
    if (usage.elu.utilization != null) {
      expect(usage.elu.utilization).toBeGreaterThanOrEqual(0)
      expect(usage.elu.utilization).toBeLessThanOrEqual(1)
    } else {
      expect(usage.elu.utilization).toBeUndefined()
    }
    expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
  }
  const strategyContext = pool.workerChoiceStrategyContext
  const activeStrategy = strategyContext.workerChoiceStrategies.get(
    strategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(activeStrategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the assertions are done
  await pool.destroy()
})
1370
// Smoke test: same as the FAIR_SHARE dynamic pool run, but with the strategy
// options asking for the median run time, so the median is checked instead of
// the average.
it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
  const poolOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
    workerChoiceStrategyOptions: {
      runTime: { median: true }
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    poolOptions
  )
  // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  for (const workerNode of pool.workerNodes) {
    const usage = workerNode.usage
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: { history: new CircularArray() },
      elu: expect.objectContaining({
        idle: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        active: expect.objectContaining({
          history: expect.any(CircularArray)
        })
      })
    }
    expect(usage).toStrictEqual(expectedUsage)
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    // Each statistic is either absent (undefined) or within its valid range
    if (usage.runTime.aggregate != null) {
      expect(usage.runTime.aggregate).toBeGreaterThan(0)
    } else {
      expect(usage.runTime.aggregate).toBeUndefined()
    }
    if (usage.runTime.median != null) {
      expect(usage.runTime.median).toBeGreaterThan(0)
    } else {
      expect(usage.runTime.median).toBeUndefined()
    }
    if (usage.elu.active.aggregate != null) {
      expect(usage.elu.active.aggregate).toBeGreaterThan(0)
    } else {
      expect(usage.elu.active.aggregate).toBeUndefined()
    }
    if (usage.elu.idle.aggregate != null) {
      expect(usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
    } else {
      expect(usage.elu.idle.aggregate).toBeUndefined()
    }
    if (usage.elu.utilization != null) {
      expect(usage.elu.utilization).toBeGreaterThanOrEqual(0)
      expect(usage.elu.utilization).toBeLessThanOrEqual(1)
    } else {
      expect(usage.elu.utilization).toBeUndefined()
    }
    expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
  }
  const strategyContext = pool.workerChoiceStrategyContext
  const activeStrategy = strategyContext.workerChoiceStrategies.get(
    strategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.nextWorkerNodeKey).toEqual(expect.any(Number))
  expect(activeStrategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  // Release the pool resources once the assertions are done
  await pool.destroy()
})
1461
// Seeds each worker node with a virtual task end timestamp, switches the pool
// to FAIR_SHARE and expects the timestamp to be cleared. Runs on a fixed
// thread pool and then on a dynamic thread pool.
it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    for (const workerNode of pool.workerNodes) {
      workerNode.strategyData = { virtualTaskEndTimestamp: performance.now() }
    }
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
    }
    // Release the pool resources before moving to the next pool type
    await pool.destroy()
  }
})
1495
// Checks the strategy policy reported for WEIGHTED_ROUND_ROBIN, first on a
// fixed thread pool and then on a dynamic thread pool.
it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const policy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(policy).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // Release the pool resources before moving to the next pool type
    await pool.destroy()
  }
})
1521
// Checks the task statistics requirements reported for WEIGHTED_ROUND_ROBIN,
// first on a fixed thread pool and then on a dynamic thread pool.
it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual({
      runTime: { aggregate: true, average: true, median: false },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: false, average: false, median: false }
    })
    // Release the pool resources before moving to the next pool type
    await pool.destroy()
  }
})
1577
// Smoke test: push tasks through a fixed thread pool configured with the
// WEIGHTED_ROUND_ROBIN strategy, then inspect every worker node usage and the
// strategy internal fields.
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  for (const workerNode of pool.workerNodes) {
    const usage = workerNode.usage
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    }
    expect(usage).toStrictEqual(expectedUsage)
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    // Run time figures are either absent (undefined) or strictly positive
    if (usage.runTime.aggregate != null) {
      expect(usage.runTime.aggregate).toBeGreaterThan(0)
    } else {
      expect(usage.runTime.aggregate).toBeUndefined()
    }
    if (usage.runTime.average != null) {
      expect(usage.runTime.average).toBeGreaterThan(0)
    } else {
      expect(usage.runTime.average).toBeUndefined()
    }
  }
  const strategyContext = pool.workerChoiceStrategyContext
  const activeStrategy = strategyContext.workerChoiceStrategies.get(
    strategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.nextWorkerNodeKey).toBe(0)
  expect(activeStrategy.previousWorkerNodeKey).toBe(0)
  expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // Release the pool resources once the assertions are done
  await pool.destroy()
})
1655
// Smoke test: push tasks through a dynamic thread pool configured with the
// WEIGHTED_ROUND_ROBIN strategy, then inspect every worker node usage and the
// strategy internal fields.
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  for (const workerNode of pool.workerNodes) {
    const usage = workerNode.usage
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    }
    expect(usage).toStrictEqual(expectedUsage)
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    // Run time figures are either absent (undefined) or strictly positive
    if (usage.runTime.aggregate != null) {
      expect(usage.runTime.aggregate).toBeGreaterThan(0)
    } else {
      expect(usage.runTime.aggregate).toBeUndefined()
    }
    if (usage.runTime.average != null) {
      expect(usage.runTime.average).toBeGreaterThan(0)
    } else {
      expect(usage.runTime.average).toBeUndefined()
    }
  }
  const strategyContext = pool.workerChoiceStrategyContext
  const activeStrategy = strategyContext.workerChoiceStrategies.get(
    strategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.nextWorkerNodeKey).toBe(0)
  expect(activeStrategy.previousWorkerNodeKey).toBe(0)
  expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // Release the pool resources once the assertions are done
  await pool.destroy()
})
1734
// Smoke test: same as the WEIGHTED_ROUND_ROBIN dynamic pool run, but with the
// strategy options asking for the median run time, so the median is checked
// instead of the average.
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
  const poolOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
    workerChoiceStrategyOptions: {
      runTime: { median: true }
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    poolOptions
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  for (const workerNode of pool.workerNodes) {
    const usage = workerNode.usage
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    }
    expect(usage).toStrictEqual(expectedUsage)
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    // Run time figures are either absent (undefined) or strictly positive
    if (usage.runTime.aggregate != null) {
      expect(usage.runTime.aggregate).toBeGreaterThan(0)
    } else {
      expect(usage.runTime.aggregate).toBeUndefined()
    }
    if (usage.runTime.median != null) {
      expect(usage.runTime.median).toBeGreaterThan(0)
    } else {
      expect(usage.runTime.median).toBeUndefined()
    }
  }
  const strategyContext = pool.workerChoiceStrategyContext
  const activeStrategy = strategyContext.workerChoiceStrategies.get(
    strategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.nextWorkerNodeKey).toBe(0)
  expect(activeStrategy.previousWorkerNodeKey).toBe(0)
  expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // Release the pool resources once the assertions are done
  await pool.destroy()
})
1818
// Checks that the WEIGHTED_ROUND_ROBIN strategy internal fields are defined on
// a pool created without that strategy, then that they hold their initial
// values once the strategy is set. Runs on a fixed thread pool and then on a
// dynamic thread pool.
it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Strategy instance registered under the WEIGHTED_ROUND_ROBIN key
    const registeredStrategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        workerChoiceStrategy
      )
    expect(registeredStrategy.nextWorkerNodeKey).toBeDefined()
    expect(registeredStrategy.previousWorkerNodeKey).toBeDefined()
    expect(registeredStrategy.defaultWorkerWeight).toBeDefined()
    expect(registeredStrategy.workerNodeVirtualTaskRunTime).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    // Look the strategy up again through the context now that it is active
    const activeStrategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(activeStrategy.nextWorkerNodeKey).toBe(0)
    expect(activeStrategy.previousWorkerNodeKey).toBe(0)
    expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(activeStrategy.workerNodeVirtualTaskRunTime).toBe(0)
    // Release the pool resources before moving to the next pool type
    await pool.destroy()
  }
})
1916
// Checks the strategy policy reported for INTERLEAVED_WEIGHTED_ROUND_ROBIN,
// first on a fixed thread pool and then on a dynamic thread pool.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const policy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(policy).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // Release the pool resources before moving to the next pool type
    await pool.destroy()
  }
})
1943
// Checks the task statistics requirements reported for
// INTERLEAVED_WEIGHTED_ROUND_ROBIN, first on a fixed thread pool and then on a
// dynamic thread pool.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual({
      runTime: { aggregate: true, average: true, median: false },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: false, average: false, median: false }
    })
    // Release the pool resources before moving to the next pool type
    await pool.destroy()
  }
})
2000
// Smoke test: push tasks through a fixed thread pool configured with the
// INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy, then inspect every worker node
// usage and the strategy internal fields.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const poolOptions = {
    workerChoiceStrategy:
      WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  }
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    poolOptions
  )
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  for (const workerNode of pool.workerNodes) {
    const usage = workerNode.usage
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    }
    expect(usage).toStrictEqual(expectedUsage)
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
  }
  const strategyContext = pool.workerChoiceStrategyContext
  const activeStrategy = strategyContext.workerChoiceStrategies.get(
    strategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy.roundId).toBe(0)
  expect(activeStrategy.workerNodeId).toBe(0)
  expect(activeStrategy.nextWorkerNodeKey).toBe(0)
  expect(activeStrategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  // The round weights are expected to hold only the default worker weight
  expect(activeStrategy.roundWeights).toStrictEqual([
    activeStrategy.defaultWorkerWeight
  ])
  // Release the pool resources once the assertions are done
  await pool.destroy()
})
2085
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  // Dynamic thread pool (min..max workers) explicitly configured with the
  // INTERLEAVED_WEIGHTED_ROUND_ROBIN worker choice strategy.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  try {
    // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const taskCount = max * maxMultiplier
    // Submit all the tasks up front, then wait for every one of them to settle.
    const promises = new Set()
    for (let i = 0; i < taskCount; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    // Once everything has completed, each worker node must report no pending,
    // queued, stolen or failed task, and an executed count within [0, taskCount].
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    }
    // Resolve the strategy currently selected by the context once, rather than
    // repeating the same lookup for every internal being checked.
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.roundId).toBe(0)
    expect(strategy.workerNodeId).toBe(0)
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.roundWeights).toStrictEqual([strategy.defaultWorkerWeight])
  } finally {
    // We need to clean up the resources after our test, including when an
    // assertion above throws, so that the pool is always destroyed.
    await pool.destroy()
  }
})
2171
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // The same scenario is run against a fixed pool first, then a dynamic pool.
  // Factories are used so that each pool is only created once the previous
  // one has been destroyed.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      // The pool is created without a `workerChoiceStrategy` option; the
      // strategy under test is looked up by its key and its internals must
      // already be defined before it gets selected.
      const registeredStrategy =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        )
      expect(registeredStrategy.roundId).toBeDefined()
      expect(registeredStrategy.workerNodeId).toBeDefined()
      expect(registeredStrategy.nextWorkerNodeKey).toBeDefined()
      expect(registeredStrategy.previousWorkerNodeKey).toBeDefined()
      expect(registeredStrategy.defaultWorkerWeight).toBeDefined()
      expect(registeredStrategy.roundWeights).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      // Look the strategy up again through the context's current selection so
      // the assertions below target whatever is active after the switch.
      const activeStrategy =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          pool.workerChoiceStrategyContext.workerChoiceStrategy
        )
      expect(activeStrategy.roundId).toBe(0)
      expect(activeStrategy.workerNodeId).toBe(0)
      expect(activeStrategy.nextWorkerNodeKey).toBe(0)
      expect(activeStrategy.previousWorkerNodeKey).toBe(0)
      expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
      expect(activeStrategy.roundWeights).toStrictEqual([
        activeStrategy.defaultWorkerWeight
      ])
    } finally {
      // We need to clean up the resources after our test, including when an
      // assertion above throws, so that the pool is always destroyed.
      await pool.destroy()
    }
  }
})
2318
it('Verify unknown strategy throw error', () => {
  // The construction is wrapped in a function so that `expect` invokes it
  // itself and can observe the error thrown by the constructor.
  const createPoolWithUnknownStrategy = () =>
    new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.mjs',
      { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
    )
  expect(createPoolWithUnknownStrategy).toThrow(
    "Invalid worker choice strategy 'UNKNOWN_STRATEGY'"
  )
})
2330 })