5aa392877d77fb48d3c12738cd983916ff5494c1
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.mjs
1 import { expect } from 'expect'
2 import {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 WorkerChoiceStrategies
8 } from '../../../lib/index.js'
9 import { CircularArray } from '../../../lib/circular-array.js'
10
11 describe('Selection strategies test suite', () => {
12 const min = 0
13 const max = 3
14
15 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
16 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
17 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
18 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
19 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
20 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
21 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
22 'WEIGHTED_ROUND_ROBIN'
23 )
24 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
25 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
26 )
27 })
28
29 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
30 const pool = new DynamicThreadPool(
31 min,
32 max,
33 './tests/worker-files/thread/testWorker.mjs'
34 )
35 expect(pool.opts.workerChoiceStrategy).toBe(
36 WorkerChoiceStrategies.ROUND_ROBIN
37 )
38 // We need to clean up the resources after our test
39 await pool.destroy()
40 })
41
42 it('Verify available strategies are taken at pool creation', async () => {
43 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
44 const pool = new FixedThreadPool(
45 max,
46 './tests/worker-files/thread/testWorker.mjs',
47 { workerChoiceStrategy }
48 )
49 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
50 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
51 workerChoiceStrategy
52 )
53 await pool.destroy()
54 }
55 })
56
57 it('Verify available strategies can be set after pool creation', async () => {
58 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
59 const pool = new DynamicThreadPool(
60 min,
61 max,
62 './tests/worker-files/thread/testWorker.mjs'
63 )
64 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
65 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
66 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
67 workerChoiceStrategy
68 )
69 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
70 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
71 retries:
72 pool.info.maxSize +
73 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
74 runTime: { median: false },
75 waitTime: { median: false },
76 elu: { median: false },
77 weights: expect.objectContaining({
78 0: expect.any(Number),
79 [pool.info.maxSize - 1]: expect.any(Number)
80 })
81 })
82 await pool.destroy()
83 }
84 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
85 const pool = new DynamicClusterPool(
86 min,
87 max,
88 './tests/worker-files/cluster/testWorker.js'
89 )
90 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
91 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
92 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
93 workerChoiceStrategy
94 )
95 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
96 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
97 retries:
98 pool.info.maxSize +
99 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
100 runTime: { median: false },
101 waitTime: { median: false },
102 elu: { median: false },
103 weights: expect.objectContaining({
104 0: expect.any(Number),
105 [pool.info.maxSize - 1]: expect.any(Number)
106 })
107 })
108 await pool.destroy()
109 }
110 })
111
112 it('Verify available strategies default internals at pool creation', async () => {
113 const pool = new FixedThreadPool(
114 max,
115 './tests/worker-files/thread/testWorker.mjs'
116 )
117 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
118 expect(
119 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
120 workerChoiceStrategy
121 ).nextWorkerNodeKey
122 ).toBe(0)
123 expect(
124 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
125 workerChoiceStrategy
126 ).previousWorkerNodeKey
127 ).toBe(0)
128 if (
129 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
130 ) {
131 expect(
132 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
133 workerChoiceStrategy
134 ).workerNodeVirtualTaskRunTime
135 ).toBe(0)
136 } else if (
137 workerChoiceStrategy ===
138 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
139 ) {
140 expect(
141 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
142 workerChoiceStrategy
143 ).workerNodeVirtualTaskRunTime
144 ).toBe(0)
145 expect(
146 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
147 workerChoiceStrategy
148 ).roundId
149 ).toBe(0)
150 expect(
151 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
152 workerChoiceStrategy
153 ).workerNodeId
154 ).toBe(0)
155 expect(
156 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
157 workerChoiceStrategy
158 ).roundWeights.length
159 ).toBe(1)
160 }
161 }
162 await pool.destroy()
163 })
164
165 it('Verify strategies wait for worker node readiness in dynamic pool', async () => {
166 const pool = new DynamicThreadPool(
167 min,
168 max,
169 './tests/worker-files/thread/testWorker.mjs'
170 )
171 expect(pool.starting).toBe(false)
172 expect(pool.workerNodes.length).toBe(min)
173 const maxMultiplier = 10000
174 const promises = new Set()
175 for (let i = 0; i < max * maxMultiplier; i++) {
176 promises.add(pool.execute())
177 }
178 await Promise.all(promises)
179 expect(pool.workerNodes.length).toBe(max)
180 // We need to clean up the resources after our test
181 await pool.destroy()
182 })
183
184 it('Verify ROUND_ROBIN strategy default policy', async () => {
185 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
186 let pool = new FixedThreadPool(
187 max,
188 './tests/worker-files/thread/testWorker.mjs',
189 { workerChoiceStrategy }
190 )
191 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
192 dynamicWorkerUsage: false,
193 dynamicWorkerReady: true
194 })
195 await pool.destroy()
196 pool = new DynamicThreadPool(
197 min,
198 max,
199 './tests/worker-files/thread/testWorker.mjs',
200 { workerChoiceStrategy }
201 )
202 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
203 dynamicWorkerUsage: false,
204 dynamicWorkerReady: true
205 })
206 // We need to clean up the resources after our test
207 await pool.destroy()
208 })
209
210 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
211 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
212 let pool = new FixedThreadPool(
213 max,
214 './tests/worker-files/thread/testWorker.mjs',
215 { workerChoiceStrategy }
216 )
217 expect(
218 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
219 ).toStrictEqual({
220 runTime: {
221 aggregate: false,
222 average: false,
223 median: false
224 },
225 waitTime: {
226 aggregate: false,
227 average: false,
228 median: false
229 },
230 elu: {
231 aggregate: false,
232 average: false,
233 median: false
234 }
235 })
236 await pool.destroy()
237 pool = new DynamicThreadPool(
238 min,
239 max,
240 './tests/worker-files/thread/testWorker.mjs',
241 { workerChoiceStrategy }
242 )
243 expect(
244 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
245 ).toStrictEqual({
246 runTime: {
247 aggregate: false,
248 average: false,
249 median: false
250 },
251 waitTime: {
252 aggregate: false,
253 average: false,
254 median: false
255 },
256 elu: {
257 aggregate: false,
258 average: false,
259 median: false
260 }
261 })
262 // We need to clean up the resources after our test
263 await pool.destroy()
264 })
265
266 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
267 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
268 const pool = new FixedThreadPool(
269 max,
270 './tests/worker-files/thread/testWorker.mjs',
271 { workerChoiceStrategy }
272 )
273 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
274 const promises = new Set()
275 const maxMultiplier = 2
276 for (let i = 0; i < max * maxMultiplier; i++) {
277 promises.add(pool.execute())
278 }
279 await Promise.all(promises)
280 for (const workerNode of pool.workerNodes) {
281 expect(workerNode.usage).toStrictEqual({
282 tasks: {
283 executed: maxMultiplier,
284 executing: 0,
285 queued: 0,
286 maxQueued: 0,
287 sequentiallyStolen: 0,
288 stolen: 0,
289 failed: 0
290 },
291 runTime: {
292 history: new CircularArray()
293 },
294 waitTime: {
295 history: new CircularArray()
296 },
297 elu: {
298 idle: {
299 history: new CircularArray()
300 },
301 active: {
302 history: new CircularArray()
303 }
304 }
305 })
306 }
307 expect(
308 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
309 pool.workerChoiceStrategyContext.workerChoiceStrategy
310 ).nextWorkerNodeKey
311 ).toBe(0)
312 expect(
313 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
314 pool.workerChoiceStrategyContext.workerChoiceStrategy
315 ).previousWorkerNodeKey
316 ).toBe(pool.workerNodes.length - 1)
317 // We need to clean up the resources after our test
318 await pool.destroy()
319 })
320
321 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
322 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
323 const pool = new DynamicThreadPool(
324 min,
325 max,
326 './tests/worker-files/thread/testWorker.mjs',
327 { workerChoiceStrategy }
328 )
329 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
330 const promises = new Set()
331 const maxMultiplier = 2
332 for (let i = 0; i < max * maxMultiplier; i++) {
333 promises.add(pool.execute())
334 }
335 await Promise.all(promises)
336 for (const workerNode of pool.workerNodes) {
337 expect(workerNode.usage).toStrictEqual({
338 tasks: {
339 executed: expect.any(Number),
340 executing: 0,
341 queued: 0,
342 maxQueued: 0,
343 sequentiallyStolen: 0,
344 stolen: 0,
345 failed: 0
346 },
347 runTime: {
348 history: new CircularArray()
349 },
350 waitTime: {
351 history: new CircularArray()
352 },
353 elu: {
354 idle: {
355 history: new CircularArray()
356 },
357 active: {
358 history: new CircularArray()
359 }
360 }
361 })
362 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
363 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
364 max * maxMultiplier
365 )
366 }
367 expect(
368 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
369 pool.workerChoiceStrategyContext.workerChoiceStrategy
370 ).nextWorkerNodeKey
371 ).toBe(0)
372 expect(
373 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
374 pool.workerChoiceStrategyContext.workerChoiceStrategy
375 ).previousWorkerNodeKey
376 ).toBe(pool.workerNodes.length - 1)
377 // We need to clean up the resources after our test
378 await pool.destroy()
379 })
380
381 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
382 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
383 let pool = new FixedClusterPool(
384 max,
385 './tests/worker-files/cluster/testWorker.js',
386 { workerChoiceStrategy }
387 )
388 let results = new Set()
389 for (let i = 0; i < max; i++) {
390 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
391 }
392 expect(results.size).toBe(max)
393 await pool.destroy()
394 pool = new FixedThreadPool(
395 max,
396 './tests/worker-files/thread/testWorker.mjs',
397 { workerChoiceStrategy }
398 )
399 results = new Set()
400 for (let i = 0; i < max; i++) {
401 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
402 }
403 expect(results.size).toBe(max)
404 await pool.destroy()
405 })
406
407 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
408 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
409 let pool = new FixedThreadPool(
410 max,
411 './tests/worker-files/thread/testWorker.mjs',
412 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
413 )
414 expect(
415 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
416 pool.workerChoiceStrategyContext.workerChoiceStrategy
417 ).nextWorkerNodeKey
418 ).toBeDefined()
419 expect(
420 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
421 pool.workerChoiceStrategyContext.workerChoiceStrategy
422 ).previousWorkerNodeKey
423 ).toBeDefined()
424 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
425 expect(
426 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
427 pool.workerChoiceStrategyContext.workerChoiceStrategy
428 ).nextWorkerNodeKey
429 ).toBe(0)
430 expect(
431 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
432 pool.workerChoiceStrategyContext.workerChoiceStrategy
433 ).previousWorkerNodeKey
434 ).toBe(0)
435 await pool.destroy()
436 pool = new DynamicThreadPool(
437 min,
438 max,
439 './tests/worker-files/thread/testWorker.mjs',
440 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
441 )
442 expect(
443 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
444 pool.workerChoiceStrategyContext.workerChoiceStrategy
445 ).nextWorkerNodeKey
446 ).toBeDefined()
447 expect(
448 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
449 pool.workerChoiceStrategyContext.workerChoiceStrategy
450 ).previousWorkerNodeKey
451 ).toBeDefined()
452 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
453 expect(
454 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
455 pool.workerChoiceStrategyContext.workerChoiceStrategy
456 ).nextWorkerNodeKey
457 ).toBe(0)
458 expect(
459 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
460 pool.workerChoiceStrategyContext.workerChoiceStrategy
461 ).previousWorkerNodeKey
462 ).toBe(0)
463 // We need to clean up the resources after our test
464 await pool.destroy()
465 })
466
467 it('Verify LEAST_USED strategy default policy', async () => {
468 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
469 let pool = new FixedThreadPool(
470 max,
471 './tests/worker-files/thread/testWorker.mjs',
472 { workerChoiceStrategy }
473 )
474 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
475 dynamicWorkerUsage: false,
476 dynamicWorkerReady: true
477 })
478 await pool.destroy()
479 pool = new DynamicThreadPool(
480 min,
481 max,
482 './tests/worker-files/thread/testWorker.mjs',
483 { workerChoiceStrategy }
484 )
485 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
486 dynamicWorkerUsage: false,
487 dynamicWorkerReady: true
488 })
489 // We need to clean up the resources after our test
490 await pool.destroy()
491 })
492
493 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
494 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
495 let pool = new FixedThreadPool(
496 max,
497 './tests/worker-files/thread/testWorker.mjs',
498 { workerChoiceStrategy }
499 )
500 expect(
501 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
502 ).toStrictEqual({
503 runTime: {
504 aggregate: false,
505 average: false,
506 median: false
507 },
508 waitTime: {
509 aggregate: false,
510 average: false,
511 median: false
512 },
513 elu: {
514 aggregate: false,
515 average: false,
516 median: false
517 }
518 })
519 await pool.destroy()
520 pool = new DynamicThreadPool(
521 min,
522 max,
523 './tests/worker-files/thread/testWorker.mjs',
524 { workerChoiceStrategy }
525 )
526 expect(
527 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
528 ).toStrictEqual({
529 runTime: {
530 aggregate: false,
531 average: false,
532 median: false
533 },
534 waitTime: {
535 aggregate: false,
536 average: false,
537 median: false
538 },
539 elu: {
540 aggregate: false,
541 average: false,
542 median: false
543 }
544 })
545 // We need to clean up the resources after our test
546 await pool.destroy()
547 })
548
549 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
550 const pool = new FixedThreadPool(
551 max,
552 './tests/worker-files/thread/testWorker.mjs',
553 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
554 )
555 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
556 const promises = new Set()
557 const maxMultiplier = 2
558 for (let i = 0; i < max * maxMultiplier; i++) {
559 promises.add(pool.execute())
560 }
561 await Promise.all(promises)
562 for (const workerNode of pool.workerNodes) {
563 expect(workerNode.usage).toStrictEqual({
564 tasks: {
565 executed: expect.any(Number),
566 executing: 0,
567 queued: 0,
568 maxQueued: 0,
569 sequentiallyStolen: 0,
570 stolen: 0,
571 failed: 0
572 },
573 runTime: {
574 history: new CircularArray()
575 },
576 waitTime: {
577 history: new CircularArray()
578 },
579 elu: {
580 idle: {
581 history: new CircularArray()
582 },
583 active: {
584 history: new CircularArray()
585 }
586 }
587 })
588 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
589 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
590 max * maxMultiplier
591 )
592 }
593 expect(
594 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
595 pool.workerChoiceStrategyContext.workerChoiceStrategy
596 ).nextWorkerNodeKey
597 ).toEqual(expect.any(Number))
598 expect(
599 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
600 pool.workerChoiceStrategyContext.workerChoiceStrategy
601 ).previousWorkerNodeKey
602 ).toEqual(expect.any(Number))
603 // We need to clean up the resources after our test
604 await pool.destroy()
605 })
606
607 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
608 const pool = new DynamicThreadPool(
609 min,
610 max,
611 './tests/worker-files/thread/testWorker.mjs',
612 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
613 )
614 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
615 const promises = new Set()
616 const maxMultiplier = 2
617 for (let i = 0; i < max * maxMultiplier; i++) {
618 promises.add(pool.execute())
619 }
620 await Promise.all(promises)
621 for (const workerNode of pool.workerNodes) {
622 expect(workerNode.usage).toStrictEqual({
623 tasks: {
624 executed: expect.any(Number),
625 executing: 0,
626 queued: 0,
627 maxQueued: 0,
628 sequentiallyStolen: 0,
629 stolen: 0,
630 failed: 0
631 },
632 runTime: {
633 history: new CircularArray()
634 },
635 waitTime: {
636 history: new CircularArray()
637 },
638 elu: {
639 idle: {
640 history: new CircularArray()
641 },
642 active: {
643 history: new CircularArray()
644 }
645 }
646 })
647 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
648 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
649 max * maxMultiplier
650 )
651 }
652 expect(
653 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
654 pool.workerChoiceStrategyContext.workerChoiceStrategy
655 ).nextWorkerNodeKey
656 ).toEqual(expect.any(Number))
657 expect(
658 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
659 pool.workerChoiceStrategyContext.workerChoiceStrategy
660 ).previousWorkerNodeKey
661 ).toEqual(expect.any(Number))
662 // We need to clean up the resources after our test
663 await pool.destroy()
664 })
665
666 it('Verify LEAST_BUSY strategy default policy', async () => {
667 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
668 let pool = new FixedThreadPool(
669 max,
670 './tests/worker-files/thread/testWorker.mjs',
671 { workerChoiceStrategy }
672 )
673 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
674 dynamicWorkerUsage: false,
675 dynamicWorkerReady: true
676 })
677 await pool.destroy()
678 pool = new DynamicThreadPool(
679 min,
680 max,
681 './tests/worker-files/thread/testWorker.mjs',
682 { workerChoiceStrategy }
683 )
684 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
685 dynamicWorkerUsage: false,
686 dynamicWorkerReady: true
687 })
688 // We need to clean up the resources after our test
689 await pool.destroy()
690 })
691
692 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
693 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
694 let pool = new FixedThreadPool(
695 max,
696 './tests/worker-files/thread/testWorker.mjs',
697 { workerChoiceStrategy }
698 )
699 expect(
700 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
701 ).toStrictEqual({
702 runTime: {
703 aggregate: true,
704 average: false,
705 median: false
706 },
707 waitTime: {
708 aggregate: true,
709 average: false,
710 median: false
711 },
712 elu: {
713 aggregate: false,
714 average: false,
715 median: false
716 }
717 })
718 await pool.destroy()
719 pool = new DynamicThreadPool(
720 min,
721 max,
722 './tests/worker-files/thread/testWorker.mjs',
723 { workerChoiceStrategy }
724 )
725 expect(
726 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
727 ).toStrictEqual({
728 runTime: {
729 aggregate: true,
730 average: false,
731 median: false
732 },
733 waitTime: {
734 aggregate: true,
735 average: false,
736 median: false
737 },
738 elu: {
739 aggregate: false,
740 average: false,
741 median: false
742 }
743 })
744 // We need to clean up the resources after our test
745 await pool.destroy()
746 })
747
748 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
749 const pool = new FixedThreadPool(
750 max,
751 './tests/worker-files/thread/testWorker.mjs',
752 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
753 )
754 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
755 const promises = new Set()
756 const maxMultiplier = 2
757 for (let i = 0; i < max * maxMultiplier; i++) {
758 promises.add(pool.execute())
759 }
760 await Promise.all(promises)
761 for (const workerNode of pool.workerNodes) {
762 expect(workerNode.usage).toStrictEqual({
763 tasks: {
764 executed: expect.any(Number),
765 executing: 0,
766 queued: 0,
767 maxQueued: 0,
768 sequentiallyStolen: 0,
769 stolen: 0,
770 failed: 0
771 },
772 runTime: expect.objectContaining({
773 history: expect.any(CircularArray)
774 }),
775 waitTime: expect.objectContaining({
776 history: expect.any(CircularArray)
777 }),
778 elu: {
779 idle: {
780 history: new CircularArray()
781 },
782 active: {
783 history: new CircularArray()
784 }
785 }
786 })
787 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
788 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
789 max * maxMultiplier
790 )
791 if (workerNode.usage.runTime.aggregate == null) {
792 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
793 } else {
794 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
795 }
796 if (workerNode.usage.waitTime.aggregate == null) {
797 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
798 } else {
799 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
800 }
801 }
802 expect(
803 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
804 pool.workerChoiceStrategyContext.workerChoiceStrategy
805 ).nextWorkerNodeKey
806 ).toEqual(expect.any(Number))
807 expect(
808 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
809 pool.workerChoiceStrategyContext.workerChoiceStrategy
810 ).previousWorkerNodeKey
811 ).toEqual(expect.any(Number))
812 // We need to clean up the resources after our test
813 await pool.destroy()
814 })
815
816 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
817 const pool = new DynamicThreadPool(
818 min,
819 max,
820 './tests/worker-files/thread/testWorker.mjs',
821 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
822 )
823 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
824 const promises = new Set()
825 const maxMultiplier = 2
826 for (let i = 0; i < max * maxMultiplier; i++) {
827 promises.add(pool.execute())
828 }
829 await Promise.all(promises)
830 for (const workerNode of pool.workerNodes) {
831 expect(workerNode.usage).toStrictEqual({
832 tasks: {
833 executed: expect.any(Number),
834 executing: 0,
835 queued: 0,
836 maxQueued: 0,
837 sequentiallyStolen: 0,
838 stolen: 0,
839 failed: 0
840 },
841 runTime: expect.objectContaining({
842 history: expect.any(CircularArray)
843 }),
844 waitTime: expect.objectContaining({
845 history: expect.any(CircularArray)
846 }),
847 elu: {
848 idle: {
849 history: new CircularArray()
850 },
851 active: {
852 history: new CircularArray()
853 }
854 }
855 })
856 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
857 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
858 max * maxMultiplier
859 )
860 if (workerNode.usage.runTime.aggregate == null) {
861 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
862 } else {
863 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
864 }
865 if (workerNode.usage.waitTime.aggregate == null) {
866 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
867 } else {
868 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
869 }
870 }
871 expect(
872 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
873 pool.workerChoiceStrategyContext.workerChoiceStrategy
874 ).nextWorkerNodeKey
875 ).toEqual(expect.any(Number))
876 expect(
877 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
878 pool.workerChoiceStrategyContext.workerChoiceStrategy
879 ).previousWorkerNodeKey
880 ).toEqual(expect.any(Number))
881 // We need to clean up the resources after our test
882 await pool.destroy()
883 })
884
885 it('Verify LEAST_ELU strategy default policy', async () => {
886 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
887 let pool = new FixedThreadPool(
888 max,
889 './tests/worker-files/thread/testWorker.mjs',
890 { workerChoiceStrategy }
891 )
892 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
893 dynamicWorkerUsage: false,
894 dynamicWorkerReady: true
895 })
896 await pool.destroy()
897 pool = new DynamicThreadPool(
898 min,
899 max,
900 './tests/worker-files/thread/testWorker.mjs',
901 { workerChoiceStrategy }
902 )
903 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
904 dynamicWorkerUsage: false,
905 dynamicWorkerReady: true
906 })
907 // We need to clean up the resources after our test
908 await pool.destroy()
909 })
910
911 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
912 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
913 let pool = new FixedThreadPool(
914 max,
915 './tests/worker-files/thread/testWorker.mjs',
916 { workerChoiceStrategy }
917 )
918 expect(
919 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
920 ).toStrictEqual({
921 runTime: {
922 aggregate: false,
923 average: false,
924 median: false
925 },
926 waitTime: {
927 aggregate: false,
928 average: false,
929 median: false
930 },
931 elu: {
932 aggregate: true,
933 average: false,
934 median: false
935 }
936 })
937 await pool.destroy()
938 pool = new DynamicThreadPool(
939 min,
940 max,
941 './tests/worker-files/thread/testWorker.mjs',
942 { workerChoiceStrategy }
943 )
944 expect(
945 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
946 ).toStrictEqual({
947 runTime: {
948 aggregate: false,
949 average: false,
950 median: false
951 },
952 waitTime: {
953 aggregate: false,
954 average: false,
955 median: false
956 },
957 elu: {
958 aggregate: true,
959 average: false,
960 median: false
961 }
962 })
963 // We need to clean up the resources after our test
964 await pool.destroy()
965 })
966
967 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
968 const pool = new FixedThreadPool(
969 max,
970 './tests/worker-files/thread/testWorker.mjs',
971 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
972 )
973 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
974 const promises = new Set()
975 const maxMultiplier = 2
976 for (let i = 0; i < max * maxMultiplier; i++) {
977 promises.add(pool.execute())
978 }
979 await Promise.all(promises)
980 for (const workerNode of pool.workerNodes) {
981 expect(workerNode.usage).toStrictEqual({
982 tasks: {
983 executed: expect.any(Number),
984 executing: 0,
985 queued: 0,
986 maxQueued: 0,
987 sequentiallyStolen: 0,
988 stolen: 0,
989 failed: 0
990 },
991 runTime: {
992 history: new CircularArray()
993 },
994 waitTime: {
995 history: new CircularArray()
996 },
997 elu: expect.objectContaining({
998 idle: expect.objectContaining({
999 history: expect.any(CircularArray)
1000 }),
1001 active: expect.objectContaining({
1002 history: expect.any(CircularArray)
1003 })
1004 })
1005 })
1006 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1007 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1008 max * maxMultiplier
1009 )
1010 if (workerNode.usage.elu.active.aggregate == null) {
1011 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1012 } else {
1013 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1014 }
1015 if (workerNode.usage.elu.idle.aggregate == null) {
1016 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1017 } else {
1018 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1019 }
1020 if (workerNode.usage.elu.utilization == null) {
1021 expect(workerNode.usage.elu.utilization).toBeUndefined()
1022 } else {
1023 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1024 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1025 }
1026 }
1027 expect(
1028 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1029 pool.workerChoiceStrategyContext.workerChoiceStrategy
1030 ).nextWorkerNodeKey
1031 ).toEqual(expect.any(Number))
1032 expect(
1033 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1034 pool.workerChoiceStrategyContext.workerChoiceStrategy
1035 ).previousWorkerNodeKey
1036 ).toEqual(expect.any(Number))
1037 // We need to clean up the resources after our test
1038 await pool.destroy()
1039 })
1040
1041 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
1042 const pool = new DynamicThreadPool(
1043 min,
1044 max,
1045 './tests/worker-files/thread/testWorker.mjs',
1046 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
1047 )
1048 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
1049 const promises = new Set()
1050 const maxMultiplier = 2
1051 for (let i = 0; i < max * maxMultiplier; i++) {
1052 promises.add(pool.execute())
1053 }
1054 await Promise.all(promises)
1055 for (const workerNode of pool.workerNodes) {
1056 expect(workerNode.usage).toStrictEqual({
1057 tasks: {
1058 executed: expect.any(Number),
1059 executing: 0,
1060 queued: 0,
1061 maxQueued: 0,
1062 sequentiallyStolen: 0,
1063 stolen: 0,
1064 failed: 0
1065 },
1066 runTime: {
1067 history: new CircularArray()
1068 },
1069 waitTime: {
1070 history: new CircularArray()
1071 },
1072 elu: expect.objectContaining({
1073 idle: expect.objectContaining({
1074 history: expect.any(CircularArray)
1075 }),
1076 active: expect.objectContaining({
1077 history: expect.any(CircularArray)
1078 })
1079 })
1080 })
1081 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1082 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1083 max * maxMultiplier
1084 )
1085 if (workerNode.usage.elu.active.aggregate == null) {
1086 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1087 } else {
1088 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1089 }
1090 if (workerNode.usage.elu.idle.aggregate == null) {
1091 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1092 } else {
1093 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1094 }
1095 if (workerNode.usage.elu.utilization == null) {
1096 expect(workerNode.usage.elu.utilization).toBeUndefined()
1097 } else {
1098 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1099 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1100 }
1101 }
1102 expect(
1103 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1104 pool.workerChoiceStrategyContext.workerChoiceStrategy
1105 ).nextWorkerNodeKey
1106 ).toEqual(expect.any(Number))
1107 expect(
1108 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1109 pool.workerChoiceStrategyContext.workerChoiceStrategy
1110 ).previousWorkerNodeKey
1111 ).toEqual(expect.any(Number))
1112 // We need to clean up the resources after our test
1113 await pool.destroy()
1114 })
1115
1116 it('Verify FAIR_SHARE strategy default policy', async () => {
1117 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1118 let pool = new FixedThreadPool(
1119 max,
1120 './tests/worker-files/thread/testWorker.mjs',
1121 { workerChoiceStrategy }
1122 )
1123 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1124 dynamicWorkerUsage: false,
1125 dynamicWorkerReady: true
1126 })
1127 await pool.destroy()
1128 pool = new DynamicThreadPool(
1129 min,
1130 max,
1131 './tests/worker-files/thread/testWorker.mjs',
1132 { workerChoiceStrategy }
1133 )
1134 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1135 dynamicWorkerUsage: false,
1136 dynamicWorkerReady: true
1137 })
1138 // We need to clean up the resources after our test
1139 await pool.destroy()
1140 })
1141
1142 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1143 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1144 let pool = new FixedThreadPool(
1145 max,
1146 './tests/worker-files/thread/testWorker.mjs',
1147 { workerChoiceStrategy }
1148 )
1149 expect(
1150 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1151 ).toStrictEqual({
1152 runTime: {
1153 aggregate: true,
1154 average: true,
1155 median: false
1156 },
1157 waitTime: {
1158 aggregate: false,
1159 average: false,
1160 median: false
1161 },
1162 elu: {
1163 aggregate: true,
1164 average: true,
1165 median: false
1166 }
1167 })
1168 await pool.destroy()
1169 pool = new DynamicThreadPool(
1170 min,
1171 max,
1172 './tests/worker-files/thread/testWorker.mjs',
1173 { workerChoiceStrategy }
1174 )
1175 expect(
1176 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1177 ).toStrictEqual({
1178 runTime: {
1179 aggregate: true,
1180 average: true,
1181 median: false
1182 },
1183 waitTime: {
1184 aggregate: false,
1185 average: false,
1186 median: false
1187 },
1188 elu: {
1189 aggregate: true,
1190 average: true,
1191 median: false
1192 }
1193 })
1194 // We need to clean up the resources after our test
1195 await pool.destroy()
1196 })
1197
1198 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1199 const pool = new FixedThreadPool(
1200 max,
1201 './tests/worker-files/thread/testWorker.mjs',
1202 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1203 )
1204 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1205 const promises = new Set()
1206 const maxMultiplier = 2
1207 for (let i = 0; i < max * maxMultiplier; i++) {
1208 promises.add(pool.execute())
1209 }
1210 await Promise.all(promises)
1211 for (const workerNode of pool.workerNodes) {
1212 expect(workerNode.usage).toStrictEqual({
1213 tasks: {
1214 executed: expect.any(Number),
1215 executing: 0,
1216 queued: 0,
1217 maxQueued: 0,
1218 sequentiallyStolen: 0,
1219 stolen: 0,
1220 failed: 0
1221 },
1222 runTime: expect.objectContaining({
1223 history: expect.any(CircularArray)
1224 }),
1225 waitTime: {
1226 history: new CircularArray()
1227 },
1228 elu: expect.objectContaining({
1229 idle: expect.objectContaining({
1230 history: expect.any(CircularArray)
1231 }),
1232 active: expect.objectContaining({
1233 history: expect.any(CircularArray)
1234 })
1235 })
1236 })
1237 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1238 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1239 max * maxMultiplier
1240 )
1241 if (workerNode.usage.runTime.aggregate == null) {
1242 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1243 } else {
1244 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1245 }
1246 if (workerNode.usage.runTime.average == null) {
1247 expect(workerNode.usage.runTime.average).toBeUndefined()
1248 } else {
1249 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1250 }
1251 if (workerNode.usage.elu.active.aggregate == null) {
1252 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1253 } else {
1254 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1255 }
1256 if (workerNode.usage.elu.idle.aggregate == null) {
1257 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1258 } else {
1259 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1260 }
1261 if (workerNode.usage.elu.utilization == null) {
1262 expect(workerNode.usage.elu.utilization).toBeUndefined()
1263 } else {
1264 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1265 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1266 }
1267 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1268 }
1269 expect(
1270 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1271 pool.workerChoiceStrategyContext.workerChoiceStrategy
1272 ).nextWorkerNodeKey
1273 ).toEqual(expect.any(Number))
1274 expect(
1275 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1276 pool.workerChoiceStrategyContext.workerChoiceStrategy
1277 ).previousWorkerNodeKey
1278 ).toEqual(expect.any(Number))
1279 // We need to clean up the resources after our test
1280 await pool.destroy()
1281 })
1282
1283 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1284 const pool = new DynamicThreadPool(
1285 min,
1286 max,
1287 './tests/worker-files/thread/testWorker.mjs',
1288 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1289 )
1290 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1291 const promises = new Set()
1292 const maxMultiplier = 2
1293 for (let i = 0; i < max * maxMultiplier; i++) {
1294 promises.add(pool.execute())
1295 }
1296 await Promise.all(promises)
1297 for (const workerNode of pool.workerNodes) {
1298 expect(workerNode.usage).toStrictEqual({
1299 tasks: {
1300 executed: expect.any(Number),
1301 executing: 0,
1302 queued: 0,
1303 maxQueued: 0,
1304 sequentiallyStolen: 0,
1305 stolen: 0,
1306 failed: 0
1307 },
1308 runTime: expect.objectContaining({
1309 history: expect.any(CircularArray)
1310 }),
1311 waitTime: {
1312 history: new CircularArray()
1313 },
1314 elu: expect.objectContaining({
1315 idle: expect.objectContaining({
1316 history: expect.any(CircularArray)
1317 }),
1318 active: expect.objectContaining({
1319 history: expect.any(CircularArray)
1320 })
1321 })
1322 })
1323 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1324 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1325 max * maxMultiplier
1326 )
1327 if (workerNode.usage.runTime.aggregate == null) {
1328 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1329 } else {
1330 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1331 }
1332 if (workerNode.usage.runTime.average == null) {
1333 expect(workerNode.usage.runTime.average).toBeUndefined()
1334 } else {
1335 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1336 }
1337 if (workerNode.usage.elu.active.aggregate == null) {
1338 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1339 } else {
1340 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1341 }
1342 if (workerNode.usage.elu.idle.aggregate == null) {
1343 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1344 } else {
1345 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1346 }
1347 if (workerNode.usage.elu.utilization == null) {
1348 expect(workerNode.usage.elu.utilization).toBeUndefined()
1349 } else {
1350 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1351 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1352 }
1353 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1354 }
1355 expect(
1356 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1357 pool.workerChoiceStrategyContext.workerChoiceStrategy
1358 ).nextWorkerNodeKey
1359 ).toEqual(expect.any(Number))
1360 expect(
1361 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1362 pool.workerChoiceStrategyContext.workerChoiceStrategy
1363 ).previousWorkerNodeKey
1364 ).toEqual(expect.any(Number))
1365 // We need to clean up the resources after our test
1366 await pool.destroy()
1367 })
1368
1369 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1370 const pool = new DynamicThreadPool(
1371 min,
1372 max,
1373 './tests/worker-files/thread/testWorker.mjs',
1374 {
1375 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1376 workerChoiceStrategyOptions: {
1377 runTime: { median: true }
1378 }
1379 }
1380 )
1381 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1382 const promises = new Set()
1383 const maxMultiplier = 2
1384 for (let i = 0; i < max * maxMultiplier; i++) {
1385 promises.add(pool.execute())
1386 }
1387 await Promise.all(promises)
1388 for (const workerNode of pool.workerNodes) {
1389 expect(workerNode.usage).toStrictEqual({
1390 tasks: {
1391 executed: expect.any(Number),
1392 executing: 0,
1393 queued: 0,
1394 maxQueued: 0,
1395 sequentiallyStolen: 0,
1396 stolen: 0,
1397 failed: 0
1398 },
1399 runTime: expect.objectContaining({
1400 history: expect.any(CircularArray)
1401 }),
1402 waitTime: {
1403 history: new CircularArray()
1404 },
1405 elu: expect.objectContaining({
1406 idle: expect.objectContaining({
1407 history: expect.any(CircularArray)
1408 }),
1409 active: expect.objectContaining({
1410 history: expect.any(CircularArray)
1411 })
1412 })
1413 })
1414 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1415 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1416 max * maxMultiplier
1417 )
1418 if (workerNode.usage.runTime.aggregate == null) {
1419 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1420 } else {
1421 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1422 }
1423 if (workerNode.usage.runTime.median == null) {
1424 expect(workerNode.usage.runTime.median).toBeUndefined()
1425 } else {
1426 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1427 }
1428 if (workerNode.usage.elu.active.aggregate == null) {
1429 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1430 } else {
1431 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1432 }
1433 if (workerNode.usage.elu.idle.aggregate == null) {
1434 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1435 } else {
1436 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1437 }
1438 if (workerNode.usage.elu.utilization == null) {
1439 expect(workerNode.usage.elu.utilization).toBeUndefined()
1440 } else {
1441 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1442 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1443 }
1444 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1445 }
1446 expect(
1447 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1448 pool.workerChoiceStrategyContext.workerChoiceStrategy
1449 ).nextWorkerNodeKey
1450 ).toEqual(expect.any(Number))
1451 expect(
1452 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1453 pool.workerChoiceStrategyContext.workerChoiceStrategy
1454 ).previousWorkerNodeKey
1455 ).toEqual(expect.any(Number))
1456 // We need to clean up the resources after our test
1457 await pool.destroy()
1458 })
1459
1460 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1461 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1462 let pool = new FixedThreadPool(
1463 max,
1464 './tests/worker-files/thread/testWorker.mjs'
1465 )
1466 for (const workerNode of pool.workerNodes) {
1467 workerNode.strategyData = {
1468 virtualTaskEndTimestamp: performance.now()
1469 }
1470 }
1471 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1472 for (const workerNode of pool.workerNodes) {
1473 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1474 }
1475 await pool.destroy()
1476 pool = new DynamicThreadPool(
1477 min,
1478 max,
1479 './tests/worker-files/thread/testWorker.mjs'
1480 )
1481 for (const workerNode of pool.workerNodes) {
1482 workerNode.strategyData = {
1483 virtualTaskEndTimestamp: performance.now()
1484 }
1485 }
1486 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1487 for (const workerNode of pool.workerNodes) {
1488 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1489 }
1490 // We need to clean up the resources after our test
1491 await pool.destroy()
1492 })
1493
1494 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1495 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1496 let pool = new FixedThreadPool(
1497 max,
1498 './tests/worker-files/thread/testWorker.mjs',
1499 { workerChoiceStrategy }
1500 )
1501 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1502 dynamicWorkerUsage: false,
1503 dynamicWorkerReady: true
1504 })
1505 await pool.destroy()
1506 pool = new DynamicThreadPool(
1507 min,
1508 max,
1509 './tests/worker-files/thread/testWorker.mjs',
1510 { workerChoiceStrategy }
1511 )
1512 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1513 dynamicWorkerUsage: false,
1514 dynamicWorkerReady: true
1515 })
1516 // We need to clean up the resources after our test
1517 await pool.destroy()
1518 })
1519
1520 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1521 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1522 let pool = new FixedThreadPool(
1523 max,
1524 './tests/worker-files/thread/testWorker.mjs',
1525 { workerChoiceStrategy }
1526 )
1527 expect(
1528 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1529 ).toStrictEqual({
1530 runTime: {
1531 aggregate: true,
1532 average: true,
1533 median: false
1534 },
1535 waitTime: {
1536 aggregate: false,
1537 average: false,
1538 median: false
1539 },
1540 elu: {
1541 aggregate: false,
1542 average: false,
1543 median: false
1544 }
1545 })
1546 await pool.destroy()
1547 pool = new DynamicThreadPool(
1548 min,
1549 max,
1550 './tests/worker-files/thread/testWorker.mjs',
1551 { workerChoiceStrategy }
1552 )
1553 expect(
1554 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1555 ).toStrictEqual({
1556 runTime: {
1557 aggregate: true,
1558 average: true,
1559 median: false
1560 },
1561 waitTime: {
1562 aggregate: false,
1563 average: false,
1564 median: false
1565 },
1566 elu: {
1567 aggregate: false,
1568 average: false,
1569 median: false
1570 }
1571 })
1572 // We need to clean up the resources after our test
1573 await pool.destroy()
1574 })
1575
1576 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1577 const pool = new FixedThreadPool(
1578 max,
1579 './tests/worker-files/thread/testWorker.mjs',
1580 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1581 )
1582 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1583 const promises = new Set()
1584 const maxMultiplier = 2
1585 for (let i = 0; i < max * maxMultiplier; i++) {
1586 promises.add(pool.execute())
1587 }
1588 await Promise.all(promises)
1589 for (const workerNode of pool.workerNodes) {
1590 expect(workerNode.usage).toStrictEqual({
1591 tasks: {
1592 executed: expect.any(Number),
1593 executing: 0,
1594 queued: 0,
1595 maxQueued: 0,
1596 sequentiallyStolen: 0,
1597 stolen: 0,
1598 failed: 0
1599 },
1600 runTime: expect.objectContaining({
1601 history: expect.any(CircularArray)
1602 }),
1603 waitTime: {
1604 history: new CircularArray()
1605 },
1606 elu: {
1607 idle: {
1608 history: new CircularArray()
1609 },
1610 active: {
1611 history: new CircularArray()
1612 }
1613 }
1614 })
1615 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1616 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1617 max * maxMultiplier
1618 )
1619 if (workerNode.usage.runTime.aggregate == null) {
1620 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1621 } else {
1622 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1623 }
1624 if (workerNode.usage.runTime.average == null) {
1625 expect(workerNode.usage.runTime.average).toBeUndefined()
1626 } else {
1627 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1628 }
1629 }
1630 expect(
1631 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1632 pool.workerChoiceStrategyContext.workerChoiceStrategy
1633 ).nextWorkerNodeKey
1634 ).toBe(0)
1635 expect(
1636 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1637 pool.workerChoiceStrategyContext.workerChoiceStrategy
1638 ).previousWorkerNodeKey
1639 ).toStrictEqual(expect.any(Number))
1640 expect(
1641 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1642 pool.workerChoiceStrategyContext.workerChoiceStrategy
1643 ).workerNodeVirtualTaskRunTime
1644 ).toBeGreaterThanOrEqual(0)
1645 // We need to clean up the resources after our test
1646 await pool.destroy()
1647 })
1648
1649 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1650 const pool = new DynamicThreadPool(
1651 min,
1652 max,
1653 './tests/worker-files/thread/testWorker.mjs',
1654 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1655 )
1656 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1657 const promises = new Set()
1658 const maxMultiplier = 2
1659 for (let i = 0; i < max * maxMultiplier; i++) {
1660 promises.add(pool.execute())
1661 }
1662 await Promise.all(promises)
1663 for (const workerNode of pool.workerNodes) {
1664 expect(workerNode.usage).toStrictEqual({
1665 tasks: {
1666 executed: expect.any(Number),
1667 executing: 0,
1668 queued: 0,
1669 maxQueued: 0,
1670 sequentiallyStolen: 0,
1671 stolen: 0,
1672 failed: 0
1673 },
1674 runTime: expect.objectContaining({
1675 history: expect.any(CircularArray)
1676 }),
1677 waitTime: {
1678 history: new CircularArray()
1679 },
1680 elu: {
1681 idle: {
1682 history: new CircularArray()
1683 },
1684 active: {
1685 history: new CircularArray()
1686 }
1687 }
1688 })
1689 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1690 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1691 max * maxMultiplier
1692 )
1693 if (workerNode.usage.runTime.aggregate == null) {
1694 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1695 } else {
1696 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1697 }
1698 if (workerNode.usage.runTime.average == null) {
1699 expect(workerNode.usage.runTime.average).toBeUndefined()
1700 } else {
1701 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1702 }
1703 }
1704 expect(
1705 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1706 pool.workerChoiceStrategyContext.workerChoiceStrategy
1707 ).nextWorkerNodeKey
1708 ).toBe(0)
1709 expect(
1710 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1711 pool.workerChoiceStrategyContext.workerChoiceStrategy
1712 ).previousWorkerNodeKey
1713 ).toBe(0)
1714 expect(
1715 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1716 pool.workerChoiceStrategyContext.workerChoiceStrategy
1717 ).workerNodeVirtualTaskRunTime
1718 ).toBeGreaterThanOrEqual(0)
1719 // We need to clean up the resources after our test
1720 await pool.destroy()
1721 })
1722
1723 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1724 const pool = new DynamicThreadPool(
1725 min,
1726 max,
1727 './tests/worker-files/thread/testWorker.mjs',
1728 {
1729 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1730 workerChoiceStrategyOptions: {
1731 runTime: { median: true }
1732 }
1733 }
1734 )
1735 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1736 const promises = new Set()
1737 const maxMultiplier = 2
1738 for (let i = 0; i < max * maxMultiplier; i++) {
1739 promises.add(pool.execute())
1740 }
1741 await Promise.all(promises)
1742 for (const workerNode of pool.workerNodes) {
1743 expect(workerNode.usage).toStrictEqual({
1744 tasks: {
1745 executed: expect.any(Number),
1746 executing: 0,
1747 queued: 0,
1748 maxQueued: 0,
1749 sequentiallyStolen: 0,
1750 stolen: 0,
1751 failed: 0
1752 },
1753 runTime: expect.objectContaining({
1754 history: expect.any(CircularArray)
1755 }),
1756 waitTime: {
1757 history: new CircularArray()
1758 },
1759 elu: {
1760 idle: {
1761 history: new CircularArray()
1762 },
1763 active: {
1764 history: new CircularArray()
1765 }
1766 }
1767 })
1768 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1769 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1770 max * maxMultiplier
1771 )
1772 if (workerNode.usage.runTime.aggregate == null) {
1773 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1774 } else {
1775 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1776 }
1777 if (workerNode.usage.runTime.median == null) {
1778 expect(workerNode.usage.runTime.median).toBeUndefined()
1779 } else {
1780 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1781 }
1782 }
1783 expect(
1784 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1785 pool.workerChoiceStrategyContext.workerChoiceStrategy
1786 ).nextWorkerNodeKey
1787 ).toBe(0)
1788 expect(
1789 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1790 pool.workerChoiceStrategyContext.workerChoiceStrategy
1791 ).previousWorkerNodeKey
1792 ).toBe(0)
1793 expect(
1794 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1795 pool.workerChoiceStrategyContext.workerChoiceStrategy
1796 ).workerNodeVirtualTaskRunTime
1797 ).toBeGreaterThanOrEqual(0)
1798 // We need to clean up the resources after our test
1799 await pool.destroy()
1800 })
1801
1802 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1803 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1804 let pool = new FixedThreadPool(
1805 max,
1806 './tests/worker-files/thread/testWorker.mjs'
1807 )
1808 expect(
1809 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1810 workerChoiceStrategy
1811 ).nextWorkerNodeKey
1812 ).toBeDefined()
1813 expect(
1814 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1815 workerChoiceStrategy
1816 ).previousWorkerNodeKey
1817 ).toBeDefined()
1818 expect(
1819 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1820 workerChoiceStrategy
1821 ).workerNodeVirtualTaskRunTime
1822 ).toBeDefined()
1823 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1824 expect(
1825 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1826 pool.workerChoiceStrategyContext.workerChoiceStrategy
1827 ).nextWorkerNodeKey
1828 ).toBe(0)
1829 expect(
1830 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1831 pool.workerChoiceStrategyContext.workerChoiceStrategy
1832 ).previousWorkerNodeKey
1833 ).toBe(0)
1834 expect(
1835 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1836 pool.workerChoiceStrategyContext.workerChoiceStrategy
1837 ).workerNodeVirtualTaskRunTime
1838 ).toBe(0)
1839 await pool.destroy()
1840 pool = new DynamicThreadPool(
1841 min,
1842 max,
1843 './tests/worker-files/thread/testWorker.mjs'
1844 )
1845 expect(
1846 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1847 workerChoiceStrategy
1848 ).nextWorkerNodeKey
1849 ).toBeDefined()
1850 expect(
1851 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1852 workerChoiceStrategy
1853 ).previousWorkerNodeKey
1854 ).toBeDefined()
1855 expect(
1856 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1857 workerChoiceStrategy
1858 ).workerNodeVirtualTaskRunTime
1859 ).toBeDefined()
1860 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1861 expect(
1862 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1863 pool.workerChoiceStrategyContext.workerChoiceStrategy
1864 ).nextWorkerNodeKey
1865 ).toBe(0)
1866 expect(
1867 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1868 pool.workerChoiceStrategyContext.workerChoiceStrategy
1869 ).previousWorkerNodeKey
1870 ).toBe(0)
1871 expect(
1872 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1873 pool.workerChoiceStrategyContext.workerChoiceStrategy
1874 ).workerNodeVirtualTaskRunTime
1875 ).toBe(0)
1876 // We need to clean up the resources after our test
1877 await pool.destroy()
1878 })
1879
1880 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1881 const workerChoiceStrategy =
1882 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1883 let pool = new FixedThreadPool(
1884 max,
1885 './tests/worker-files/thread/testWorker.mjs',
1886 { workerChoiceStrategy }
1887 )
1888 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1889 dynamicWorkerUsage: false,
1890 dynamicWorkerReady: true
1891 })
1892 await pool.destroy()
1893 pool = new DynamicThreadPool(
1894 min,
1895 max,
1896 './tests/worker-files/thread/testWorker.mjs',
1897 { workerChoiceStrategy }
1898 )
1899 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1900 dynamicWorkerUsage: false,
1901 dynamicWorkerReady: true
1902 })
1903 // We need to clean up the resources after our test
1904 await pool.destroy()
1905 })
1906
1907 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1908 const workerChoiceStrategy =
1909 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1910 let pool = new FixedThreadPool(
1911 max,
1912 './tests/worker-files/thread/testWorker.mjs',
1913 { workerChoiceStrategy }
1914 )
1915 expect(
1916 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1917 ).toStrictEqual({
1918 runTime: {
1919 aggregate: true,
1920 average: true,
1921 median: false
1922 },
1923 waitTime: {
1924 aggregate: false,
1925 average: false,
1926 median: false
1927 },
1928 elu: {
1929 aggregate: false,
1930 average: false,
1931 median: false
1932 }
1933 })
1934 await pool.destroy()
1935 pool = new DynamicThreadPool(
1936 min,
1937 max,
1938 './tests/worker-files/thread/testWorker.mjs',
1939 { workerChoiceStrategy }
1940 )
1941 expect(
1942 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1943 ).toStrictEqual({
1944 runTime: {
1945 aggregate: true,
1946 average: true,
1947 median: false
1948 },
1949 waitTime: {
1950 aggregate: false,
1951 average: false,
1952 median: false
1953 },
1954 elu: {
1955 aggregate: false,
1956 average: false,
1957 median: false
1958 }
1959 })
1960 // We need to clean up the resources after our test
1961 await pool.destroy()
1962 })
1963
1964 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1965 const pool = new FixedThreadPool(
1966 max,
1967 './tests/worker-files/thread/testWorker.mjs',
1968 {
1969 workerChoiceStrategy:
1970 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1971 }
1972 )
1973 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1974 const promises = new Set()
1975 const maxMultiplier = 2
1976 for (let i = 0; i < max * maxMultiplier; i++) {
1977 promises.add(pool.execute())
1978 }
1979 await Promise.all(promises)
1980 for (const workerNode of pool.workerNodes) {
1981 expect(workerNode.usage).toStrictEqual({
1982 tasks: {
1983 executed: expect.any(Number),
1984 executing: 0,
1985 queued: 0,
1986 maxQueued: 0,
1987 sequentiallyStolen: 0,
1988 stolen: 0,
1989 failed: 0
1990 },
1991 runTime: expect.objectContaining({
1992 history: expect.any(CircularArray)
1993 }),
1994 waitTime: {
1995 history: new CircularArray()
1996 },
1997 elu: {
1998 idle: {
1999 history: new CircularArray()
2000 },
2001 active: {
2002 history: new CircularArray()
2003 }
2004 }
2005 })
2006 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2007 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2008 max * maxMultiplier
2009 )
2010 }
2011 expect(
2012 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2013 pool.workerChoiceStrategyContext.workerChoiceStrategy
2014 ).roundId
2015 ).toBe(0)
2016 expect(
2017 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2018 pool.workerChoiceStrategyContext.workerChoiceStrategy
2019 ).workerNodeId
2020 ).toBe(0)
2021 expect(
2022 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2023 pool.workerChoiceStrategyContext.workerChoiceStrategy
2024 ).nextWorkerNodeKey
2025 ).toBe(0)
2026 expect(
2027 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2028 pool.workerChoiceStrategyContext.workerChoiceStrategy
2029 ).previousWorkerNodeKey
2030 ).toEqual(expect.any(Number))
2031 expect(
2032 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2033 pool.workerChoiceStrategyContext.workerChoiceStrategy
2034 ).roundWeights.length
2035 ).toBe(1)
2036 // We need to clean up the resources after our test
2037 await pool.destroy()
2038 })
2039
2040 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
2041 const pool = new DynamicThreadPool(
2042 min,
2043 max,
2044 './tests/worker-files/thread/testWorker.mjs',
2045 {
2046 workerChoiceStrategy:
2047 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2048 }
2049 )
2050 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
2051 const promises = new Set()
2052 const maxMultiplier = 2
2053 for (let i = 0; i < max * maxMultiplier; i++) {
2054 promises.add(pool.execute())
2055 }
2056 await Promise.all(promises)
2057 for (const workerNode of pool.workerNodes) {
2058 expect(workerNode.usage).toStrictEqual({
2059 tasks: {
2060 executed: expect.any(Number),
2061 executing: 0,
2062 queued: 0,
2063 maxQueued: 0,
2064 sequentiallyStolen: 0,
2065 stolen: 0,
2066 failed: 0
2067 },
2068 runTime: expect.objectContaining({
2069 history: expect.any(CircularArray)
2070 }),
2071 waitTime: {
2072 history: new CircularArray()
2073 },
2074 elu: {
2075 idle: {
2076 history: new CircularArray()
2077 },
2078 active: {
2079 history: new CircularArray()
2080 }
2081 }
2082 })
2083 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2084 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2085 max * maxMultiplier
2086 )
2087 }
2088 expect(
2089 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2090 pool.workerChoiceStrategyContext.workerChoiceStrategy
2091 ).roundId
2092 ).toBe(0)
2093 expect(
2094 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2095 pool.workerChoiceStrategyContext.workerChoiceStrategy
2096 ).workerNodeId
2097 ).toBe(0)
2098 expect(
2099 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2100 pool.workerChoiceStrategyContext.workerChoiceStrategy
2101 ).nextWorkerNodeKey
2102 ).toBe(0)
2103 expect(
2104 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2105 pool.workerChoiceStrategyContext.workerChoiceStrategy
2106 ).previousWorkerNodeKey
2107 ).toEqual(expect.any(Number))
2108 expect(
2109 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2110 pool.workerChoiceStrategyContext.workerChoiceStrategy
2111 ).roundWeights.length
2112 ).toBe(1)
2113 // We need to clean up the resources after our test
2114 await pool.destroy()
2115 })
2116
2117 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
2118 const workerChoiceStrategy =
2119 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2120 let pool = new FixedThreadPool(
2121 max,
2122 './tests/worker-files/thread/testWorker.mjs'
2123 )
2124 expect(
2125 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2126 workerChoiceStrategy
2127 ).roundId
2128 ).toBeDefined()
2129 expect(
2130 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2131 workerChoiceStrategy
2132 ).workerNodeId
2133 ).toBeDefined()
2134 expect(
2135 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2136 workerChoiceStrategy
2137 ).nextWorkerNodeKey
2138 ).toBeDefined()
2139 expect(
2140 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2141 workerChoiceStrategy
2142 ).previousWorkerNodeKey
2143 ).toBeDefined()
2144 expect(
2145 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2146 workerChoiceStrategy
2147 ).roundWeights
2148 ).toBeDefined()
2149 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2150 expect(
2151 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2152 pool.workerChoiceStrategyContext.workerChoiceStrategy
2153 ).roundId
2154 ).toBe(0)
2155 expect(
2156 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2157 pool.workerChoiceStrategyContext.workerChoiceStrategy
2158 ).workerNodeId
2159 ).toBe(0)
2160 expect(
2161 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2162 pool.workerChoiceStrategyContext.workerChoiceStrategy
2163 ).nextWorkerNodeKey
2164 ).toBe(0)
2165 expect(
2166 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2167 pool.workerChoiceStrategyContext.workerChoiceStrategy
2168 ).previousWorkerNodeKey
2169 ).toBe(0)
2170 expect(
2171 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2172 pool.workerChoiceStrategyContext.workerChoiceStrategy
2173 ).roundWeights.length
2174 ).toBe(1)
2175 await pool.destroy()
2176 pool = new DynamicThreadPool(
2177 min,
2178 max,
2179 './tests/worker-files/thread/testWorker.mjs'
2180 )
2181 expect(
2182 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2183 workerChoiceStrategy
2184 ).roundId
2185 ).toBeDefined()
2186 expect(
2187 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2188 workerChoiceStrategy
2189 ).workerNodeId
2190 ).toBeDefined()
2191 expect(
2192 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2193 workerChoiceStrategy
2194 ).nextWorkerNodeKey
2195 ).toBeDefined()
2196 expect(
2197 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2198 workerChoiceStrategy
2199 ).previousWorkerNodeKey
2200 ).toBeDefined()
2201 expect(
2202 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2203 workerChoiceStrategy
2204 ).roundWeights
2205 ).toBeDefined()
2206 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2207 expect(
2208 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2209 pool.workerChoiceStrategyContext.workerChoiceStrategy
2210 ).roundId
2211 ).toBe(0)
2212 expect(
2213 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2214 pool.workerChoiceStrategyContext.workerChoiceStrategy
2215 ).workerNodeId
2216 ).toBe(0)
2217 expect(
2218 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2219 pool.workerChoiceStrategyContext.workerChoiceStrategy
2220 ).nextWorkerNodeKey
2221 ).toBe(0)
2222 expect(
2223 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2224 pool.workerChoiceStrategyContext.workerChoiceStrategy
2225 ).previousWorkerNodeKey
2226 ).toBe(0)
2227 expect(
2228 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2229 pool.workerChoiceStrategyContext.workerChoiceStrategy
2230 ).roundWeights.length
2231 ).toBe(1)
2232 // We need to clean up the resources after our test
2233 await pool.destroy()
2234 })
2235
2236 it('Verify unknown strategy throw error', () => {
2237 expect(
2238 () =>
2239 new DynamicThreadPool(
2240 min,
2241 max,
2242 './tests/worker-files/thread/testWorker.mjs',
2243 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2244 )
2245 ).toThrow("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
2246 })
2247 })