perf: optimize tasks stealing in corner case
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.mjs
1 import { expect } from 'expect'
2 import {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 WorkerChoiceStrategies
8 } from '../../../lib/index.js'
9 import { CircularArray } from '../../../lib/circular-array.js'
10
11 describe('Selection strategies test suite', () => {
12 const min = 0
13 const max = 3
14
it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
  // Every enumeration member is expected to evaluate to its own name.
  const expectedStrategyNames = [
    'ROUND_ROBIN',
    'LEAST_USED',
    'LEAST_BUSY',
    'LEAST_ELU',
    'FAIR_SHARE',
    'WEIGHTED_ROUND_ROBIN',
    'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
  ]
  for (const strategyName of expectedStrategyNames) {
    expect(WorkerChoiceStrategies[strategyName]).toBe(strategyName)
  }
})
28
it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
  // A pool built without options must report ROUND_ROBIN as its strategy.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(pool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.ROUND_ROBIN
    )
  } finally {
    // Clean up the resources even when the expectation above throws
    await pool.destroy()
  }
})
41
it('Verify available strategies are taken at pool creation', async () => {
  // One fresh pool per strategy: the strategy passed in the options must be
  // visible both on the pool options and on the strategy context.
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new FixedThreadPool(
      max,
      './tests/worker-files/thread/testWorker.mjs',
      { workerChoiceStrategy }
    )
    try {
      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        workerChoiceStrategy
      )
    } finally {
      // Destroy this iteration's pool even if an expectation failed, so no
      // workers outlive the failing iteration
      await pool.destroy()
    }
  }
})
56
it('Verify available strategies can be set after pool creation', async () => {
  // Asserts that the strategy and the fully populated strategy options are
  // mirrored by both the pool options and the worker choice strategy context.
  const expectStrategyApplied = (pool, workerChoiceStrategy, retries) => {
    const expectedStrategyOptions = {
      retries,
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false }
    }
    expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
      workerChoiceStrategy
    )
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedStrategyOptions
    )
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedStrategyOptions
    )
  }

  // Strategy set without options on a dynamic thread pool: retries is
  // expected to be 6.
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.mjs'
    )
    try {
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      expectStrategyApplied(pool, workerChoiceStrategy, 6)
    } finally {
      // Always release the workers, even after a failed expectation
      await pool.destroy()
    }
  }

  // Strategy set with explicit options on a dynamic cluster pool: the given
  // retries value must be kept.
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new DynamicClusterPool(
      min,
      max,
      './tests/worker-files/cluster/testWorker.js'
    )
    try {
      pool.setWorkerChoiceStrategy(workerChoiceStrategy, { retries: 3 })
      expectStrategyApplied(pool, workerChoiceStrategy, 3)
    } finally {
      // Always release the workers, even after a failed expectation
      await pool.destroy()
    }
  }
})
109
it('Verify available strategies default internals at pool creation', async () => {
  // Inspects the initial internal state of every strategy instance registered
  // in the worker choice strategy context of a freshly created pool.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
      const strategy =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        )
      expect(strategy.nextWorkerNodeKey).toBe(0)
      expect(strategy.previousWorkerNodeKey).toBe(0)
      if (
        workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
      ) {
        expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
        expect(strategy.workerNodeVirtualTaskRunTime).toBe(0)
      } else if (
        workerChoiceStrategy ===
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
      ) {
        expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
        expect(strategy.workerNodeVirtualTaskRunTime).toBe(0)
        expect(strategy.roundId).toBe(0)
        expect(strategy.workerNodeId).toBe(0)
        // The round weights are expected to hold only the default weight
        expect(strategy.roundWeights).toStrictEqual([
          strategy.defaultWorkerWeight
        ])
      }
    }
  } finally {
    // Clean up the resources even when an expectation above throws
    await pool.destroy()
  }
})
176
it('Verify ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // The same policy is expected for a fixed and a dynamic thread pool. The
  // pools are created lazily so only one is alive at any time.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Clean up the resources even when the expectation above throws
      await pool.destroy()
    }
  }
})
202
it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // ROUND_ROBIN is expected to require no run time, wait time or ELU
  // statistics at all.
  const expectedRequirements = {
    runTime: { aggregate: false, average: false, median: false },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: false, average: false, median: false }
  }
  // Pools are created lazily so only one is alive at any time.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the resources even when the expectation above throws
      await pool.destroy()
    }
  }
})
258
it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // max * maxMultiplier tasks over max workers: each worker is expected
      // to have executed exactly maxMultiplier tasks, with empty histories.
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(pool.workerNodes.length - 1)
  } finally {
    // Clean up the resources even when an expectation above throws
    await pool.destroy()
  }
})
313
it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // The per-worker executed count is not pinned here: only its type and
      // its bounds (checked right after) are asserted.
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(pool.workerNodes.length - 1)
  } finally {
    // Clean up the resources even when an expectation above throws
    await pool.destroy()
  }
})
373
it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // Checked on a cluster pool first, then on a thread pool. The pools are
  // created lazily so only one is alive at any time.
  const poolFactories = [
    () =>
      new FixedClusterPool(max, './tests/worker-files/cluster/testWorker.js', {
        workerChoiceStrategy
      }),
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      // max consecutive choices must yield max distinct worker ids
      const chosenWorkerIds = new Set()
      for (let i = 0; i < max; i++) {
        chosenWorkerIds.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
      }
      expect(chosenWorkerIds.size).toBe(max)
    } finally {
      // Clean up the resources even when an expectation above throws
      await pool.destroy()
    }
  }
})
399
it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // Both pools start with WEIGHTED_ROUND_ROBIN and are then switched to
  // ROUND_ROBIN. They are created lazily so only one is alive at any time.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Looked up on each call because the active strategy changes once
    // setWorkerChoiceStrategy() has been invoked.
    const activeStrategy = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    try {
      expect(activeStrategy().nextWorkerNodeKey).toBeDefined()
      expect(activeStrategy().previousWorkerNodeKey).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      expect(activeStrategy().nextWorkerNodeKey).toBe(0)
      expect(activeStrategy().previousWorkerNodeKey).toBe(0)
    } finally {
      // Clean up the resources even when an expectation above throws
      await pool.destroy()
    }
  }
})
459
it('Verify LEAST_USED strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  // The same policy is expected for a fixed and a dynamic thread pool. The
  // pools are created lazily so only one is alive at any time.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Clean up the resources even when the expectation above throws
      await pool.destroy()
    }
  }
})
485
it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  // LEAST_USED is expected to require no run time, wait time or ELU
  // statistics at all.
  const expectedRequirements = {
    runTime: { aggregate: false, average: false, median: false },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: false, average: false, median: false }
  }
  // Pools are created lazily so only one is alive at any time.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the resources even when the expectation above throws
      await pool.destroy()
    }
  }
})
541
it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // The per-worker executed count is not pinned here: only its type and
      // its bounds (checked right after) are asserted.
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the resources even when an expectation above throws
    await pool.destroy()
  }
})
599
it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // The per-worker executed count is not pinned here: only its type and
      // its bounds (checked right after) are asserted.
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the resources even when an expectation above throws
    await pool.destroy()
  }
})
658
it('Verify LEAST_BUSY strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  // The same policy is expected for a fixed and a dynamic thread pool. The
  // pools are created lazily so only one is alive at any time.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Clean up the resources even when the expectation above throws
      await pool.destroy()
    }
  }
})
684
it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  // LEAST_BUSY is expected to require only the aggregated run time and wait
  // time; no ELU statistics.
  const expectedRequirements = {
    runTime: { aggregate: true, average: false, median: false },
    waitTime: { aggregate: true, average: false, median: false },
    elu: { aggregate: false, average: false, median: false }
  }
  // Pools are created lazily so only one is alive at any time.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the resources even when the expectation above throws
      await pool.destroy()
    }
  }
})
740
it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // Run time and wait time are matched loosely because they may carry
      // extra measured fields besides the history.
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // An aggregate may be absent; when present it must be positive.
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.waitTime.aggregate == null) {
        expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
      }
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the resources even when an expectation above throws
    await pool.destroy()
  }
})
808
it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // Run time and wait time are matched loosely because they may carry
      // extra measured fields besides the history.
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // An aggregate may be absent; when present it must be positive.
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.waitTime.aggregate == null) {
        expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
      }
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the resources even when an expectation above throws
    await pool.destroy()
  }
})
877
it('Verify LEAST_ELU strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  // The same policy is expected for a fixed and a dynamic thread pool. The
  // pools are created lazily so only one is alive at any time.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Clean up the resources even when the expectation above throws
      await pool.destroy()
    }
  }
})
903
it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  // LEAST_ELU is expected to require only the aggregated ELU; no run time or
  // wait time statistics.
  const expectedRequirements = {
    runTime: { aggregate: false, average: false, median: false },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: true, average: false, median: false }
  }
  // Pools are created lazily so only one is alive at any time.
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the resources even when the expectation above throws
      await pool.destroy()
    }
  }
})
959
it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // ELU usage is matched loosely because it may carry extra measured
      // fields besides the histories.
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        elu: expect.objectContaining({
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        })
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Each ELU measurement may be absent; when present it must be in range.
      if (workerNode.usage.elu.active.aggregate == null) {
        expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.elu.idle.aggregate == null) {
        expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (workerNode.usage.elu.utilization == null) {
        expect(workerNode.usage.elu.utilization).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
      }
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the resources even when an expectation above throws
    await pool.destroy()
  }
})
1033
it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // ELU usage is matched loosely because it may carry extra measured
      // fields besides the histories.
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        elu: expect.objectContaining({
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        })
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Each ELU measurement may be absent; when present it must be in range.
      if (workerNode.usage.elu.active.aggregate == null) {
        expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.elu.idle.aggregate == null) {
        expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (workerNode.usage.elu.utilization == null) {
        expect(workerNode.usage.elu.utilization).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
      }
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the resources even when an expectation above throws
    await pool.destroy()
  }
})
1108
it('Verify FAIR_SHARE strategy default policy', async () => {
  // The FAIR_SHARE strategy policy is checked first on a fixed pool, then on
  // a dynamic pool; both must report the same values.
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Factories keep pool creation lazy: the dynamic pool is only created once
  // the fixed pool has been destroyed.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () =>
      new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // We need to clean up the resources after our test, even when the
      // assertion above throws
      await pool.destroy()
    }
  }
})
1134
it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
  // The task statistics requirements reported for FAIR_SHARE are checked
  // first on a fixed pool, then on a dynamic pool; both must be identical.
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: true,
      median: false
    }
  }
  // Factories keep pool creation lazy: the dynamic pool is only created once
  // the fixed pool has been destroyed.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () =>
      new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // We need to clean up the resources after our test, even when the
      // assertion above throws
      await pool.destroy()
    }
  }
})
1190
it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
  // Runs `max * maxMultiplier` tasks through a fixed thread pool using the
  // FAIR_SHARE strategy, then checks every worker node usage, its strategy
  // data and the worker node keys tracked by the strategy.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // A statistic that is not set must be exactly `undefined` (never `null`);
  // when it is set, it is handed to `assertValue` for its range checks.
  const expectUndefinedOr = (value, assertValue) => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      assertValue(value)
    }
  }
  const expectPositive = value => {
    expect(value).toBeGreaterThan(0)
  }
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    const promises = new Set()
    for (let i = 0; i < tasksCount; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: {
        history: new CircularArray()
      },
      elu: expect.objectContaining({
        idle: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        active: expect.objectContaining({
          history: expect.any(CircularArray)
        })
      })
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      const { tasks, runTime, elu } = workerNode.usage
      expect(tasks.executed).toBeGreaterThanOrEqual(0)
      expect(tasks.executed).toBeLessThanOrEqual(tasksCount)
      expectUndefinedOr(runTime.aggregate, expectPositive)
      expectUndefinedOr(runTime.average, expectPositive)
      expectUndefinedOr(elu.active.aggregate, expectPositive)
      expectUndefinedOr(elu.idle.aggregate, value => {
        expect(value).toBeGreaterThanOrEqual(0)
      })
      expectUndefinedOr(elu.utilization, value => {
        expect(value).toBeGreaterThanOrEqual(0)
        expect(value).toBeLessThanOrEqual(1)
      })
      expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // We need to clean up the resources after our test, even when an
    // assertion above throws
    await pool.destroy()
  }
})
1275
it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
  // Runs `max * maxMultiplier` tasks through a dynamic thread pool using the
  // FAIR_SHARE strategy, then checks every worker node usage, its strategy
  // data and the worker node keys tracked by the strategy.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // A statistic that is not set must be exactly `undefined` (never `null`);
  // when it is set, it is handed to `assertValue` for its range checks.
  const expectUndefinedOr = (value, assertValue) => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      assertValue(value)
    }
  }
  const expectPositive = value => {
    expect(value).toBeGreaterThan(0)
  }
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    const promises = new Set()
    for (let i = 0; i < tasksCount; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: {
        history: new CircularArray()
      },
      elu: expect.objectContaining({
        idle: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        active: expect.objectContaining({
          history: expect.any(CircularArray)
        })
      })
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      const { tasks, runTime, elu } = workerNode.usage
      expect(tasks.executed).toBeGreaterThanOrEqual(0)
      expect(tasks.executed).toBeLessThanOrEqual(tasksCount)
      expectUndefinedOr(runTime.aggregate, expectPositive)
      expectUndefinedOr(runTime.average, expectPositive)
      expectUndefinedOr(elu.active.aggregate, expectPositive)
      expectUndefinedOr(elu.idle.aggregate, value => {
        expect(value).toBeGreaterThanOrEqual(0)
      })
      expectUndefinedOr(elu.utilization, value => {
        expect(value).toBeGreaterThanOrEqual(0)
        expect(value).toBeLessThanOrEqual(1)
      })
      expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // We need to clean up the resources after our test, even when an
    // assertion above throws
    await pool.destroy()
  }
})
1361
it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
  // Same scenario as the plain dynamic pool run, but the pool is created with
  // `runTime: { median: true }`, so the run time median is checked instead of
  // the run time average.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
      workerChoiceStrategyOptions: {
        runTime: { median: true }
      }
    }
  )
  // A statistic that is not set must be exactly `undefined` (never `null`);
  // when it is set, it is handed to `assertValue` for its range checks.
  const expectUndefinedOr = (value, assertValue) => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      assertValue(value)
    }
  }
  const expectPositive = value => {
    expect(value).toBeGreaterThan(0)
  }
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    const promises = new Set()
    for (let i = 0; i < tasksCount; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: {
        history: new CircularArray()
      },
      elu: expect.objectContaining({
        idle: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        active: expect.objectContaining({
          history: expect.any(CircularArray)
        })
      })
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      const { tasks, runTime, elu } = workerNode.usage
      expect(tasks.executed).toBeGreaterThanOrEqual(0)
      expect(tasks.executed).toBeLessThanOrEqual(tasksCount)
      expectUndefinedOr(runTime.aggregate, expectPositive)
      expectUndefinedOr(runTime.median, expectPositive)
      expectUndefinedOr(elu.active.aggregate, expectPositive)
      expectUndefinedOr(elu.idle.aggregate, value => {
        expect(value).toBeGreaterThanOrEqual(0)
      })
      expectUndefinedOr(elu.utilization, value => {
        expect(value).toBeGreaterThanOrEqual(0)
        expect(value).toBeLessThanOrEqual(1)
      })
      expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // We need to clean up the resources after our test, even when an
    // assertion above throws
    await pool.destroy()
  }
})
1452
it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
  // Seeds every worker node with a virtual task end timestamp, switches the
  // pool to FAIR_SHARE and checks that the timestamp is cleared. The scenario
  // runs on a fixed pool first, then on a dynamic pool.
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Factories keep pool creation lazy: the dynamic pool is only created once
  // the fixed pool has been destroyed.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      // NOTE(review): `min` is 0, so the dynamic pool presumably starts with
      // no worker node and both loops may be vacuous for it - confirm.
      for (const workerNode of pool.workerNodes) {
        workerNode.strategyData = {
          virtualTaskEndTimestamp: performance.now()
        }
      }
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      for (const workerNode of pool.workerNodes) {
        expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
      }
    } finally {
      // We need to clean up the resources after our test, even when an
      // assertion above throws
      await pool.destroy()
    }
  }
})
1486
it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  // The WEIGHTED_ROUND_ROBIN strategy policy is checked first on a fixed
  // pool, then on a dynamic pool; both must report the same values.
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Factories keep pool creation lazy: the dynamic pool is only created once
  // the fixed pool has been destroyed.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () =>
      new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // We need to clean up the resources after our test, even when the
      // assertion above throws
      await pool.destroy()
    }
  }
})
1512
it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  // The task statistics requirements reported for WEIGHTED_ROUND_ROBIN are
  // checked first on a fixed pool, then on a dynamic pool; both must be
  // identical.
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  // Factories keep pool creation lazy: the dynamic pool is only created once
  // the fixed pool has been destroyed.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () =>
      new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // We need to clean up the resources after our test, even when the
      // assertion above throws
      await pool.destroy()
    }
  }
})
1568
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  // Runs `max * maxMultiplier` tasks through a fixed thread pool using the
  // WEIGHTED_ROUND_ROBIN strategy, then checks every worker node usage and
  // the strategy internal state.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // A statistic that is not set must be exactly `undefined` (never `null`);
  // when it is set, it must be strictly positive.
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    const promises = new Set()
    for (let i = 0; i < tasksCount; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      const { tasks, runTime } = workerNode.usage
      expect(tasks.executed).toBeGreaterThanOrEqual(0)
      expect(tasks.executed).toBeLessThanOrEqual(tasksCount)
      expectUndefinedOrPositive(runTime.aggregate)
      expectUndefinedOrPositive(runTime.average)
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(0)
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  } finally {
    // We need to clean up the resources after our test, even when an
    // assertion above throws
    await pool.destroy()
  }
})
1646
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  // Runs `max * maxMultiplier` tasks through a dynamic thread pool using the
  // WEIGHTED_ROUND_ROBIN strategy, then checks every worker node usage and
  // the strategy internal state.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // A statistic that is not set must be exactly `undefined` (never `null`);
  // when it is set, it must be strictly positive.
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    const promises = new Set()
    for (let i = 0; i < tasksCount; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      const { tasks, runTime } = workerNode.usage
      expect(tasks.executed).toBeGreaterThanOrEqual(0)
      expect(tasks.executed).toBeLessThanOrEqual(tasksCount)
      expectUndefinedOrPositive(runTime.aggregate)
      expectUndefinedOrPositive(runTime.average)
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(0)
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  } finally {
    // We need to clean up the resources after our test, even when an
    // assertion above throws
    await pool.destroy()
  }
})
1725
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
  // Same scenario as the plain dynamic pool run, but the pool is created with
  // `runTime: { median: true }`, so the run time median is checked instead of
  // the run time average.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
      workerChoiceStrategyOptions: {
        runTime: { median: true }
      }
    }
  )
  // A statistic that is not set must be exactly `undefined` (never `null`);
  // when it is set, it must be strictly positive.
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    const promises = new Set()
    for (let i = 0; i < tasksCount; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      const { tasks, runTime } = workerNode.usage
      expect(tasks.executed).toBeGreaterThanOrEqual(0)
      expect(tasks.executed).toBeLessThanOrEqual(tasksCount)
      expectUndefinedOrPositive(runTime.aggregate)
      expectUndefinedOrPositive(runTime.median)
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(0)
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  } finally {
    // We need to clean up the resources after our test, even when an
    // assertion above throws
    await pool.destroy()
  }
})
1809
it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  // Checks that the WEIGHTED_ROUND_ROBIN strategy internals are defined on a
  // pool created without an explicit strategy, then that they hold their
  // initial values once the strategy is set on the pool. The scenario runs on
  // a fixed pool first, then on a dynamic pool.
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Factories keep pool creation lazy: the dynamic pool is only created once
  // the fixed pool has been destroyed.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      const { workerChoiceStrategyContext } = pool
      // Looked up by name: the strategy has not been set on the pool yet
      const registeredStrategy =
        workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        )
      expect(registeredStrategy.nextWorkerNodeKey).toBeDefined()
      expect(registeredStrategy.previousWorkerNodeKey).toBeDefined()
      expect(registeredStrategy.defaultWorkerWeight).toBeDefined()
      expect(registeredStrategy.workerNodeVirtualTaskRunTime).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      // Looked up again through the strategy the context now reports as
      // current
      const activeStrategy =
        workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategyContext.workerChoiceStrategy
        )
      expect(activeStrategy.nextWorkerNodeKey).toBe(0)
      expect(activeStrategy.previousWorkerNodeKey).toBe(0)
      expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
      expect(activeStrategy.workerNodeVirtualTaskRunTime).toBe(0)
    } finally {
      // We need to clean up the resources after our test, even when an
      // assertion above throws
      await pool.destroy()
    }
  }
})
1907
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  // The INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy policy is checked first on
  // a fixed pool, then on a dynamic pool; both must report the same values.
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Factories keep pool creation lazy: the dynamic pool is only created once
  // the fixed pool has been destroyed.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () =>
      new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // We need to clean up the resources after our test, even when the
      // assertion above throws
      await pool.destroy()
    }
  }
})
1934
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  // The task statistics requirements reported for
  // INTERLEAVED_WEIGHTED_ROUND_ROBIN are checked first on a fixed pool, then
  // on a dynamic pool; both must be identical.
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  // Factories keep pool creation lazy: the dynamic pool is only created once
  // the fixed pool has been destroyed.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () =>
      new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // We need to clean up the resources after our test, even when the
      // assertion above throws
      await pool.destroy()
    }
  }
})
1991
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  // Runs `max * maxMultiplier` tasks through a fixed thread pool using the
  // INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy, then checks every worker node
  // usage and the strategy internal state.
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  try {
    // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    const promises = new Set()
    for (let i = 0; i < tasksCount; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      const { executed } = workerNode.usage.tasks
      expect(executed).toBeGreaterThanOrEqual(0)
      expect(executed).toBeLessThanOrEqual(tasksCount)
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.roundId).toBe(0)
    expect(strategy.workerNodeId).toBe(0)
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
    // The only round weight expected here is the default worker weight
    expect(strategy.roundWeights).toStrictEqual([strategy.defaultWorkerWeight])
  } finally {
    // We need to clean up the resources after our test, even when an
    // assertion above throws
    await pool.destroy()
  }
})
2076
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  // Runs `max * maxMultiplier` tasks through a dynamic thread pool using the
  // INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy, then checks every worker node
  // usage and the strategy internal state.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  try {
    // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    const promises = new Set()
    for (let i = 0; i < tasksCount; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage)
      const { executed } = workerNode.usage.tasks
      expect(executed).toBeGreaterThanOrEqual(0)
      expect(executed).toBeLessThanOrEqual(tasksCount)
    }
    const { workerChoiceStrategyContext } = pool
    const strategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategy.roundId).toBe(0)
    expect(strategy.workerNodeId).toBe(0)
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
    // The only round weight expected here is the default worker weight
    expect(strategy.roundWeights).toStrictEqual([strategy.defaultWorkerWeight])
  } finally {
    // We need to clean up the resources after our test, even when an
    // assertion above throws
    await pool.destroy()
  }
})
2162
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN

  // Runs the before/after checks against the given pool, then destroys it.
  // The pool is expected to have been created without an explicit worker
  // choice strategy, so the strategy is only set by the call made here.
  const verifyInternalsAreReset = async pool => {
    try {
      // Before setting the strategy, its instance is looked up by name
      const strategyBeforeSet =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        )
      expect(strategyBeforeSet.roundId).toBeDefined()
      expect(strategyBeforeSet.workerNodeId).toBeDefined()
      expect(strategyBeforeSet.nextWorkerNodeKey).toBeDefined()
      expect(strategyBeforeSet.previousWorkerNodeKey).toBeDefined()
      expect(strategyBeforeSet.defaultWorkerWeight).toBeDefined()
      expect(strategyBeforeSet.roundWeights).toBeDefined()

      pool.setWorkerChoiceStrategy(workerChoiceStrategy)

      // After setting it, the lookup goes through the strategy the context
      // now reports as current
      const strategyAfterSet =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          pool.workerChoiceStrategyContext.workerChoiceStrategy
        )
      expect(strategyAfterSet.roundId).toBe(0)
      expect(strategyAfterSet.workerNodeId).toBe(0)
      expect(strategyAfterSet.nextWorkerNodeKey).toBe(0)
      expect(strategyAfterSet.previousWorkerNodeKey).toBe(0)
      expect(strategyAfterSet.defaultWorkerWeight).toBeGreaterThan(0)
      expect(strategyAfterSet.roundWeights).toStrictEqual([
        strategyAfterSet.defaultWorkerWeight
      ])
    } finally {
      // We need to clean up the resources after our test, including when an
      // expectation above throws
      await pool.destroy()
    }
  }

  // The fixed pool is fully checked and destroyed before the dynamic pool
  // is created
  await verifyInternalsAreReset(
    new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs')
  )
  await verifyInternalsAreReset(
    new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.mjs'
    )
  )
})
2309
it('Verify unknown strategy throw error', () => {
  // Construction is deferred behind a function so that `expect` can invoke it
  // and capture the error thrown for the unsupported strategy name
  const createPoolWithUnknownStrategy = () => {
    const poolOptions = { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
    return new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.mjs',
      poolOptions
    )
  }
  expect(createPoolWithUnknownStrategy).toThrow(
    "Invalid worker choice strategy 'UNKNOWN_STRATEGY'"
  )
})
2321 })