bca3c82b1647e22146b6094321d7c9ac60485403
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.mjs
1 import { expect } from 'expect'
2 import {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 WorkerChoiceStrategies
8 } from '../../../lib/index.cjs'
9 import { CircularArray } from '../../../lib/circular-array.cjs'
10
11 describe('Selection strategies test suite', () => {
12 const min = 0
13 const max = 3
14
// Every WorkerChoiceStrategies member must hold a string equal to its own key.
it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
  const expectedStrategyNames = [
    'ROUND_ROBIN',
    'LEAST_USED',
    'LEAST_BUSY',
    'LEAST_ELU',
    'FAIR_SHARE',
    'WEIGHTED_ROUND_ROBIN',
    'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
  ]
  for (const strategyName of expectedStrategyNames) {
    expect(WorkerChoiceStrategies[strategyName]).toBe(strategyName)
  }
})
28
// A pool created without an options argument must report ROUND_ROBIN as its
// worker choice strategy.
it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(pool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.ROUND_ROBIN
    )
  } finally {
    // Clean up the pool resources even when the assertion above throws
    await pool.destroy()
  }
})
41
// For each known strategy, a pool created with that strategy must expose it
// both in its options and in its worker choice strategy context.
it('Verify available strategies are taken at pool creation', async () => {
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new FixedThreadPool(
      max,
      './tests/worker-files/thread/testWorker.mjs',
      { workerChoiceStrategy }
    )
    try {
      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        workerChoiceStrategy
      )
    } finally {
      // Clean up this iteration's pool even when an assertion above throws,
      // otherwise the pool created in the loop would never be destroyed
      await pool.destroy()
    }
  }
})
56
// For each known strategy, setting it on an already created dynamic pool
// (thread pools first, then cluster pools) must update both the pool options
// and the worker choice strategy context.
it('Verify available strategies can be set after pool creation', async () => {
  const poolFactories = [
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs'
      ),
    () =>
      new DynamicClusterPool(
        min,
        max,
        './tests/worker-files/cluster/testWorker.cjs'
      )
  ]
  for (const createPool of poolFactories) {
    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
      const pool = createPool()
      try {
        pool.setWorkerChoiceStrategy(workerChoiceStrategy)
        expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
        expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
          workerChoiceStrategy
        )
      } finally {
        // Clean up this iteration's pool even when an assertion above throws
        await pool.destroy()
      }
    }
  }
})
85
// Checks the initial internal state of every strategy registered in the
// worker choice strategy context of a freshly created fixed thread pool.
it('Verify available strategies default internals at pool creation', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
      const strategy =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        )
      // Node keys common to every strategy
      expect(strategy.nextWorkerNodeKey).toBe(0)
      expect(strategy.previousWorkerNodeKey).toBe(0)
      if (
        workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
      ) {
        expect(strategy.workerNodeVirtualTaskRunTime).toBe(0)
      } else if (
        workerChoiceStrategy ===
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
      ) {
        expect(strategy.workerNodeVirtualTaskRunTime).toBe(0)
        expect(strategy.roundId).toBe(0)
        expect(strategy.workerNodeId).toBe(0)
        expect(strategy.roundWeights.length).toBe(1)
        expect(Number.isSafeInteger(strategy.roundWeights[0])).toBe(true)
      }
    }
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
145
// The ROUND_ROBIN strategy policy must be the same for a fixed and a dynamic
// thread pool. Pools are created, checked and destroyed one after the other.
it('Verify ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
171
// The ROUND_ROBIN strategy must not require any task statistics, for a fixed
// as well as for a dynamic thread pool.
it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const expectedRequirements = {
    runTime: {
      aggregate: false,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
227
// Executes max * maxMultiplier tasks on a fixed thread pool using ROUND_ROBIN,
// then checks every worker node usage and the strategy node keys.
it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      // Each worker node is expected to have executed exactly maxMultiplier tasks
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(pool.workerNodes.length - 1)
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
282
// Executes max * maxMultiplier tasks on a dynamic thread pool using
// ROUND_ROBIN, then checks every worker node usage and the strategy node keys.
it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      // The per-node count is only bounded here, not pinned to an exact value
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toBe(pool.workerNodes.length - 1)
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
342
// Choosing a worker node `max` times in a row with ROUND_ROBIN must yield
// `max` distinct worker ids, for a fixed cluster pool and a fixed thread pool.
it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const poolFactories = [
    () =>
      new FixedClusterPool(
        max,
        './tests/worker-files/cluster/testWorker.cjs',
        { workerChoiceStrategy }
      ),
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      // A Set keeps only distinct worker ids
      const results = new Set()
      for (let i = 0; i < max; i++) {
        results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
      }
      expect(results.size).toBe(max)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
368
// Starting from a pool configured with WEIGHTED_ROUND_ROBIN, switching to
// ROUND_ROBIN must leave the active strategy with both node keys at 0. Checked
// on a fixed and on a dynamic thread pool.
it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const initialOpts = {
    workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  }
  const poolFactories = [
    () =>
      new FixedThreadPool(
        max,
        './tests/worker-files/thread/testWorker.mjs',
        initialOpts
      ),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        initialOpts
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Looks up the strategy object currently selected by the context; it has
    // to be re-evaluated after the strategy is changed
    const activeStrategy = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    try {
      expect(activeStrategy().nextWorkerNodeKey).toBeDefined()
      expect(activeStrategy().previousWorkerNodeKey).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      expect(activeStrategy().nextWorkerNodeKey).toBe(0)
      expect(activeStrategy().previousWorkerNodeKey).toBe(0)
    } finally {
      // Clean up the pool resources even when an assertion above throws
      await pool.destroy()
    }
  }
})
428
// The LEAST_USED strategy policy must be the same for a fixed and a dynamic
// thread pool. Pools are created, checked and destroyed one after the other.
it('Verify LEAST_USED strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
454
// The LEAST_USED strategy must not require any task statistics, for a fixed
// as well as for a dynamic thread pool.
it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const expectedRequirements = {
    runTime: {
      aggregate: false,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
510
// Executes max * maxMultiplier tasks on a fixed thread pool using LEAST_USED,
// then checks every worker node usage and that the strategy node keys are
// numbers.
it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      // The per-node count is only bounded here, not pinned to an exact value
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
568
// Executes max * maxMultiplier tasks on a dynamic thread pool using
// LEAST_USED, then checks every worker node usage and that the strategy node
// keys are numbers.
it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      // The per-node count is only bounded here, not pinned to an exact value
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
627
// The LEAST_BUSY strategy policy must be the same for a fixed and a dynamic
// thread pool. Pools are created, checked and destroyed one after the other.
it('Verify LEAST_BUSY strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
653
// The LEAST_BUSY strategy must require only the aggregate run time and the
// aggregate wait time, for a fixed as well as for a dynamic thread pool.
it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: true,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
709
// Executes max * maxMultiplier tasks on a fixed thread pool using LEAST_BUSY,
// then checks every worker node usage (including run/wait time aggregates
// when present) and that the strategy node keys are numbers.
it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        // objectContaining tolerates extra run/wait time fields next to history
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // An aggregate is either absent (must then be undefined, not null) or
      // strictly positive
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.waitTime.aggregate == null) {
        expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
      }
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
777
// Executes max * maxMultiplier tasks on a dynamic thread pool using
// LEAST_BUSY, then checks every worker node usage (including run/wait time
// aggregates when present) and that the strategy node keys are numbers.
it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        // objectContaining tolerates extra run/wait time fields next to history
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // An aggregate is either absent (must then be undefined, not null) or
      // strictly positive
      if (workerNode.usage.runTime.aggregate == null) {
        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.waitTime.aggregate == null) {
        expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
      }
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
846
// The LEAST_ELU strategy policy must be the same for a fixed and a dynamic
// thread pool. Pools are created, checked and destroyed one after the other.
it('Verify LEAST_ELU strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
872
// The LEAST_ELU strategy must require only the aggregate ELU statistic, for a
// fixed as well as for a dynamic thread pool.
it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const expectedRequirements = {
    runTime: {
      aggregate: false,
      average: false,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
928
// Executes max * maxMultiplier tasks on a fixed thread pool using LEAST_ELU,
// then checks every worker node usage (including ELU aggregates and
// utilization when present) and that the strategy node keys are numbers.
it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        // objectContaining tolerates extra ELU fields next to the histories
        elu: expect.objectContaining({
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        })
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Each ELU figure is either absent (must then be undefined, not null)
      // or within its expected range
      if (workerNode.usage.elu.active.aggregate == null) {
        expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.elu.idle.aggregate == null) {
        expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (workerNode.usage.elu.utilization == null) {
        expect(workerNode.usage.elu.utilization).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
      }
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
1002
// Executes max * maxMultiplier tasks on a dynamic thread pool using LEAST_ELU,
// then checks every worker node usage (including ELU aggregates and
// utilization when present) and that the strategy node keys are numbers.
it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        // objectContaining tolerates extra ELU fields next to the histories
        elu: expect.objectContaining({
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        })
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
        max * maxMultiplier
      )
      // Each ELU figure is either absent (must then be undefined, not null)
      // or within its expected range
      if (workerNode.usage.elu.active.aggregate == null) {
        expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
      }
      if (workerNode.usage.elu.idle.aggregate == null) {
        expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (workerNode.usage.elu.utilization == null) {
        expect(workerNode.usage.elu.utilization).toBeUndefined()
      } else {
        expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
      }
    }
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
1077
// The FAIR_SHARE strategy policy must be the same for a fixed and a dynamic
// thread pool. Pools are created, checked and destroyed one after the other.
it('Verify FAIR_SHARE strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.mjs', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.mjs',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
1103
// Checks the default task statistics requirements reported by the strategy
// context when FAIR_SHARE is used, for a fixed and for a dynamic thread pool.
it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Both pool flavours are expected to report exactly these requirements
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: true,
      median: false
    }
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // We need to clean up the resources after our test, even when the
      // expectation above throws
      await pool.destroy()
    }
  }
})
1159
// Executes max * maxMultiplier tasks on a fixed thread pool using FAIR_SHARE,
// then checks every worker node's usage statistics, its virtual task end
// timestamp and the node keys tracked by the active strategy.
it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // A statistic is either unset (asserted to be undefined) or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      const { usage } = workerNode
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: expect.objectContaining({
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        })
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
      expectUndefinedOrPositive(usage.runTime.aggregate)
      expectUndefinedOrPositive(usage.runTime.average)
      expectUndefinedOrPositive(usage.elu.active.aggregate)
      if (usage.elu.idle.aggregate == null) {
        expect(usage.elu.idle.aggregate).toBeUndefined()
      } else {
        expect(usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (usage.elu.utilization == null) {
        expect(usage.elu.utilization).toBeUndefined()
      } else {
        expect(usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(usage.elu.utilization).toBeLessThanOrEqual(1)
      }
      expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
    }
    // Strategy instance currently selected by the pool
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // We need to clean up the resources after our test, even when an
    // expectation above throws
    await pool.destroy()
  }
})
1244
// Executes max * maxMultiplier tasks on a dynamic thread pool using
// FAIR_SHARE, then checks every worker node's usage statistics, its virtual
// task end timestamp and the node keys tracked by the active strategy.
it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // A statistic is either unset (asserted to be undefined) or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      const { usage } = workerNode
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: expect.objectContaining({
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        })
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
      expectUndefinedOrPositive(usage.runTime.aggregate)
      expectUndefinedOrPositive(usage.runTime.average)
      expectUndefinedOrPositive(usage.elu.active.aggregate)
      if (usage.elu.idle.aggregate == null) {
        expect(usage.elu.idle.aggregate).toBeUndefined()
      } else {
        expect(usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (usage.elu.utilization == null) {
        expect(usage.elu.utilization).toBeUndefined()
      } else {
        expect(usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(usage.elu.utilization).toBeLessThanOrEqual(1)
      }
      expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
    }
    // Strategy instance currently selected by the pool
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // We need to clean up the resources after our test, even when an
    // expectation above throws
    await pool.destroy()
  }
})
1330
// Same scenario as the plain dynamic pool FAIR_SHARE run, but the pool is
// created with `runTime: { median: true }`, so the run time median is checked
// instead of the average.
it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
      workerChoiceStrategyOptions: {
        runTime: { median: true }
      }
    }
  )
  // A statistic is either unset (asserted to be undefined) or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const workerNode of pool.workerNodes) {
      const { usage } = workerNode
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: expect.objectContaining({
          idle: expect.objectContaining({
            history: expect.any(CircularArray)
          }),
          active: expect.objectContaining({
            history: expect.any(CircularArray)
          })
        })
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
      expectUndefinedOrPositive(usage.runTime.aggregate)
      expectUndefinedOrPositive(usage.runTime.median)
      expectUndefinedOrPositive(usage.elu.active.aggregate)
      if (usage.elu.idle.aggregate == null) {
        expect(usage.elu.idle.aggregate).toBeUndefined()
      } else {
        expect(usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
      }
      if (usage.elu.utilization == null) {
        expect(usage.elu.utilization).toBeUndefined()
      } else {
        expect(usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(usage.elu.utilization).toBeLessThanOrEqual(1)
      }
      expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
    }
    // Strategy instance currently selected by the pool
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(expect.any(Number))
    expect(strategy.previousWorkerNodeKey).toEqual(expect.any(Number))
  } finally {
    // We need to clean up the resources after our test, even when an
    // expectation above throws
    await pool.destroy()
  }
})
1421
// Seeds every worker node with a virtual task end timestamp, switches the
// pool to FAIR_SHARE and expects that timestamp to be undefined afterwards.
// The sequence runs against a fixed pool first, then a dynamic pool.
it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Pools are created without an explicit strategy; FAIR_SHARE is only set
  // through setWorkerChoiceStrategy() below
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      for (const workerNode of pool.workerNodes) {
        workerNode.strategyData = {
          virtualTaskEndTimestamp: performance.now()
        }
      }
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      for (const workerNode of pool.workerNodes) {
        expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
      }
    } finally {
      // We need to clean up the resources after our test, even when an
      // expectation above throws
      await pool.destroy()
    }
  }
})
1455
// Checks that a fixed and a dynamic thread pool configured with
// WEIGHTED_ROUND_ROBIN both report the same default strategy policy.
it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Pools are created lazily, one after the other, so only one is alive at a
  // time (fixed first, then dynamic)
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // We need to clean up the resources after our test, even when the
      // expectation above throws
      await pool.destroy()
    }
  }
})
1481
// Checks the default task statistics requirements reported by the strategy
// context when WEIGHTED_ROUND_ROBIN is used, for a fixed and for a dynamic
// thread pool.
it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Both pool flavours are expected to report exactly these requirements
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // We need to clean up the resources after our test, even when the
      // expectation above throws
      await pool.destroy()
    }
  }
})
1537
// Executes max * maxMultiplier tasks on a fixed thread pool using
// WEIGHTED_ROUND_ROBIN, then checks every worker node's usage statistics and
// the node keys and virtual task run time tracked by the active strategy.
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // A statistic is either unset (asserted to be undefined) or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
      expectUndefinedOrPositive(usage.runTime.aggregate)
      expectUndefinedOrPositive(usage.runTime.average)
    }
    // Strategy instance currently selected by the pool
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toEqual(0)
    expect(strategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  } finally {
    // We need to clean up the resources after our test, even when an
    // expectation above throws
    await pool.destroy()
  }
})
1610
// Executes max * maxMultiplier tasks on a dynamic thread pool using
// WEIGHTED_ROUND_ROBIN, then checks every worker node's usage statistics and
// the node keys and virtual task run time tracked by the active strategy.
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // A statistic is either unset (asserted to be undefined) or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
      expectUndefinedOrPositive(usage.runTime.aggregate)
      expectUndefinedOrPositive(usage.runTime.average)
    }
    // Strategy instance currently selected by the pool
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(0)
    expect(strategy.previousWorkerNodeKey).toEqual(0)
    expect(strategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  } finally {
    // We need to clean up the resources after our test, even when an
    // expectation above throws
    await pool.destroy()
  }
})
1684
// Same scenario as the plain dynamic pool WEIGHTED_ROUND_ROBIN run, but the
// pool is created with `runTime: { median: true }`, so the run time median is
// checked instead of the average.
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
      workerChoiceStrategyOptions: {
        runTime: { median: true }
      }
    }
  )
  // A statistic is either unset (asserted to be undefined) or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
      expectUndefinedOrPositive(usage.runTime.aggregate)
      expectUndefinedOrPositive(usage.runTime.median)
    }
    // Strategy instance currently selected by the pool
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.nextWorkerNodeKey).toEqual(0)
    expect(strategy.previousWorkerNodeKey).toEqual(0)
    expect(strategy.workerNodeVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  } finally {
    // We need to clean up the resources after our test, even when an
    // expectation above throws
    await pool.destroy()
  }
})
1763
// Checks that the WEIGHTED_ROUND_ROBIN strategy internals are defined before
// the strategy is selected, and equal to 0 once it has been set on the pool.
// The sequence runs against a fixed pool first, then a dynamic pool.
it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Pools are created without an explicit strategy; WEIGHTED_ROUND_ROBIN is
  // only set through setWorkerChoiceStrategy() below
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      // Looked up by name here: it is not yet the pool's selected strategy
      const strategyBeforeSet =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        )
      expect(strategyBeforeSet.nextWorkerNodeKey).toBeDefined()
      expect(strategyBeforeSet.previousWorkerNodeKey).toBeDefined()
      expect(strategyBeforeSet.workerNodeVirtualTaskRunTime).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      // Looked up again through the context after the switch rather than
      // reusing the reference obtained before it
      const strategyAfterSet =
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          pool.workerChoiceStrategyContext.workerChoiceStrategy
        )
      expect(strategyAfterSet.nextWorkerNodeKey).toBe(0)
      expect(strategyAfterSet.previousWorkerNodeKey).toBe(0)
      expect(strategyAfterSet.workerNodeVirtualTaskRunTime).toBe(0)
    } finally {
      // We need to clean up the resources after our test, even when an
      // expectation above throws
      await pool.destroy()
    }
  }
})
1841
// Checks that a fixed and a dynamic thread pool configured with
// INTERLEAVED_WEIGHTED_ROUND_ROBIN both report the same default strategy
// policy.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Pools are created lazily, one after the other, so only one is alive at a
  // time (fixed first, then dynamic)
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // We need to clean up the resources after our test, even when the
      // expectation above throws
      await pool.destroy()
    }
  }
})
1868
// Checks the default task statistics requirements reported by the strategy
// context when INTERLEAVED_WEIGHTED_ROUND_ROBIN is used, for a fixed and for a
// dynamic thread pool.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Both pool flavours are expected to report exactly these requirements
  const expectedRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: false,
      average: false,
      median: false
    }
  }
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // We need to clean up the resources after our test, even when the
      // expectation above throws
      await pool.destroy()
    }
  }
})
1925
// Executes max * maxMultiplier tasks on a fixed thread pool using
// INTERLEAVED_WEIGHTED_ROUND_ROBIN, then checks every worker node's usage
// statistics and the round/node bookkeeping of the active strategy.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  try {
    // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
    }
    // Strategy instance currently selected by the pool
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.roundId).toBe(0)
    expect(strategy.workerNodeId).toBe(0)
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toEqual(0)
    expect(strategy.roundWeights.length).toBe(1)
    expect(Number.isSafeInteger(strategy.roundWeights[0])).toBe(true)
  } finally {
    // We need to clean up the resources after our test, even when an
    // expectation above throws
    await pool.destroy()
  }
})
2008
// Executes max * maxMultiplier tasks on a dynamic thread pool using
// INTERLEAVED_WEIGHTED_ROUND_ROBIN, then checks every worker node's usage
// statistics and the round/node bookkeeping of the active strategy.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  try {
    // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const promises = new Set()
    for (let i = 0; i < max * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(max * maxMultiplier)
    }
    // Strategy instance currently selected by the pool
    const strategy =
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
    expect(strategy.roundId).toBe(0)
    expect(strategy.workerNodeId).toBe(0)
    expect(strategy.nextWorkerNodeKey).toBe(0)
    expect(strategy.previousWorkerNodeKey).toEqual(0)
    expect(strategy.roundWeights.length).toBe(1)
    expect(Number.isSafeInteger(strategy.roundWeights[0])).toBe(true)
  } finally {
    // We need to clean up the resources after our test, even when an
    // expectation above throws
    await pool.destroy()
  }
})
2092
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  // Checks that (re)selecting the INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy on a pool
  // leaves its counters at 0 and its round weights as a single safe integer.
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Reads a strategy instance from the pool's context; the context is re-read
  // on every call so that no stale reference is kept across the strategy change.
  const getStrategy = (pool, strategyName) =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(strategyName)
  // The same scenario runs against a fixed pool first, then a dynamic pool.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      // Before selecting the strategy, its internals must already exist.
      const strategyBeforeSet = getStrategy(pool, workerChoiceStrategy)
      expect(strategyBeforeSet.roundId).toBeDefined()
      expect(strategyBeforeSet.workerNodeId).toBeDefined()
      expect(strategyBeforeSet.nextWorkerNodeKey).toBeDefined()
      expect(strategyBeforeSet.previousWorkerNodeKey).toBeDefined()
      expect(strategyBeforeSet.roundWeights).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      // Look the strategy up through the context's *current* strategy name so
      // the assertions also fail if the context did not switch strategy.
      const activeStrategy = getStrategy(
        pool,
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
      expect(activeStrategy.roundId).toBe(0)
      expect(activeStrategy.workerNodeId).toBe(0)
      expect(activeStrategy.nextWorkerNodeKey).toBe(0)
      expect(activeStrategy.previousWorkerNodeKey).toBe(0)
      expect(activeStrategy.roundWeights.length).toBe(1)
      expect(Number.isSafeInteger(activeStrategy.roundWeights[0])).toBe(true)
    } finally {
      // Always release the pool, even when an expectation above fails,
      // so a failing test does not leave an undestroyed pool behind.
      await pool.destroy()
    }
  }
})
2225
it('Verify unknown strategy throw error', () => {
  // Building a pool with a strategy name that is not a known worker choice
  // strategy is expected to throw from the constructor.
  const createPoolWithUnknownStrategy = () =>
    new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.mjs',
      { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
    )
  expect(createPoolWithUnknownStrategy).toThrow(
    "Invalid worker choice strategy 'UNKNOWN_STRATEGY'"
  )
})
2237 })