a13d1dade9d0c704e0d7025cf1596cbec739d49f
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.mjs
1 import { expect } from 'expect'
2 import {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 WorkerChoiceStrategies
8 } from '../../../lib/index.js'
9 import { CircularArray } from '../../../lib/circular-array.js'
10
11 describe('Selection strategies test suite', () => {
12 const min = 0
13 const max = 3
14
15 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
16 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
17 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
18 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
19 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
20 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
21 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
22 'WEIGHTED_ROUND_ROBIN'
23 )
24 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
25 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
26 )
27 })
28
29 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
30 const pool = new DynamicThreadPool(
31 min,
32 max,
33 './tests/worker-files/thread/testWorker.mjs'
34 )
35 expect(pool.opts.workerChoiceStrategy).toBe(
36 WorkerChoiceStrategies.ROUND_ROBIN
37 )
38 // We need to clean up the resources after our test
39 await pool.destroy()
40 })
41
42 it('Verify available strategies are taken at pool creation', async () => {
43 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
44 const pool = new FixedThreadPool(
45 max,
46 './tests/worker-files/thread/testWorker.mjs',
47 { workerChoiceStrategy }
48 )
49 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
50 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
51 workerChoiceStrategy
52 )
53 await pool.destroy()
54 }
55 })
56
57 it('Verify available strategies can be set after pool creation', async () => {
58 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
59 const pool = new DynamicThreadPool(
60 min,
61 max,
62 './tests/worker-files/thread/testWorker.mjs'
63 )
64 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
65 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
66 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
67 workerChoiceStrategy
68 )
69 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
70 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
71 retries:
72 pool.info.maxSize +
73 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
74 runTime: { median: false },
75 waitTime: { median: false },
76 elu: { median: false },
77 weights: expect.objectContaining({
78 0: expect.any(Number),
79 [pool.info.maxSize - 1]: expect.any(Number)
80 })
81 })
82 await pool.destroy()
83 }
84 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
85 const pool = new DynamicClusterPool(
86 min,
87 max,
88 './tests/worker-files/cluster/testWorker.js'
89 )
90 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
91 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
92 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
93 workerChoiceStrategy
94 )
95 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
96 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
97 retries:
98 pool.info.maxSize +
99 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
100 runTime: { median: false },
101 waitTime: { median: false },
102 elu: { median: false },
103 weights: expect.objectContaining({
104 0: expect.any(Number),
105 [pool.info.maxSize - 1]: expect.any(Number)
106 })
107 })
108 await pool.destroy()
109 }
110 })
111
112 it('Verify available strategies default internals at pool creation', async () => {
113 const pool = new FixedThreadPool(
114 max,
115 './tests/worker-files/thread/testWorker.mjs'
116 )
117 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
118 expect(
119 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
120 workerChoiceStrategy
121 ).nextWorkerNodeKey
122 ).toBe(0)
123 expect(
124 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
125 workerChoiceStrategy
126 ).previousWorkerNodeKey
127 ).toBe(0)
128 if (
129 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
130 ) {
131 expect(
132 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
133 workerChoiceStrategy
134 ).workerNodeVirtualTaskRunTime
135 ).toBe(0)
136 } else if (
137 workerChoiceStrategy ===
138 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
139 ) {
140 expect(
141 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
142 workerChoiceStrategy
143 ).workerNodeVirtualTaskRunTime
144 ).toBe(0)
145 expect(
146 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
147 workerChoiceStrategy
148 ).roundId
149 ).toBe(0)
150 expect(
151 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
152 workerChoiceStrategy
153 ).workerNodeId
154 ).toBe(0)
155 expect(
156 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
157 workerChoiceStrategy
158 ).roundWeights.length
159 ).toBe(1)
160 expect(
161 Number.isSafeInteger(
162 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
163 workerChoiceStrategy
164 ).roundWeights[0]
165 )
166 ).toBe(true)
167 }
168 }
169 await pool.destroy()
170 })
171
172 it('Verify strategies wait for worker node readiness in dynamic pool', async () => {
173 const pool = new DynamicThreadPool(
174 min,
175 max,
176 './tests/worker-files/thread/testWorker.mjs'
177 )
178 expect(pool.starting).toBe(false)
179 expect(pool.workerNodes.length).toBe(min)
180 const maxMultiplier = 10000
181 const promises = new Set()
182 for (let i = 0; i < max * maxMultiplier; i++) {
183 promises.add(pool.execute())
184 }
185 await Promise.all(promises)
186 expect(pool.workerNodes.length).toBe(max)
187 // We need to clean up the resources after our test
188 await pool.destroy()
189 })
190
191 it('Verify ROUND_ROBIN strategy default policy', async () => {
192 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
193 let pool = new FixedThreadPool(
194 max,
195 './tests/worker-files/thread/testWorker.mjs',
196 { workerChoiceStrategy }
197 )
198 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
199 dynamicWorkerUsage: false,
200 dynamicWorkerReady: true
201 })
202 await pool.destroy()
203 pool = new DynamicThreadPool(
204 min,
205 max,
206 './tests/worker-files/thread/testWorker.mjs',
207 { workerChoiceStrategy }
208 )
209 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
210 dynamicWorkerUsage: false,
211 dynamicWorkerReady: true
212 })
213 // We need to clean up the resources after our test
214 await pool.destroy()
215 })
216
217 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
218 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
219 let pool = new FixedThreadPool(
220 max,
221 './tests/worker-files/thread/testWorker.mjs',
222 { workerChoiceStrategy }
223 )
224 expect(
225 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
226 ).toStrictEqual({
227 runTime: {
228 aggregate: false,
229 average: false,
230 median: false
231 },
232 waitTime: {
233 aggregate: false,
234 average: false,
235 median: false
236 },
237 elu: {
238 aggregate: false,
239 average: false,
240 median: false
241 }
242 })
243 await pool.destroy()
244 pool = new DynamicThreadPool(
245 min,
246 max,
247 './tests/worker-files/thread/testWorker.mjs',
248 { workerChoiceStrategy }
249 )
250 expect(
251 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
252 ).toStrictEqual({
253 runTime: {
254 aggregate: false,
255 average: false,
256 median: false
257 },
258 waitTime: {
259 aggregate: false,
260 average: false,
261 median: false
262 },
263 elu: {
264 aggregate: false,
265 average: false,
266 median: false
267 }
268 })
269 // We need to clean up the resources after our test
270 await pool.destroy()
271 })
272
273 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
274 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
275 const pool = new FixedThreadPool(
276 max,
277 './tests/worker-files/thread/testWorker.mjs',
278 { workerChoiceStrategy }
279 )
280 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
281 const promises = new Set()
282 const maxMultiplier = 2
283 for (let i = 0; i < max * maxMultiplier; i++) {
284 promises.add(pool.execute())
285 }
286 await Promise.all(promises)
287 for (const workerNode of pool.workerNodes) {
288 expect(workerNode.usage).toStrictEqual({
289 tasks: {
290 executed: maxMultiplier,
291 executing: 0,
292 queued: 0,
293 maxQueued: 0,
294 sequentiallyStolen: 0,
295 stolen: 0,
296 failed: 0
297 },
298 runTime: {
299 history: new CircularArray()
300 },
301 waitTime: {
302 history: new CircularArray()
303 },
304 elu: {
305 idle: {
306 history: new CircularArray()
307 },
308 active: {
309 history: new CircularArray()
310 }
311 }
312 })
313 }
314 expect(
315 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
316 pool.workerChoiceStrategyContext.workerChoiceStrategy
317 ).nextWorkerNodeKey
318 ).toBe(0)
319 expect(
320 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
321 pool.workerChoiceStrategyContext.workerChoiceStrategy
322 ).previousWorkerNodeKey
323 ).toBe(pool.workerNodes.length - 1)
324 // We need to clean up the resources after our test
325 await pool.destroy()
326 })
327
328 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
329 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
330 const pool = new DynamicThreadPool(
331 min,
332 max,
333 './tests/worker-files/thread/testWorker.mjs',
334 { workerChoiceStrategy }
335 )
336 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
337 const promises = new Set()
338 const maxMultiplier = 2
339 for (let i = 0; i < max * maxMultiplier; i++) {
340 promises.add(pool.execute())
341 }
342 await Promise.all(promises)
343 for (const workerNode of pool.workerNodes) {
344 expect(workerNode.usage).toStrictEqual({
345 tasks: {
346 executed: expect.any(Number),
347 executing: 0,
348 queued: 0,
349 maxQueued: 0,
350 sequentiallyStolen: 0,
351 stolen: 0,
352 failed: 0
353 },
354 runTime: {
355 history: new CircularArray()
356 },
357 waitTime: {
358 history: new CircularArray()
359 },
360 elu: {
361 idle: {
362 history: new CircularArray()
363 },
364 active: {
365 history: new CircularArray()
366 }
367 }
368 })
369 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
370 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
371 max * maxMultiplier
372 )
373 }
374 expect(
375 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
376 pool.workerChoiceStrategyContext.workerChoiceStrategy
377 ).nextWorkerNodeKey
378 ).toBe(0)
379 expect(
380 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
381 pool.workerChoiceStrategyContext.workerChoiceStrategy
382 ).previousWorkerNodeKey
383 ).toBe(pool.workerNodes.length - 1)
384 // We need to clean up the resources after our test
385 await pool.destroy()
386 })
387
388 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
389 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
390 let pool = new FixedClusterPool(
391 max,
392 './tests/worker-files/cluster/testWorker.js',
393 { workerChoiceStrategy }
394 )
395 let results = new Set()
396 for (let i = 0; i < max; i++) {
397 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
398 }
399 expect(results.size).toBe(max)
400 await pool.destroy()
401 pool = new FixedThreadPool(
402 max,
403 './tests/worker-files/thread/testWorker.mjs',
404 { workerChoiceStrategy }
405 )
406 results = new Set()
407 for (let i = 0; i < max; i++) {
408 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
409 }
410 expect(results.size).toBe(max)
411 await pool.destroy()
412 })
413
414 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
415 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
416 let pool = new FixedThreadPool(
417 max,
418 './tests/worker-files/thread/testWorker.mjs',
419 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
420 )
421 expect(
422 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
423 pool.workerChoiceStrategyContext.workerChoiceStrategy
424 ).nextWorkerNodeKey
425 ).toBeDefined()
426 expect(
427 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
428 pool.workerChoiceStrategyContext.workerChoiceStrategy
429 ).previousWorkerNodeKey
430 ).toBeDefined()
431 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
432 expect(
433 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
434 pool.workerChoiceStrategyContext.workerChoiceStrategy
435 ).nextWorkerNodeKey
436 ).toBe(0)
437 expect(
438 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
439 pool.workerChoiceStrategyContext.workerChoiceStrategy
440 ).previousWorkerNodeKey
441 ).toBe(0)
442 await pool.destroy()
443 pool = new DynamicThreadPool(
444 min,
445 max,
446 './tests/worker-files/thread/testWorker.mjs',
447 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
448 )
449 expect(
450 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
451 pool.workerChoiceStrategyContext.workerChoiceStrategy
452 ).nextWorkerNodeKey
453 ).toBeDefined()
454 expect(
455 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
456 pool.workerChoiceStrategyContext.workerChoiceStrategy
457 ).previousWorkerNodeKey
458 ).toBeDefined()
459 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
460 expect(
461 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
462 pool.workerChoiceStrategyContext.workerChoiceStrategy
463 ).nextWorkerNodeKey
464 ).toBe(0)
465 expect(
466 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
467 pool.workerChoiceStrategyContext.workerChoiceStrategy
468 ).previousWorkerNodeKey
469 ).toBe(0)
470 // We need to clean up the resources after our test
471 await pool.destroy()
472 })
473
474 it('Verify LEAST_USED strategy default policy', async () => {
475 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
476 let pool = new FixedThreadPool(
477 max,
478 './tests/worker-files/thread/testWorker.mjs',
479 { workerChoiceStrategy }
480 )
481 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
482 dynamicWorkerUsage: false,
483 dynamicWorkerReady: true
484 })
485 await pool.destroy()
486 pool = new DynamicThreadPool(
487 min,
488 max,
489 './tests/worker-files/thread/testWorker.mjs',
490 { workerChoiceStrategy }
491 )
492 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
493 dynamicWorkerUsage: false,
494 dynamicWorkerReady: true
495 })
496 // We need to clean up the resources after our test
497 await pool.destroy()
498 })
499
500 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
501 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
502 let pool = new FixedThreadPool(
503 max,
504 './tests/worker-files/thread/testWorker.mjs',
505 { workerChoiceStrategy }
506 )
507 expect(
508 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
509 ).toStrictEqual({
510 runTime: {
511 aggregate: false,
512 average: false,
513 median: false
514 },
515 waitTime: {
516 aggregate: false,
517 average: false,
518 median: false
519 },
520 elu: {
521 aggregate: false,
522 average: false,
523 median: false
524 }
525 })
526 await pool.destroy()
527 pool = new DynamicThreadPool(
528 min,
529 max,
530 './tests/worker-files/thread/testWorker.mjs',
531 { workerChoiceStrategy }
532 )
533 expect(
534 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
535 ).toStrictEqual({
536 runTime: {
537 aggregate: false,
538 average: false,
539 median: false
540 },
541 waitTime: {
542 aggregate: false,
543 average: false,
544 median: false
545 },
546 elu: {
547 aggregate: false,
548 average: false,
549 median: false
550 }
551 })
552 // We need to clean up the resources after our test
553 await pool.destroy()
554 })
555
556 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
557 const pool = new FixedThreadPool(
558 max,
559 './tests/worker-files/thread/testWorker.mjs',
560 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
561 )
562 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
563 const promises = new Set()
564 const maxMultiplier = 2
565 for (let i = 0; i < max * maxMultiplier; i++) {
566 promises.add(pool.execute())
567 }
568 await Promise.all(promises)
569 for (const workerNode of pool.workerNodes) {
570 expect(workerNode.usage).toStrictEqual({
571 tasks: {
572 executed: expect.any(Number),
573 executing: 0,
574 queued: 0,
575 maxQueued: 0,
576 sequentiallyStolen: 0,
577 stolen: 0,
578 failed: 0
579 },
580 runTime: {
581 history: new CircularArray()
582 },
583 waitTime: {
584 history: new CircularArray()
585 },
586 elu: {
587 idle: {
588 history: new CircularArray()
589 },
590 active: {
591 history: new CircularArray()
592 }
593 }
594 })
595 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
596 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
597 max * maxMultiplier
598 )
599 }
600 expect(
601 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
602 pool.workerChoiceStrategyContext.workerChoiceStrategy
603 ).nextWorkerNodeKey
604 ).toEqual(expect.any(Number))
605 expect(
606 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
607 pool.workerChoiceStrategyContext.workerChoiceStrategy
608 ).previousWorkerNodeKey
609 ).toEqual(expect.any(Number))
610 // We need to clean up the resources after our test
611 await pool.destroy()
612 })
613
614 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
615 const pool = new DynamicThreadPool(
616 min,
617 max,
618 './tests/worker-files/thread/testWorker.mjs',
619 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
620 )
621 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
622 const promises = new Set()
623 const maxMultiplier = 2
624 for (let i = 0; i < max * maxMultiplier; i++) {
625 promises.add(pool.execute())
626 }
627 await Promise.all(promises)
628 for (const workerNode of pool.workerNodes) {
629 expect(workerNode.usage).toStrictEqual({
630 tasks: {
631 executed: expect.any(Number),
632 executing: 0,
633 queued: 0,
634 maxQueued: 0,
635 sequentiallyStolen: 0,
636 stolen: 0,
637 failed: 0
638 },
639 runTime: {
640 history: new CircularArray()
641 },
642 waitTime: {
643 history: new CircularArray()
644 },
645 elu: {
646 idle: {
647 history: new CircularArray()
648 },
649 active: {
650 history: new CircularArray()
651 }
652 }
653 })
654 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
655 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
656 max * maxMultiplier
657 )
658 }
659 expect(
660 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
661 pool.workerChoiceStrategyContext.workerChoiceStrategy
662 ).nextWorkerNodeKey
663 ).toEqual(expect.any(Number))
664 expect(
665 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
666 pool.workerChoiceStrategyContext.workerChoiceStrategy
667 ).previousWorkerNodeKey
668 ).toEqual(expect.any(Number))
669 // We need to clean up the resources after our test
670 await pool.destroy()
671 })
672
673 it('Verify LEAST_BUSY strategy default policy', async () => {
674 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
675 let pool = new FixedThreadPool(
676 max,
677 './tests/worker-files/thread/testWorker.mjs',
678 { workerChoiceStrategy }
679 )
680 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
681 dynamicWorkerUsage: false,
682 dynamicWorkerReady: true
683 })
684 await pool.destroy()
685 pool = new DynamicThreadPool(
686 min,
687 max,
688 './tests/worker-files/thread/testWorker.mjs',
689 { workerChoiceStrategy }
690 )
691 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
692 dynamicWorkerUsage: false,
693 dynamicWorkerReady: true
694 })
695 // We need to clean up the resources after our test
696 await pool.destroy()
697 })
698
699 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
700 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
701 let pool = new FixedThreadPool(
702 max,
703 './tests/worker-files/thread/testWorker.mjs',
704 { workerChoiceStrategy }
705 )
706 expect(
707 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
708 ).toStrictEqual({
709 runTime: {
710 aggregate: true,
711 average: false,
712 median: false
713 },
714 waitTime: {
715 aggregate: true,
716 average: false,
717 median: false
718 },
719 elu: {
720 aggregate: false,
721 average: false,
722 median: false
723 }
724 })
725 await pool.destroy()
726 pool = new DynamicThreadPool(
727 min,
728 max,
729 './tests/worker-files/thread/testWorker.mjs',
730 { workerChoiceStrategy }
731 )
732 expect(
733 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
734 ).toStrictEqual({
735 runTime: {
736 aggregate: true,
737 average: false,
738 median: false
739 },
740 waitTime: {
741 aggregate: true,
742 average: false,
743 median: false
744 },
745 elu: {
746 aggregate: false,
747 average: false,
748 median: false
749 }
750 })
751 // We need to clean up the resources after our test
752 await pool.destroy()
753 })
754
755 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
756 const pool = new FixedThreadPool(
757 max,
758 './tests/worker-files/thread/testWorker.mjs',
759 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
760 )
761 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
762 const promises = new Set()
763 const maxMultiplier = 2
764 for (let i = 0; i < max * maxMultiplier; i++) {
765 promises.add(pool.execute())
766 }
767 await Promise.all(promises)
768 for (const workerNode of pool.workerNodes) {
769 expect(workerNode.usage).toStrictEqual({
770 tasks: {
771 executed: expect.any(Number),
772 executing: 0,
773 queued: 0,
774 maxQueued: 0,
775 sequentiallyStolen: 0,
776 stolen: 0,
777 failed: 0
778 },
779 runTime: expect.objectContaining({
780 history: expect.any(CircularArray)
781 }),
782 waitTime: expect.objectContaining({
783 history: expect.any(CircularArray)
784 }),
785 elu: {
786 idle: {
787 history: new CircularArray()
788 },
789 active: {
790 history: new CircularArray()
791 }
792 }
793 })
794 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
795 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
796 max * maxMultiplier
797 )
798 if (workerNode.usage.runTime.aggregate == null) {
799 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
800 } else {
801 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
802 }
803 if (workerNode.usage.waitTime.aggregate == null) {
804 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
805 } else {
806 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
807 }
808 }
809 expect(
810 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
811 pool.workerChoiceStrategyContext.workerChoiceStrategy
812 ).nextWorkerNodeKey
813 ).toEqual(expect.any(Number))
814 expect(
815 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
816 pool.workerChoiceStrategyContext.workerChoiceStrategy
817 ).previousWorkerNodeKey
818 ).toEqual(expect.any(Number))
819 // We need to clean up the resources after our test
820 await pool.destroy()
821 })
822
823 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
824 const pool = new DynamicThreadPool(
825 min,
826 max,
827 './tests/worker-files/thread/testWorker.mjs',
828 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
829 )
830 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
831 const promises = new Set()
832 const maxMultiplier = 2
833 for (let i = 0; i < max * maxMultiplier; i++) {
834 promises.add(pool.execute())
835 }
836 await Promise.all(promises)
837 for (const workerNode of pool.workerNodes) {
838 expect(workerNode.usage).toStrictEqual({
839 tasks: {
840 executed: expect.any(Number),
841 executing: 0,
842 queued: 0,
843 maxQueued: 0,
844 sequentiallyStolen: 0,
845 stolen: 0,
846 failed: 0
847 },
848 runTime: expect.objectContaining({
849 history: expect.any(CircularArray)
850 }),
851 waitTime: expect.objectContaining({
852 history: expect.any(CircularArray)
853 }),
854 elu: {
855 idle: {
856 history: new CircularArray()
857 },
858 active: {
859 history: new CircularArray()
860 }
861 }
862 })
863 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
864 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
865 max * maxMultiplier
866 )
867 if (workerNode.usage.runTime.aggregate == null) {
868 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
869 } else {
870 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
871 }
872 if (workerNode.usage.waitTime.aggregate == null) {
873 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
874 } else {
875 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
876 }
877 }
878 expect(
879 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
880 pool.workerChoiceStrategyContext.workerChoiceStrategy
881 ).nextWorkerNodeKey
882 ).toEqual(expect.any(Number))
883 expect(
884 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
885 pool.workerChoiceStrategyContext.workerChoiceStrategy
886 ).previousWorkerNodeKey
887 ).toEqual(expect.any(Number))
888 // We need to clean up the resources after our test
889 await pool.destroy()
890 })
891
892 it('Verify LEAST_ELU strategy default policy', async () => {
893 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
894 let pool = new FixedThreadPool(
895 max,
896 './tests/worker-files/thread/testWorker.mjs',
897 { workerChoiceStrategy }
898 )
899 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
900 dynamicWorkerUsage: false,
901 dynamicWorkerReady: true
902 })
903 await pool.destroy()
904 pool = new DynamicThreadPool(
905 min,
906 max,
907 './tests/worker-files/thread/testWorker.mjs',
908 { workerChoiceStrategy }
909 )
910 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
911 dynamicWorkerUsage: false,
912 dynamicWorkerReady: true
913 })
914 // We need to clean up the resources after our test
915 await pool.destroy()
916 })
917
918 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
919 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
920 let pool = new FixedThreadPool(
921 max,
922 './tests/worker-files/thread/testWorker.mjs',
923 { workerChoiceStrategy }
924 )
925 expect(
926 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
927 ).toStrictEqual({
928 runTime: {
929 aggregate: false,
930 average: false,
931 median: false
932 },
933 waitTime: {
934 aggregate: false,
935 average: false,
936 median: false
937 },
938 elu: {
939 aggregate: true,
940 average: false,
941 median: false
942 }
943 })
944 await pool.destroy()
945 pool = new DynamicThreadPool(
946 min,
947 max,
948 './tests/worker-files/thread/testWorker.mjs',
949 { workerChoiceStrategy }
950 )
951 expect(
952 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
953 ).toStrictEqual({
954 runTime: {
955 aggregate: false,
956 average: false,
957 median: false
958 },
959 waitTime: {
960 aggregate: false,
961 average: false,
962 median: false
963 },
964 elu: {
965 aggregate: true,
966 average: false,
967 median: false
968 }
969 })
970 // We need to clean up the resources after our test
971 await pool.destroy()
972 })
973
974 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
975 const pool = new FixedThreadPool(
976 max,
977 './tests/worker-files/thread/testWorker.mjs',
978 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
979 )
980 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
981 const promises = new Set()
982 const maxMultiplier = 2
983 for (let i = 0; i < max * maxMultiplier; i++) {
984 promises.add(pool.execute())
985 }
986 await Promise.all(promises)
987 for (const workerNode of pool.workerNodes) {
988 expect(workerNode.usage).toStrictEqual({
989 tasks: {
990 executed: expect.any(Number),
991 executing: 0,
992 queued: 0,
993 maxQueued: 0,
994 sequentiallyStolen: 0,
995 stolen: 0,
996 failed: 0
997 },
998 runTime: {
999 history: new CircularArray()
1000 },
1001 waitTime: {
1002 history: new CircularArray()
1003 },
1004 elu: expect.objectContaining({
1005 idle: expect.objectContaining({
1006 history: expect.any(CircularArray)
1007 }),
1008 active: expect.objectContaining({
1009 history: expect.any(CircularArray)
1010 })
1011 })
1012 })
1013 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1014 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1015 max * maxMultiplier
1016 )
1017 if (workerNode.usage.elu.active.aggregate == null) {
1018 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1019 } else {
1020 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1021 }
1022 if (workerNode.usage.elu.idle.aggregate == null) {
1023 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1024 } else {
1025 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1026 }
1027 if (workerNode.usage.elu.utilization == null) {
1028 expect(workerNode.usage.elu.utilization).toBeUndefined()
1029 } else {
1030 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1031 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1032 }
1033 }
1034 expect(
1035 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1036 pool.workerChoiceStrategyContext.workerChoiceStrategy
1037 ).nextWorkerNodeKey
1038 ).toEqual(expect.any(Number))
1039 expect(
1040 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1041 pool.workerChoiceStrategyContext.workerChoiceStrategy
1042 ).previousWorkerNodeKey
1043 ).toEqual(expect.any(Number))
1044 // We need to clean up the resources after our test
1045 await pool.destroy()
1046 })
1047
1048 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
1049 const pool = new DynamicThreadPool(
1050 min,
1051 max,
1052 './tests/worker-files/thread/testWorker.mjs',
1053 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
1054 )
1055 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
1056 const promises = new Set()
1057 const maxMultiplier = 2
1058 for (let i = 0; i < max * maxMultiplier; i++) {
1059 promises.add(pool.execute())
1060 }
1061 await Promise.all(promises)
1062 for (const workerNode of pool.workerNodes) {
1063 expect(workerNode.usage).toStrictEqual({
1064 tasks: {
1065 executed: expect.any(Number),
1066 executing: 0,
1067 queued: 0,
1068 maxQueued: 0,
1069 sequentiallyStolen: 0,
1070 stolen: 0,
1071 failed: 0
1072 },
1073 runTime: {
1074 history: new CircularArray()
1075 },
1076 waitTime: {
1077 history: new CircularArray()
1078 },
1079 elu: expect.objectContaining({
1080 idle: expect.objectContaining({
1081 history: expect.any(CircularArray)
1082 }),
1083 active: expect.objectContaining({
1084 history: expect.any(CircularArray)
1085 })
1086 })
1087 })
1088 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1089 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1090 max * maxMultiplier
1091 )
1092 if (workerNode.usage.elu.active.aggregate == null) {
1093 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1094 } else {
1095 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1096 }
1097 if (workerNode.usage.elu.idle.aggregate == null) {
1098 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1099 } else {
1100 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1101 }
1102 if (workerNode.usage.elu.utilization == null) {
1103 expect(workerNode.usage.elu.utilization).toBeUndefined()
1104 } else {
1105 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1106 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1107 }
1108 }
1109 expect(
1110 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1111 pool.workerChoiceStrategyContext.workerChoiceStrategy
1112 ).nextWorkerNodeKey
1113 ).toEqual(expect.any(Number))
1114 expect(
1115 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1116 pool.workerChoiceStrategyContext.workerChoiceStrategy
1117 ).previousWorkerNodeKey
1118 ).toEqual(expect.any(Number))
1119 // We need to clean up the resources after our test
1120 await pool.destroy()
1121 })
1122
1123 it('Verify FAIR_SHARE strategy default policy', async () => {
1124 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1125 let pool = new FixedThreadPool(
1126 max,
1127 './tests/worker-files/thread/testWorker.mjs',
1128 { workerChoiceStrategy }
1129 )
1130 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1131 dynamicWorkerUsage: false,
1132 dynamicWorkerReady: true
1133 })
1134 await pool.destroy()
1135 pool = new DynamicThreadPool(
1136 min,
1137 max,
1138 './tests/worker-files/thread/testWorker.mjs',
1139 { workerChoiceStrategy }
1140 )
1141 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1142 dynamicWorkerUsage: false,
1143 dynamicWorkerReady: true
1144 })
1145 // We need to clean up the resources after our test
1146 await pool.destroy()
1147 })
1148
1149 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1150 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1151 let pool = new FixedThreadPool(
1152 max,
1153 './tests/worker-files/thread/testWorker.mjs',
1154 { workerChoiceStrategy }
1155 )
1156 expect(
1157 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1158 ).toStrictEqual({
1159 runTime: {
1160 aggregate: true,
1161 average: true,
1162 median: false
1163 },
1164 waitTime: {
1165 aggregate: false,
1166 average: false,
1167 median: false
1168 },
1169 elu: {
1170 aggregate: true,
1171 average: true,
1172 median: false
1173 }
1174 })
1175 await pool.destroy()
1176 pool = new DynamicThreadPool(
1177 min,
1178 max,
1179 './tests/worker-files/thread/testWorker.mjs',
1180 { workerChoiceStrategy }
1181 )
1182 expect(
1183 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1184 ).toStrictEqual({
1185 runTime: {
1186 aggregate: true,
1187 average: true,
1188 median: false
1189 },
1190 waitTime: {
1191 aggregate: false,
1192 average: false,
1193 median: false
1194 },
1195 elu: {
1196 aggregate: true,
1197 average: true,
1198 median: false
1199 }
1200 })
1201 // We need to clean up the resources after our test
1202 await pool.destroy()
1203 })
1204
1205 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1206 const pool = new FixedThreadPool(
1207 max,
1208 './tests/worker-files/thread/testWorker.mjs',
1209 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1210 )
1211 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1212 const promises = new Set()
1213 const maxMultiplier = 2
1214 for (let i = 0; i < max * maxMultiplier; i++) {
1215 promises.add(pool.execute())
1216 }
1217 await Promise.all(promises)
1218 for (const workerNode of pool.workerNodes) {
1219 expect(workerNode.usage).toStrictEqual({
1220 tasks: {
1221 executed: expect.any(Number),
1222 executing: 0,
1223 queued: 0,
1224 maxQueued: 0,
1225 sequentiallyStolen: 0,
1226 stolen: 0,
1227 failed: 0
1228 },
1229 runTime: expect.objectContaining({
1230 history: expect.any(CircularArray)
1231 }),
1232 waitTime: {
1233 history: new CircularArray()
1234 },
1235 elu: expect.objectContaining({
1236 idle: expect.objectContaining({
1237 history: expect.any(CircularArray)
1238 }),
1239 active: expect.objectContaining({
1240 history: expect.any(CircularArray)
1241 })
1242 })
1243 })
1244 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1245 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1246 max * maxMultiplier
1247 )
1248 if (workerNode.usage.runTime.aggregate == null) {
1249 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1250 } else {
1251 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1252 }
1253 if (workerNode.usage.runTime.average == null) {
1254 expect(workerNode.usage.runTime.average).toBeUndefined()
1255 } else {
1256 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1257 }
1258 if (workerNode.usage.elu.active.aggregate == null) {
1259 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1260 } else {
1261 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1262 }
1263 if (workerNode.usage.elu.idle.aggregate == null) {
1264 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1265 } else {
1266 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1267 }
1268 if (workerNode.usage.elu.utilization == null) {
1269 expect(workerNode.usage.elu.utilization).toBeUndefined()
1270 } else {
1271 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1272 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1273 }
1274 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1275 }
1276 expect(
1277 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1278 pool.workerChoiceStrategyContext.workerChoiceStrategy
1279 ).nextWorkerNodeKey
1280 ).toEqual(expect.any(Number))
1281 expect(
1282 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1283 pool.workerChoiceStrategyContext.workerChoiceStrategy
1284 ).previousWorkerNodeKey
1285 ).toEqual(expect.any(Number))
1286 // We need to clean up the resources after our test
1287 await pool.destroy()
1288 })
1289
1290 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1291 const pool = new DynamicThreadPool(
1292 min,
1293 max,
1294 './tests/worker-files/thread/testWorker.mjs',
1295 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1296 )
1297 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1298 const promises = new Set()
1299 const maxMultiplier = 2
1300 for (let i = 0; i < max * maxMultiplier; i++) {
1301 promises.add(pool.execute())
1302 }
1303 await Promise.all(promises)
1304 for (const workerNode of pool.workerNodes) {
1305 expect(workerNode.usage).toStrictEqual({
1306 tasks: {
1307 executed: expect.any(Number),
1308 executing: 0,
1309 queued: 0,
1310 maxQueued: 0,
1311 sequentiallyStolen: 0,
1312 stolen: 0,
1313 failed: 0
1314 },
1315 runTime: expect.objectContaining({
1316 history: expect.any(CircularArray)
1317 }),
1318 waitTime: {
1319 history: new CircularArray()
1320 },
1321 elu: expect.objectContaining({
1322 idle: expect.objectContaining({
1323 history: expect.any(CircularArray)
1324 }),
1325 active: expect.objectContaining({
1326 history: expect.any(CircularArray)
1327 })
1328 })
1329 })
1330 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1331 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1332 max * maxMultiplier
1333 )
1334 if (workerNode.usage.runTime.aggregate == null) {
1335 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1336 } else {
1337 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1338 }
1339 if (workerNode.usage.runTime.average == null) {
1340 expect(workerNode.usage.runTime.average).toBeUndefined()
1341 } else {
1342 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1343 }
1344 if (workerNode.usage.elu.active.aggregate == null) {
1345 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1346 } else {
1347 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1348 }
1349 if (workerNode.usage.elu.idle.aggregate == null) {
1350 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1351 } else {
1352 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1353 }
1354 if (workerNode.usage.elu.utilization == null) {
1355 expect(workerNode.usage.elu.utilization).toBeUndefined()
1356 } else {
1357 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1358 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1359 }
1360 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1361 }
1362 expect(
1363 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1364 pool.workerChoiceStrategyContext.workerChoiceStrategy
1365 ).nextWorkerNodeKey
1366 ).toEqual(expect.any(Number))
1367 expect(
1368 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1369 pool.workerChoiceStrategyContext.workerChoiceStrategy
1370 ).previousWorkerNodeKey
1371 ).toEqual(expect.any(Number))
1372 // We need to clean up the resources after our test
1373 await pool.destroy()
1374 })
1375
1376 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1377 const pool = new DynamicThreadPool(
1378 min,
1379 max,
1380 './tests/worker-files/thread/testWorker.mjs',
1381 {
1382 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1383 workerChoiceStrategyOptions: {
1384 runTime: { median: true }
1385 }
1386 }
1387 )
1388 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1389 const promises = new Set()
1390 const maxMultiplier = 2
1391 for (let i = 0; i < max * maxMultiplier; i++) {
1392 promises.add(pool.execute())
1393 }
1394 await Promise.all(promises)
1395 for (const workerNode of pool.workerNodes) {
1396 expect(workerNode.usage).toStrictEqual({
1397 tasks: {
1398 executed: expect.any(Number),
1399 executing: 0,
1400 queued: 0,
1401 maxQueued: 0,
1402 sequentiallyStolen: 0,
1403 stolen: 0,
1404 failed: 0
1405 },
1406 runTime: expect.objectContaining({
1407 history: expect.any(CircularArray)
1408 }),
1409 waitTime: {
1410 history: new CircularArray()
1411 },
1412 elu: expect.objectContaining({
1413 idle: expect.objectContaining({
1414 history: expect.any(CircularArray)
1415 }),
1416 active: expect.objectContaining({
1417 history: expect.any(CircularArray)
1418 })
1419 })
1420 })
1421 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1422 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1423 max * maxMultiplier
1424 )
1425 if (workerNode.usage.runTime.aggregate == null) {
1426 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1427 } else {
1428 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1429 }
1430 if (workerNode.usage.runTime.median == null) {
1431 expect(workerNode.usage.runTime.median).toBeUndefined()
1432 } else {
1433 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1434 }
1435 if (workerNode.usage.elu.active.aggregate == null) {
1436 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1437 } else {
1438 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1439 }
1440 if (workerNode.usage.elu.idle.aggregate == null) {
1441 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1442 } else {
1443 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1444 }
1445 if (workerNode.usage.elu.utilization == null) {
1446 expect(workerNode.usage.elu.utilization).toBeUndefined()
1447 } else {
1448 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1449 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1450 }
1451 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1452 }
1453 expect(
1454 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1455 pool.workerChoiceStrategyContext.workerChoiceStrategy
1456 ).nextWorkerNodeKey
1457 ).toEqual(expect.any(Number))
1458 expect(
1459 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1460 pool.workerChoiceStrategyContext.workerChoiceStrategy
1461 ).previousWorkerNodeKey
1462 ).toEqual(expect.any(Number))
1463 // We need to clean up the resources after our test
1464 await pool.destroy()
1465 })
1466
1467 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1468 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1469 let pool = new FixedThreadPool(
1470 max,
1471 './tests/worker-files/thread/testWorker.mjs'
1472 )
1473 for (const workerNode of pool.workerNodes) {
1474 workerNode.strategyData = {
1475 virtualTaskEndTimestamp: performance.now()
1476 }
1477 }
1478 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1479 for (const workerNode of pool.workerNodes) {
1480 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1481 }
1482 await pool.destroy()
1483 pool = new DynamicThreadPool(
1484 min,
1485 max,
1486 './tests/worker-files/thread/testWorker.mjs'
1487 )
1488 for (const workerNode of pool.workerNodes) {
1489 workerNode.strategyData = {
1490 virtualTaskEndTimestamp: performance.now()
1491 }
1492 }
1493 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1494 for (const workerNode of pool.workerNodes) {
1495 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1496 }
1497 // We need to clean up the resources after our test
1498 await pool.destroy()
1499 })
1500
1501 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1502 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1503 let pool = new FixedThreadPool(
1504 max,
1505 './tests/worker-files/thread/testWorker.mjs',
1506 { workerChoiceStrategy }
1507 )
1508 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1509 dynamicWorkerUsage: false,
1510 dynamicWorkerReady: true
1511 })
1512 await pool.destroy()
1513 pool = new DynamicThreadPool(
1514 min,
1515 max,
1516 './tests/worker-files/thread/testWorker.mjs',
1517 { workerChoiceStrategy }
1518 )
1519 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1520 dynamicWorkerUsage: false,
1521 dynamicWorkerReady: true
1522 })
1523 // We need to clean up the resources after our test
1524 await pool.destroy()
1525 })
1526
1527 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1528 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1529 let pool = new FixedThreadPool(
1530 max,
1531 './tests/worker-files/thread/testWorker.mjs',
1532 { workerChoiceStrategy }
1533 )
1534 expect(
1535 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1536 ).toStrictEqual({
1537 runTime: {
1538 aggregate: true,
1539 average: true,
1540 median: false
1541 },
1542 waitTime: {
1543 aggregate: false,
1544 average: false,
1545 median: false
1546 },
1547 elu: {
1548 aggregate: false,
1549 average: false,
1550 median: false
1551 }
1552 })
1553 await pool.destroy()
1554 pool = new DynamicThreadPool(
1555 min,
1556 max,
1557 './tests/worker-files/thread/testWorker.mjs',
1558 { workerChoiceStrategy }
1559 )
1560 expect(
1561 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1562 ).toStrictEqual({
1563 runTime: {
1564 aggregate: true,
1565 average: true,
1566 median: false
1567 },
1568 waitTime: {
1569 aggregate: false,
1570 average: false,
1571 median: false
1572 },
1573 elu: {
1574 aggregate: false,
1575 average: false,
1576 median: false
1577 }
1578 })
1579 // We need to clean up the resources after our test
1580 await pool.destroy()
1581 })
1582
1583 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1584 const pool = new FixedThreadPool(
1585 max,
1586 './tests/worker-files/thread/testWorker.mjs',
1587 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1588 )
1589 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1590 const promises = new Set()
1591 const maxMultiplier = 2
1592 for (let i = 0; i < max * maxMultiplier; i++) {
1593 promises.add(pool.execute())
1594 }
1595 await Promise.all(promises)
1596 for (const workerNode of pool.workerNodes) {
1597 expect(workerNode.usage).toStrictEqual({
1598 tasks: {
1599 executed: expect.any(Number),
1600 executing: 0,
1601 queued: 0,
1602 maxQueued: 0,
1603 sequentiallyStolen: 0,
1604 stolen: 0,
1605 failed: 0
1606 },
1607 runTime: expect.objectContaining({
1608 history: expect.any(CircularArray)
1609 }),
1610 waitTime: {
1611 history: new CircularArray()
1612 },
1613 elu: {
1614 idle: {
1615 history: new CircularArray()
1616 },
1617 active: {
1618 history: new CircularArray()
1619 }
1620 }
1621 })
1622 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1623 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1624 max * maxMultiplier
1625 )
1626 if (workerNode.usage.runTime.aggregate == null) {
1627 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1628 } else {
1629 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1630 }
1631 if (workerNode.usage.runTime.average == null) {
1632 expect(workerNode.usage.runTime.average).toBeUndefined()
1633 } else {
1634 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1635 }
1636 }
1637 expect(
1638 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1639 pool.workerChoiceStrategyContext.workerChoiceStrategy
1640 ).nextWorkerNodeKey
1641 ).toBe(0)
1642 expect(
1643 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1644 pool.workerChoiceStrategyContext.workerChoiceStrategy
1645 ).previousWorkerNodeKey
1646 ).toEqual(expect.any(Number))
1647 expect(
1648 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1649 pool.workerChoiceStrategyContext.workerChoiceStrategy
1650 ).workerNodeVirtualTaskRunTime
1651 ).toBeGreaterThanOrEqual(0)
1652 // We need to clean up the resources after our test
1653 await pool.destroy()
1654 })
1655
1656 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1657 const pool = new DynamicThreadPool(
1658 min,
1659 max,
1660 './tests/worker-files/thread/testWorker.mjs',
1661 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1662 )
1663 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1664 const promises = new Set()
1665 const maxMultiplier = 2
1666 for (let i = 0; i < max * maxMultiplier; i++) {
1667 promises.add(pool.execute())
1668 }
1669 await Promise.all(promises)
1670 for (const workerNode of pool.workerNodes) {
1671 expect(workerNode.usage).toStrictEqual({
1672 tasks: {
1673 executed: expect.any(Number),
1674 executing: 0,
1675 queued: 0,
1676 maxQueued: 0,
1677 sequentiallyStolen: 0,
1678 stolen: 0,
1679 failed: 0
1680 },
1681 runTime: expect.objectContaining({
1682 history: expect.any(CircularArray)
1683 }),
1684 waitTime: {
1685 history: new CircularArray()
1686 },
1687 elu: {
1688 idle: {
1689 history: new CircularArray()
1690 },
1691 active: {
1692 history: new CircularArray()
1693 }
1694 }
1695 })
1696 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1697 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1698 max * maxMultiplier
1699 )
1700 if (workerNode.usage.runTime.aggregate == null) {
1701 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1702 } else {
1703 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1704 }
1705 if (workerNode.usage.runTime.average == null) {
1706 expect(workerNode.usage.runTime.average).toBeUndefined()
1707 } else {
1708 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1709 }
1710 }
1711 expect(
1712 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1713 pool.workerChoiceStrategyContext.workerChoiceStrategy
1714 ).nextWorkerNodeKey
1715 ).toEqual(expect.any(Number))
1716 expect(
1717 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1718 pool.workerChoiceStrategyContext.workerChoiceStrategy
1719 ).previousWorkerNodeKey
1720 ).toEqual(expect.any(Number))
1721 expect(
1722 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1723 pool.workerChoiceStrategyContext.workerChoiceStrategy
1724 ).workerNodeVirtualTaskRunTime
1725 ).toBeGreaterThanOrEqual(0)
1726 // We need to clean up the resources after our test
1727 await pool.destroy()
1728 })
1729
1730 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1731 const pool = new DynamicThreadPool(
1732 min,
1733 max,
1734 './tests/worker-files/thread/testWorker.mjs',
1735 {
1736 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1737 workerChoiceStrategyOptions: {
1738 runTime: { median: true }
1739 }
1740 }
1741 )
1742 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1743 const promises = new Set()
1744 const maxMultiplier = 2
1745 for (let i = 0; i < max * maxMultiplier; i++) {
1746 promises.add(pool.execute())
1747 }
1748 await Promise.all(promises)
1749 for (const workerNode of pool.workerNodes) {
1750 expect(workerNode.usage).toStrictEqual({
1751 tasks: {
1752 executed: expect.any(Number),
1753 executing: 0,
1754 queued: 0,
1755 maxQueued: 0,
1756 sequentiallyStolen: 0,
1757 stolen: 0,
1758 failed: 0
1759 },
1760 runTime: expect.objectContaining({
1761 history: expect.any(CircularArray)
1762 }),
1763 waitTime: {
1764 history: new CircularArray()
1765 },
1766 elu: {
1767 idle: {
1768 history: new CircularArray()
1769 },
1770 active: {
1771 history: new CircularArray()
1772 }
1773 }
1774 })
1775 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1776 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1777 max * maxMultiplier
1778 )
1779 if (workerNode.usage.runTime.aggregate == null) {
1780 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1781 } else {
1782 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1783 }
1784 if (workerNode.usage.runTime.median == null) {
1785 expect(workerNode.usage.runTime.median).toBeUndefined()
1786 } else {
1787 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1788 }
1789 }
1790 expect(
1791 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1792 pool.workerChoiceStrategyContext.workerChoiceStrategy
1793 ).nextWorkerNodeKey
1794 ).toEqual(expect.any(Number))
1795 expect(
1796 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1797 pool.workerChoiceStrategyContext.workerChoiceStrategy
1798 ).previousWorkerNodeKey
1799 ).toEqual(expect.any(Number))
1800 expect(
1801 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1802 pool.workerChoiceStrategyContext.workerChoiceStrategy
1803 ).workerNodeVirtualTaskRunTime
1804 ).toBeGreaterThanOrEqual(0)
1805 // We need to clean up the resources after our test
1806 await pool.destroy()
1807 })
1808
1809 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1810 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1811 let pool = new FixedThreadPool(
1812 max,
1813 './tests/worker-files/thread/testWorker.mjs'
1814 )
1815 expect(
1816 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1817 workerChoiceStrategy
1818 ).nextWorkerNodeKey
1819 ).toBeDefined()
1820 expect(
1821 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1822 workerChoiceStrategy
1823 ).previousWorkerNodeKey
1824 ).toBeDefined()
1825 expect(
1826 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1827 workerChoiceStrategy
1828 ).workerNodeVirtualTaskRunTime
1829 ).toBeDefined()
1830 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1831 expect(
1832 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1833 pool.workerChoiceStrategyContext.workerChoiceStrategy
1834 ).nextWorkerNodeKey
1835 ).toBe(0)
1836 expect(
1837 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1838 pool.workerChoiceStrategyContext.workerChoiceStrategy
1839 ).previousWorkerNodeKey
1840 ).toBe(0)
1841 expect(
1842 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1843 pool.workerChoiceStrategyContext.workerChoiceStrategy
1844 ).workerNodeVirtualTaskRunTime
1845 ).toBe(0)
1846 await pool.destroy()
1847 pool = new DynamicThreadPool(
1848 min,
1849 max,
1850 './tests/worker-files/thread/testWorker.mjs'
1851 )
1852 expect(
1853 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1854 workerChoiceStrategy
1855 ).nextWorkerNodeKey
1856 ).toBeDefined()
1857 expect(
1858 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1859 workerChoiceStrategy
1860 ).previousWorkerNodeKey
1861 ).toBeDefined()
1862 expect(
1863 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1864 workerChoiceStrategy
1865 ).workerNodeVirtualTaskRunTime
1866 ).toBeDefined()
1867 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1868 expect(
1869 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1870 pool.workerChoiceStrategyContext.workerChoiceStrategy
1871 ).nextWorkerNodeKey
1872 ).toBe(0)
1873 expect(
1874 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1875 pool.workerChoiceStrategyContext.workerChoiceStrategy
1876 ).previousWorkerNodeKey
1877 ).toBe(0)
1878 expect(
1879 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1880 pool.workerChoiceStrategyContext.workerChoiceStrategy
1881 ).workerNodeVirtualTaskRunTime
1882 ).toBe(0)
1883 // We need to clean up the resources after our test
1884 await pool.destroy()
1885 })
1886
1887 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1888 const workerChoiceStrategy =
1889 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1890 let pool = new FixedThreadPool(
1891 max,
1892 './tests/worker-files/thread/testWorker.mjs',
1893 { workerChoiceStrategy }
1894 )
1895 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1896 dynamicWorkerUsage: false,
1897 dynamicWorkerReady: true
1898 })
1899 await pool.destroy()
1900 pool = new DynamicThreadPool(
1901 min,
1902 max,
1903 './tests/worker-files/thread/testWorker.mjs',
1904 { workerChoiceStrategy }
1905 )
1906 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1907 dynamicWorkerUsage: false,
1908 dynamicWorkerReady: true
1909 })
1910 // We need to clean up the resources after our test
1911 await pool.destroy()
1912 })
1913
1914 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1915 const workerChoiceStrategy =
1916 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1917 let pool = new FixedThreadPool(
1918 max,
1919 './tests/worker-files/thread/testWorker.mjs',
1920 { workerChoiceStrategy }
1921 )
1922 expect(
1923 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1924 ).toStrictEqual({
1925 runTime: {
1926 aggregate: true,
1927 average: true,
1928 median: false
1929 },
1930 waitTime: {
1931 aggregate: false,
1932 average: false,
1933 median: false
1934 },
1935 elu: {
1936 aggregate: false,
1937 average: false,
1938 median: false
1939 }
1940 })
1941 await pool.destroy()
1942 pool = new DynamicThreadPool(
1943 min,
1944 max,
1945 './tests/worker-files/thread/testWorker.mjs',
1946 { workerChoiceStrategy }
1947 )
1948 expect(
1949 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1950 ).toStrictEqual({
1951 runTime: {
1952 aggregate: true,
1953 average: true,
1954 median: false
1955 },
1956 waitTime: {
1957 aggregate: false,
1958 average: false,
1959 median: false
1960 },
1961 elu: {
1962 aggregate: false,
1963 average: false,
1964 median: false
1965 }
1966 })
1967 // We need to clean up the resources after our test
1968 await pool.destroy()
1969 })
1970
1971 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1972 const pool = new FixedThreadPool(
1973 max,
1974 './tests/worker-files/thread/testWorker.mjs',
1975 {
1976 workerChoiceStrategy:
1977 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1978 }
1979 )
1980 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1981 const promises = new Set()
1982 const maxMultiplier = 2
1983 for (let i = 0; i < max * maxMultiplier; i++) {
1984 promises.add(pool.execute())
1985 }
1986 await Promise.all(promises)
1987 for (const workerNode of pool.workerNodes) {
1988 expect(workerNode.usage).toStrictEqual({
1989 tasks: {
1990 executed: expect.any(Number),
1991 executing: 0,
1992 queued: 0,
1993 maxQueued: 0,
1994 sequentiallyStolen: 0,
1995 stolen: 0,
1996 failed: 0
1997 },
1998 runTime: expect.objectContaining({
1999 history: expect.any(CircularArray)
2000 }),
2001 waitTime: {
2002 history: new CircularArray()
2003 },
2004 elu: {
2005 idle: {
2006 history: new CircularArray()
2007 },
2008 active: {
2009 history: new CircularArray()
2010 }
2011 }
2012 })
2013 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2014 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2015 max * maxMultiplier
2016 )
2017 }
2018 expect(
2019 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2020 pool.workerChoiceStrategyContext.workerChoiceStrategy
2021 ).roundId
2022 ).toBe(0)
2023 expect(
2024 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2025 pool.workerChoiceStrategyContext.workerChoiceStrategy
2026 ).workerNodeId
2027 ).toBe(0)
2028 expect(
2029 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2030 pool.workerChoiceStrategyContext.workerChoiceStrategy
2031 ).nextWorkerNodeKey
2032 ).toBe(0)
2033 expect(
2034 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2035 pool.workerChoiceStrategyContext.workerChoiceStrategy
2036 ).previousWorkerNodeKey
2037 ).toEqual(expect.any(Number))
2038 expect(
2039 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2040 pool.workerChoiceStrategyContext.workerChoiceStrategy
2041 ).roundWeights.length
2042 ).toBe(1)
2043 expect(
2044 Number.isSafeInteger(
2045 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2046 pool.workerChoiceStrategyContext.workerChoiceStrategy
2047 ).roundWeights[0]
2048 )
2049 ).toBe(true)
2050 // We need to clean up the resources after our test
2051 await pool.destroy()
2052 })
2053
2054 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
2055 const pool = new DynamicThreadPool(
2056 min,
2057 max,
2058 './tests/worker-files/thread/testWorker.mjs',
2059 {
2060 workerChoiceStrategy:
2061 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2062 }
2063 )
2064 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
2065 const promises = new Set()
2066 const maxMultiplier = 2
2067 for (let i = 0; i < max * maxMultiplier; i++) {
2068 promises.add(pool.execute())
2069 }
2070 await Promise.all(promises)
2071 for (const workerNode of pool.workerNodes) {
2072 expect(workerNode.usage).toStrictEqual({
2073 tasks: {
2074 executed: expect.any(Number),
2075 executing: 0,
2076 queued: 0,
2077 maxQueued: 0,
2078 sequentiallyStolen: 0,
2079 stolen: 0,
2080 failed: 0
2081 },
2082 runTime: expect.objectContaining({
2083 history: expect.any(CircularArray)
2084 }),
2085 waitTime: {
2086 history: new CircularArray()
2087 },
2088 elu: {
2089 idle: {
2090 history: new CircularArray()
2091 },
2092 active: {
2093 history: new CircularArray()
2094 }
2095 }
2096 })
2097 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2098 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2099 max * maxMultiplier
2100 )
2101 }
2102 expect(
2103 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2104 pool.workerChoiceStrategyContext.workerChoiceStrategy
2105 ).roundId
2106 ).toBe(0)
2107 expect(
2108 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2109 pool.workerChoiceStrategyContext.workerChoiceStrategy
2110 ).workerNodeId
2111 ).toBe(0)
2112 expect(
2113 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2114 pool.workerChoiceStrategyContext.workerChoiceStrategy
2115 ).nextWorkerNodeKey
2116 ).toBe(0)
2117 expect(
2118 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2119 pool.workerChoiceStrategyContext.workerChoiceStrategy
2120 ).previousWorkerNodeKey
2121 ).toEqual(expect.any(Number))
2122 expect(
2123 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2124 pool.workerChoiceStrategyContext.workerChoiceStrategy
2125 ).roundWeights.length
2126 ).toBe(1)
2127 expect(
2128 Number.isSafeInteger(
2129 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2130 pool.workerChoiceStrategyContext.workerChoiceStrategy
2131 ).roundWeights[0]
2132 )
2133 ).toBe(true)
2134 // We need to clean up the resources after our test
2135 await pool.destroy()
2136 })
2137
2138 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
2139 const workerChoiceStrategy =
2140 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2141 let pool = new FixedThreadPool(
2142 max,
2143 './tests/worker-files/thread/testWorker.mjs'
2144 )
2145 expect(
2146 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2147 workerChoiceStrategy
2148 ).roundId
2149 ).toBeDefined()
2150 expect(
2151 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2152 workerChoiceStrategy
2153 ).workerNodeId
2154 ).toBeDefined()
2155 expect(
2156 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2157 workerChoiceStrategy
2158 ).nextWorkerNodeKey
2159 ).toBeDefined()
2160 expect(
2161 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2162 workerChoiceStrategy
2163 ).previousWorkerNodeKey
2164 ).toBeDefined()
2165 expect(
2166 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2167 workerChoiceStrategy
2168 ).roundWeights
2169 ).toBeDefined()
2170 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2171 expect(
2172 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2173 pool.workerChoiceStrategyContext.workerChoiceStrategy
2174 ).roundId
2175 ).toBe(0)
2176 expect(
2177 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2178 pool.workerChoiceStrategyContext.workerChoiceStrategy
2179 ).workerNodeId
2180 ).toBe(0)
2181 expect(
2182 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2183 pool.workerChoiceStrategyContext.workerChoiceStrategy
2184 ).nextWorkerNodeKey
2185 ).toBe(0)
2186 expect(
2187 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2188 pool.workerChoiceStrategyContext.workerChoiceStrategy
2189 ).previousWorkerNodeKey
2190 ).toBe(0)
2191 expect(
2192 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2193 pool.workerChoiceStrategyContext.workerChoiceStrategy
2194 ).roundWeights.length
2195 ).toBe(1)
2196 expect(
2197 Number.isSafeInteger(
2198 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2199 pool.workerChoiceStrategyContext.workerChoiceStrategy
2200 ).roundWeights[0]
2201 )
2202 ).toBe(true)
2203 await pool.destroy()
2204 pool = new DynamicThreadPool(
2205 min,
2206 max,
2207 './tests/worker-files/thread/testWorker.mjs'
2208 )
2209 expect(
2210 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2211 workerChoiceStrategy
2212 ).roundId
2213 ).toBeDefined()
2214 expect(
2215 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2216 workerChoiceStrategy
2217 ).workerNodeId
2218 ).toBeDefined()
2219 expect(
2220 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2221 workerChoiceStrategy
2222 ).nextWorkerNodeKey
2223 ).toBeDefined()
2224 expect(
2225 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2226 workerChoiceStrategy
2227 ).previousWorkerNodeKey
2228 ).toBeDefined()
2229 expect(
2230 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2231 workerChoiceStrategy
2232 ).roundWeights
2233 ).toBeDefined()
2234 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2235 expect(
2236 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2237 pool.workerChoiceStrategyContext.workerChoiceStrategy
2238 ).roundId
2239 ).toBe(0)
2240 expect(
2241 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2242 pool.workerChoiceStrategyContext.workerChoiceStrategy
2243 ).workerNodeId
2244 ).toBe(0)
2245 expect(
2246 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2247 pool.workerChoiceStrategyContext.workerChoiceStrategy
2248 ).nextWorkerNodeKey
2249 ).toBe(0)
2250 expect(
2251 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2252 pool.workerChoiceStrategyContext.workerChoiceStrategy
2253 ).previousWorkerNodeKey
2254 ).toBe(0)
2255 expect(
2256 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2257 pool.workerChoiceStrategyContext.workerChoiceStrategy
2258 ).roundWeights.length
2259 ).toBe(1)
2260 expect(
2261 Number.isSafeInteger(
2262 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2263 pool.workerChoiceStrategyContext.workerChoiceStrategy
2264 ).roundWeights[0]
2265 )
2266 ).toBe(true)
2267 // We need to clean up the resources after our test
2268 await pool.destroy()
2269 })
2270
2271 it('Verify unknown strategy throw error', () => {
2272 expect(
2273 () =>
2274 new DynamicThreadPool(
2275 min,
2276 max,
2277 './tests/worker-files/thread/testWorker.mjs',
2278 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2279 )
2280 ).toThrow("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
2281 })
2282 })