51478a0620767d03730d8c441baf9e5ab4c18ea4
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 WorkerChoiceStrategies
8 } = require('../../../lib')
9 const { CircularArray } = require('../../../lib/circular-array')
10
11 describe('Selection strategies test suite', () => {
12 const min = 0
13 const max = 3
14
15 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
16 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
17 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
18 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
19 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
20 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
21 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
22 'WEIGHTED_ROUND_ROBIN'
23 )
24 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
25 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
26 )
27 })
28
29 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
30 const pool = new DynamicThreadPool(
31 min,
32 max,
33 './tests/worker-files/thread/testWorker.js'
34 )
35 expect(pool.opts.workerChoiceStrategy).toBe(
36 WorkerChoiceStrategies.ROUND_ROBIN
37 )
38 // We need to clean up the resources after our test
39 await pool.destroy()
40 })
41
42 it('Verify available strategies are taken at pool creation', async () => {
43 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
44 const pool = new FixedThreadPool(
45 max,
46 './tests/worker-files/thread/testWorker.js',
47 { workerChoiceStrategy }
48 )
49 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
50 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
51 workerChoiceStrategy
52 )
53 await pool.destroy()
54 }
55 })
56
57 it('Verify available strategies can be set after pool creation', async () => {
58 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
59 const pool = new DynamicThreadPool(
60 min,
61 max,
62 './tests/worker-files/thread/testWorker.js'
63 )
64 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
65 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
66 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
67 workerChoiceStrategy
68 )
69 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
70 retries: 6,
71 runTime: { median: false },
72 waitTime: { median: false },
73 elu: { median: false }
74 })
75 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
76 retries: 6,
77 runTime: { median: false },
78 waitTime: { median: false },
79 elu: { median: false }
80 })
81 await pool.destroy()
82 }
83 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
84 const pool = new DynamicClusterPool(
85 min,
86 max,
87 './tests/worker-files/cluster/testWorker.js'
88 )
89 pool.setWorkerChoiceStrategy(workerChoiceStrategy, { retries: 3 })
90 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
91 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
92 workerChoiceStrategy
93 )
94 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
95 retries: 3,
96 runTime: { median: false },
97 waitTime: { median: false },
98 elu: { median: false }
99 })
100 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
101 retries: 3,
102 runTime: { median: false },
103 waitTime: { median: false },
104 elu: { median: false }
105 })
106 await pool.destroy()
107 }
108 })
109
110 it('Verify available strategies default internals at pool creation', async () => {
111 const pool = new FixedThreadPool(
112 max,
113 './tests/worker-files/thread/testWorker.js'
114 )
115 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
116 expect(
117 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
118 workerChoiceStrategy
119 ).nextWorkerNodeKey
120 ).toBe(0)
121 expect(
122 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
123 workerChoiceStrategy
124 ).previousWorkerNodeKey
125 ).toBe(0)
126 if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
127 expect(
128 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
129 workerChoiceStrategy
130 ).workersVirtualTaskEndTimestamp
131 ).toStrictEqual([])
132 } else if (
133 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
134 ) {
135 expect(
136 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
137 workerChoiceStrategy
138 ).defaultWorkerWeight
139 ).toBeGreaterThan(0)
140 expect(
141 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
142 workerChoiceStrategy
143 ).workerVirtualTaskRunTime
144 ).toBe(0)
145 } else if (
146 workerChoiceStrategy ===
147 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
148 ) {
149 expect(
150 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
151 workerChoiceStrategy
152 ).defaultWorkerWeight
153 ).toBeGreaterThan(0)
154 expect(
155 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
156 workerChoiceStrategy
157 ).workerVirtualTaskRunTime
158 ).toBe(0)
159 expect(
160 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
161 workerChoiceStrategy
162 ).roundId
163 ).toBe(0)
164 expect(
165 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
166 workerChoiceStrategy
167 ).workerNodeId
168 ).toBe(0)
169 expect(
170 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
171 workerChoiceStrategy
172 ).roundWeights
173 ).toStrictEqual([
174 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
175 workerChoiceStrategy
176 ).defaultWorkerWeight
177 ])
178 }
179 }
180 await pool.destroy()
181 })
182
183 it('Verify ROUND_ROBIN strategy default policy', async () => {
184 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
185 let pool = new FixedThreadPool(
186 max,
187 './tests/worker-files/thread/testWorker.js',
188 { workerChoiceStrategy }
189 )
190 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
191 dynamicWorkerUsage: false,
192 dynamicWorkerReady: true
193 })
194 await pool.destroy()
195 pool = new DynamicThreadPool(
196 min,
197 max,
198 './tests/worker-files/thread/testWorker.js',
199 { workerChoiceStrategy }
200 )
201 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
202 dynamicWorkerUsage: false,
203 dynamicWorkerReady: true
204 })
205 // We need to clean up the resources after our test
206 await pool.destroy()
207 })
208
209 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
210 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
211 let pool = new FixedThreadPool(
212 max,
213 './tests/worker-files/thread/testWorker.js',
214 { workerChoiceStrategy }
215 )
216 expect(
217 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
218 ).toStrictEqual({
219 runTime: {
220 aggregate: false,
221 average: false,
222 median: false
223 },
224 waitTime: {
225 aggregate: false,
226 average: false,
227 median: false
228 },
229 elu: {
230 aggregate: false,
231 average: false,
232 median: false
233 }
234 })
235 await pool.destroy()
236 pool = new DynamicThreadPool(
237 min,
238 max,
239 './tests/worker-files/thread/testWorker.js',
240 { workerChoiceStrategy }
241 )
242 expect(
243 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
244 ).toStrictEqual({
245 runTime: {
246 aggregate: false,
247 average: false,
248 median: false
249 },
250 waitTime: {
251 aggregate: false,
252 average: false,
253 median: false
254 },
255 elu: {
256 aggregate: false,
257 average: false,
258 median: false
259 }
260 })
261 // We need to clean up the resources after our test
262 await pool.destroy()
263 })
264
265 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
266 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
267 const pool = new FixedThreadPool(
268 max,
269 './tests/worker-files/thread/testWorker.js',
270 { workerChoiceStrategy }
271 )
272 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
273 const promises = new Set()
274 const maxMultiplier = 2
275 for (let i = 0; i < max * maxMultiplier; i++) {
276 promises.add(pool.execute())
277 }
278 await Promise.all(promises)
279 for (const workerNode of pool.workerNodes) {
280 expect(workerNode.usage).toStrictEqual({
281 tasks: {
282 executed: maxMultiplier,
283 executing: 0,
284 queued: 0,
285 maxQueued: 0,
286 stolen: 0,
287 failed: 0
288 },
289 runTime: {
290 history: new CircularArray()
291 },
292 waitTime: {
293 history: new CircularArray()
294 },
295 elu: {
296 idle: {
297 history: new CircularArray()
298 },
299 active: {
300 history: new CircularArray()
301 }
302 }
303 })
304 }
305 expect(
306 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
307 pool.workerChoiceStrategyContext.workerChoiceStrategy
308 ).nextWorkerNodeKey
309 ).toBe(0)
310 expect(
311 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
312 pool.workerChoiceStrategyContext.workerChoiceStrategy
313 ).previousWorkerNodeKey
314 ).toBe(pool.workerNodes.length - 1)
315 // We need to clean up the resources after our test
316 await pool.destroy()
317 })
318
319 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
320 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
321 const pool = new DynamicThreadPool(
322 min,
323 max,
324 './tests/worker-files/thread/testWorker.js',
325 { workerChoiceStrategy }
326 )
327 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
328 const promises = new Set()
329 const maxMultiplier = 2
330 for (let i = 0; i < max * maxMultiplier; i++) {
331 promises.add(pool.execute())
332 }
333 await Promise.all(promises)
334 for (const workerNode of pool.workerNodes) {
335 expect(workerNode.usage).toStrictEqual({
336 tasks: {
337 executed: expect.any(Number),
338 executing: 0,
339 queued: 0,
340 maxQueued: 0,
341 stolen: 0,
342 failed: 0
343 },
344 runTime: {
345 history: new CircularArray()
346 },
347 waitTime: {
348 history: new CircularArray()
349 },
350 elu: {
351 idle: {
352 history: new CircularArray()
353 },
354 active: {
355 history: new CircularArray()
356 }
357 }
358 })
359 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
360 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
361 max * maxMultiplier
362 )
363 }
364 expect(
365 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
366 pool.workerChoiceStrategyContext.workerChoiceStrategy
367 ).nextWorkerNodeKey
368 ).toBe(0)
369 expect(
370 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
371 pool.workerChoiceStrategyContext.workerChoiceStrategy
372 ).previousWorkerNodeKey
373 ).toBe(pool.workerNodes.length - 1)
374 // We need to clean up the resources after our test
375 await pool.destroy()
376 })
377
378 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
379 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
380 let pool = new FixedClusterPool(
381 max,
382 './tests/worker-files/cluster/testWorker.js',
383 { workerChoiceStrategy }
384 )
385 let results = new Set()
386 for (let i = 0; i < max; i++) {
387 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.id)
388 }
389 expect(results.size).toBe(max)
390 await pool.destroy()
391 pool = new FixedThreadPool(
392 max,
393 './tests/worker-files/thread/testWorker.js',
394 { workerChoiceStrategy }
395 )
396 results = new Set()
397 for (let i = 0; i < max; i++) {
398 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.threadId)
399 }
400 expect(results.size).toBe(max)
401 await pool.destroy()
402 })
403
404 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
405 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
406 let pool = new FixedThreadPool(
407 max,
408 './tests/worker-files/thread/testWorker.js',
409 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
410 )
411 expect(
412 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
413 pool.workerChoiceStrategyContext.workerChoiceStrategy
414 ).nextWorkerNodeKey
415 ).toBeDefined()
416 expect(
417 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
418 pool.workerChoiceStrategyContext.workerChoiceStrategy
419 ).previousWorkerNodeKey
420 ).toBeDefined()
421 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
422 expect(
423 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
424 pool.workerChoiceStrategyContext.workerChoiceStrategy
425 ).nextWorkerNodeKey
426 ).toBe(0)
427 expect(
428 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
429 pool.workerChoiceStrategyContext.workerChoiceStrategy
430 ).previousWorkerNodeKey
431 ).toBe(0)
432 await pool.destroy()
433 pool = new DynamicThreadPool(
434 min,
435 max,
436 './tests/worker-files/thread/testWorker.js',
437 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
438 )
439 expect(
440 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
441 pool.workerChoiceStrategyContext.workerChoiceStrategy
442 ).nextWorkerNodeKey
443 ).toBeDefined()
444 expect(
445 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
446 pool.workerChoiceStrategyContext.workerChoiceStrategy
447 ).previousWorkerNodeKey
448 ).toBeDefined()
449 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
450 expect(
451 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
452 pool.workerChoiceStrategyContext.workerChoiceStrategy
453 ).nextWorkerNodeKey
454 ).toBe(0)
455 expect(
456 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
457 pool.workerChoiceStrategyContext.workerChoiceStrategy
458 ).previousWorkerNodeKey
459 ).toBe(0)
460 // We need to clean up the resources after our test
461 await pool.destroy()
462 })
463
464 it('Verify LEAST_USED strategy default policy', async () => {
465 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
466 let pool = new FixedThreadPool(
467 max,
468 './tests/worker-files/thread/testWorker.js',
469 { workerChoiceStrategy }
470 )
471 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
472 dynamicWorkerUsage: false,
473 dynamicWorkerReady: true
474 })
475 await pool.destroy()
476 pool = new DynamicThreadPool(
477 min,
478 max,
479 './tests/worker-files/thread/testWorker.js',
480 { workerChoiceStrategy }
481 )
482 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
483 dynamicWorkerUsage: false,
484 dynamicWorkerReady: true
485 })
486 // We need to clean up the resources after our test
487 await pool.destroy()
488 })
489
490 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
491 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
492 let pool = new FixedThreadPool(
493 max,
494 './tests/worker-files/thread/testWorker.js',
495 { workerChoiceStrategy }
496 )
497 expect(
498 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
499 ).toStrictEqual({
500 runTime: {
501 aggregate: false,
502 average: false,
503 median: false
504 },
505 waitTime: {
506 aggregate: false,
507 average: false,
508 median: false
509 },
510 elu: {
511 aggregate: false,
512 average: false,
513 median: false
514 }
515 })
516 await pool.destroy()
517 pool = new DynamicThreadPool(
518 min,
519 max,
520 './tests/worker-files/thread/testWorker.js',
521 { workerChoiceStrategy }
522 )
523 expect(
524 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
525 ).toStrictEqual({
526 runTime: {
527 aggregate: false,
528 average: false,
529 median: false
530 },
531 waitTime: {
532 aggregate: false,
533 average: false,
534 median: false
535 },
536 elu: {
537 aggregate: false,
538 average: false,
539 median: false
540 }
541 })
542 // We need to clean up the resources after our test
543 await pool.destroy()
544 })
545
546 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
547 const pool = new FixedThreadPool(
548 max,
549 './tests/worker-files/thread/testWorker.js',
550 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
551 )
552 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
553 const promises = new Set()
554 const maxMultiplier = 2
555 for (let i = 0; i < max * maxMultiplier; i++) {
556 promises.add(pool.execute())
557 }
558 await Promise.all(promises)
559 for (const workerNode of pool.workerNodes) {
560 expect(workerNode.usage).toStrictEqual({
561 tasks: {
562 executed: expect.any(Number),
563 executing: 0,
564 queued: 0,
565 maxQueued: 0,
566 stolen: 0,
567 failed: 0
568 },
569 runTime: {
570 history: new CircularArray()
571 },
572 waitTime: {
573 history: new CircularArray()
574 },
575 elu: {
576 idle: {
577 history: new CircularArray()
578 },
579 active: {
580 history: new CircularArray()
581 }
582 }
583 })
584 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
585 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
586 max * maxMultiplier
587 )
588 }
589 expect(
590 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
591 pool.workerChoiceStrategyContext.workerChoiceStrategy
592 ).nextWorkerNodeKey
593 ).toEqual(expect.any(Number))
594 expect(
595 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
596 pool.workerChoiceStrategyContext.workerChoiceStrategy
597 ).previousWorkerNodeKey
598 ).toEqual(expect.any(Number))
599 // We need to clean up the resources after our test
600 await pool.destroy()
601 })
602
603 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
604 const pool = new DynamicThreadPool(
605 min,
606 max,
607 './tests/worker-files/thread/testWorker.js',
608 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
609 )
610 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
611 const promises = new Set()
612 const maxMultiplier = 2
613 for (let i = 0; i < max * maxMultiplier; i++) {
614 promises.add(pool.execute())
615 }
616 await Promise.all(promises)
617 for (const workerNode of pool.workerNodes) {
618 expect(workerNode.usage).toStrictEqual({
619 tasks: {
620 executed: expect.any(Number),
621 executing: 0,
622 queued: 0,
623 maxQueued: 0,
624 stolen: 0,
625 failed: 0
626 },
627 runTime: {
628 history: new CircularArray()
629 },
630 waitTime: {
631 history: new CircularArray()
632 },
633 elu: {
634 idle: {
635 history: new CircularArray()
636 },
637 active: {
638 history: new CircularArray()
639 }
640 }
641 })
642 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
643 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
644 max * maxMultiplier
645 )
646 }
647 expect(
648 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
649 pool.workerChoiceStrategyContext.workerChoiceStrategy
650 ).nextWorkerNodeKey
651 ).toEqual(expect.any(Number))
652 expect(
653 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
654 pool.workerChoiceStrategyContext.workerChoiceStrategy
655 ).previousWorkerNodeKey
656 ).toEqual(expect.any(Number))
657 // We need to clean up the resources after our test
658 await pool.destroy()
659 })
660
661 it('Verify LEAST_BUSY strategy default policy', async () => {
662 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
663 let pool = new FixedThreadPool(
664 max,
665 './tests/worker-files/thread/testWorker.js',
666 { workerChoiceStrategy }
667 )
668 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
669 dynamicWorkerUsage: false,
670 dynamicWorkerReady: true
671 })
672 await pool.destroy()
673 pool = new DynamicThreadPool(
674 min,
675 max,
676 './tests/worker-files/thread/testWorker.js',
677 { workerChoiceStrategy }
678 )
679 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
680 dynamicWorkerUsage: false,
681 dynamicWorkerReady: true
682 })
683 // We need to clean up the resources after our test
684 await pool.destroy()
685 })
686
687 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
688 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
689 let pool = new FixedThreadPool(
690 max,
691 './tests/worker-files/thread/testWorker.js',
692 { workerChoiceStrategy }
693 )
694 expect(
695 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
696 ).toStrictEqual({
697 runTime: {
698 aggregate: true,
699 average: false,
700 median: false
701 },
702 waitTime: {
703 aggregate: true,
704 average: false,
705 median: false
706 },
707 elu: {
708 aggregate: false,
709 average: false,
710 median: false
711 }
712 })
713 await pool.destroy()
714 pool = new DynamicThreadPool(
715 min,
716 max,
717 './tests/worker-files/thread/testWorker.js',
718 { workerChoiceStrategy }
719 )
720 expect(
721 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
722 ).toStrictEqual({
723 runTime: {
724 aggregate: true,
725 average: false,
726 median: false
727 },
728 waitTime: {
729 aggregate: true,
730 average: false,
731 median: false
732 },
733 elu: {
734 aggregate: false,
735 average: false,
736 median: false
737 }
738 })
739 // We need to clean up the resources after our test
740 await pool.destroy()
741 })
742
743 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
744 const pool = new FixedThreadPool(
745 max,
746 './tests/worker-files/thread/testWorker.js',
747 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
748 )
749 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
750 const promises = new Set()
751 const maxMultiplier = 2
752 for (let i = 0; i < max * maxMultiplier; i++) {
753 promises.add(pool.execute())
754 }
755 await Promise.all(promises)
756 for (const workerNode of pool.workerNodes) {
757 expect(workerNode.usage).toStrictEqual({
758 tasks: {
759 executed: expect.any(Number),
760 executing: 0,
761 queued: 0,
762 maxQueued: 0,
763 stolen: 0,
764 failed: 0
765 },
766 runTime: expect.objectContaining({
767 history: expect.any(CircularArray)
768 }),
769 waitTime: expect.objectContaining({
770 history: expect.any(CircularArray)
771 }),
772 elu: {
773 idle: {
774 history: new CircularArray()
775 },
776 active: {
777 history: new CircularArray()
778 }
779 }
780 })
781 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
782 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
783 max * maxMultiplier
784 )
785 if (workerNode.usage.runTime.aggregate == null) {
786 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
787 } else {
788 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
789 }
790 if (workerNode.usage.waitTime.aggregate == null) {
791 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
792 } else {
793 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
794 }
795 }
796 expect(
797 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
798 pool.workerChoiceStrategyContext.workerChoiceStrategy
799 ).nextWorkerNodeKey
800 ).toEqual(expect.any(Number))
801 expect(
802 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
803 pool.workerChoiceStrategyContext.workerChoiceStrategy
804 ).previousWorkerNodeKey
805 ).toEqual(expect.any(Number))
806 // We need to clean up the resources after our test
807 await pool.destroy()
808 })
809
810 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
811 const pool = new DynamicThreadPool(
812 min,
813 max,
814 './tests/worker-files/thread/testWorker.js',
815 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
816 )
817 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
818 const promises = new Set()
819 const maxMultiplier = 2
820 for (let i = 0; i < max * maxMultiplier; i++) {
821 promises.add(pool.execute())
822 }
823 await Promise.all(promises)
824 for (const workerNode of pool.workerNodes) {
825 expect(workerNode.usage).toStrictEqual({
826 tasks: {
827 executed: expect.any(Number),
828 executing: 0,
829 queued: 0,
830 maxQueued: 0,
831 stolen: 0,
832 failed: 0
833 },
834 runTime: expect.objectContaining({
835 history: expect.any(CircularArray)
836 }),
837 waitTime: expect.objectContaining({
838 history: expect.any(CircularArray)
839 }),
840 elu: {
841 idle: {
842 history: new CircularArray()
843 },
844 active: {
845 history: new CircularArray()
846 }
847 }
848 })
849 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
850 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
851 max * maxMultiplier
852 )
853 if (workerNode.usage.runTime.aggregate == null) {
854 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
855 } else {
856 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
857 }
858 if (workerNode.usage.waitTime.aggregate == null) {
859 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
860 } else {
861 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
862 }
863 }
864 expect(
865 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
866 pool.workerChoiceStrategyContext.workerChoiceStrategy
867 ).nextWorkerNodeKey
868 ).toEqual(expect.any(Number))
869 expect(
870 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
871 pool.workerChoiceStrategyContext.workerChoiceStrategy
872 ).previousWorkerNodeKey
873 ).toEqual(expect.any(Number))
874 // We need to clean up the resources after our test
875 await pool.destroy()
876 })
877
878 it('Verify LEAST_ELU strategy default policy', async () => {
879 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
880 let pool = new FixedThreadPool(
881 max,
882 './tests/worker-files/thread/testWorker.js',
883 { workerChoiceStrategy }
884 )
885 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
886 dynamicWorkerUsage: false,
887 dynamicWorkerReady: true
888 })
889 await pool.destroy()
890 pool = new DynamicThreadPool(
891 min,
892 max,
893 './tests/worker-files/thread/testWorker.js',
894 { workerChoiceStrategy }
895 )
896 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
897 dynamicWorkerUsage: false,
898 dynamicWorkerReady: true
899 })
900 // We need to clean up the resources after our test
901 await pool.destroy()
902 })
903
904 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
905 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
906 let pool = new FixedThreadPool(
907 max,
908 './tests/worker-files/thread/testWorker.js',
909 { workerChoiceStrategy }
910 )
911 expect(
912 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
913 ).toStrictEqual({
914 runTime: {
915 aggregate: false,
916 average: false,
917 median: false
918 },
919 waitTime: {
920 aggregate: false,
921 average: false,
922 median: false
923 },
924 elu: {
925 aggregate: true,
926 average: false,
927 median: false
928 }
929 })
930 await pool.destroy()
931 pool = new DynamicThreadPool(
932 min,
933 max,
934 './tests/worker-files/thread/testWorker.js',
935 { workerChoiceStrategy }
936 )
937 expect(
938 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
939 ).toStrictEqual({
940 runTime: {
941 aggregate: false,
942 average: false,
943 median: false
944 },
945 waitTime: {
946 aggregate: false,
947 average: false,
948 median: false
949 },
950 elu: {
951 aggregate: true,
952 average: false,
953 median: false
954 }
955 })
956 // We need to clean up the resources after our test
957 await pool.destroy()
958 })
959
960 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
961 const pool = new FixedThreadPool(
962 max,
963 './tests/worker-files/thread/testWorker.js',
964 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
965 )
966 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
967 const promises = new Set()
968 const maxMultiplier = 2
969 for (let i = 0; i < max * maxMultiplier; i++) {
970 promises.add(pool.execute())
971 }
972 await Promise.all(promises)
973 for (const workerNode of pool.workerNodes) {
974 expect(workerNode.usage).toStrictEqual({
975 tasks: {
976 executed: expect.any(Number),
977 executing: 0,
978 queued: 0,
979 maxQueued: 0,
980 stolen: 0,
981 failed: 0
982 },
983 runTime: {
984 history: new CircularArray()
985 },
986 waitTime: {
987 history: new CircularArray()
988 },
989 elu: expect.objectContaining({
990 idle: expect.objectContaining({
991 history: expect.any(CircularArray)
992 }),
993 active: expect.objectContaining({
994 history: expect.any(CircularArray)
995 })
996 })
997 })
998 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
999 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1000 max * maxMultiplier
1001 )
1002 if (workerNode.usage.elu.active.aggregate == null) {
1003 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1004 } else {
1005 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1006 }
1007 if (workerNode.usage.elu.idle.aggregate == null) {
1008 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1009 } else {
1010 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1011 }
1012 if (workerNode.usage.elu.utilization == null) {
1013 expect(workerNode.usage.elu.utilization).toBeUndefined()
1014 } else {
1015 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1016 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1017 }
1018 }
1019 expect(
1020 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1021 pool.workerChoiceStrategyContext.workerChoiceStrategy
1022 ).nextWorkerNodeKey
1023 ).toEqual(expect.any(Number))
1024 expect(
1025 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1026 pool.workerChoiceStrategyContext.workerChoiceStrategy
1027 ).previousWorkerNodeKey
1028 ).toEqual(expect.any(Number))
1029 // We need to clean up the resources after our test
1030 await pool.destroy()
1031 })
1032
1033 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
1034 const pool = new DynamicThreadPool(
1035 min,
1036 max,
1037 './tests/worker-files/thread/testWorker.js',
1038 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
1039 )
1040 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
1041 const promises = new Set()
1042 const maxMultiplier = 2
1043 for (let i = 0; i < max * maxMultiplier; i++) {
1044 promises.add(pool.execute())
1045 }
1046 await Promise.all(promises)
1047 for (const workerNode of pool.workerNodes) {
1048 expect(workerNode.usage).toStrictEqual({
1049 tasks: {
1050 executed: expect.any(Number),
1051 executing: 0,
1052 queued: 0,
1053 maxQueued: 0,
1054 stolen: 0,
1055 failed: 0
1056 },
1057 runTime: {
1058 history: new CircularArray()
1059 },
1060 waitTime: {
1061 history: new CircularArray()
1062 },
1063 elu: expect.objectContaining({
1064 idle: expect.objectContaining({
1065 history: expect.any(CircularArray)
1066 }),
1067 active: expect.objectContaining({
1068 history: expect.any(CircularArray)
1069 })
1070 })
1071 })
1072 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1073 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1074 max * maxMultiplier
1075 )
1076 if (workerNode.usage.elu.active.aggregate == null) {
1077 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1078 } else {
1079 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1080 }
1081 if (workerNode.usage.elu.idle.aggregate == null) {
1082 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1083 } else {
1084 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1085 }
1086 if (workerNode.usage.elu.utilization == null) {
1087 expect(workerNode.usage.elu.utilization).toBeUndefined()
1088 } else {
1089 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1090 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1091 }
1092 }
1093 expect(
1094 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1095 pool.workerChoiceStrategyContext.workerChoiceStrategy
1096 ).nextWorkerNodeKey
1097 ).toEqual(expect.any(Number))
1098 expect(
1099 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1100 pool.workerChoiceStrategyContext.workerChoiceStrategy
1101 ).previousWorkerNodeKey
1102 ).toEqual(expect.any(Number))
1103 // We need to clean up the resources after our test
1104 await pool.destroy()
1105 })
1106
1107 it('Verify FAIR_SHARE strategy default policy', async () => {
1108 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1109 let pool = new FixedThreadPool(
1110 max,
1111 './tests/worker-files/thread/testWorker.js',
1112 { workerChoiceStrategy }
1113 )
1114 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1115 dynamicWorkerUsage: false,
1116 dynamicWorkerReady: true
1117 })
1118 await pool.destroy()
1119 pool = new DynamicThreadPool(
1120 min,
1121 max,
1122 './tests/worker-files/thread/testWorker.js',
1123 { workerChoiceStrategy }
1124 )
1125 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1126 dynamicWorkerUsage: false,
1127 dynamicWorkerReady: true
1128 })
1129 // We need to clean up the resources after our test
1130 await pool.destroy()
1131 })
1132
1133 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1134 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1135 let pool = new FixedThreadPool(
1136 max,
1137 './tests/worker-files/thread/testWorker.js',
1138 { workerChoiceStrategy }
1139 )
1140 expect(
1141 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1142 ).toStrictEqual({
1143 runTime: {
1144 aggregate: true,
1145 average: true,
1146 median: false
1147 },
1148 waitTime: {
1149 aggregate: false,
1150 average: false,
1151 median: false
1152 },
1153 elu: {
1154 aggregate: true,
1155 average: true,
1156 median: false
1157 }
1158 })
1159 await pool.destroy()
1160 pool = new DynamicThreadPool(
1161 min,
1162 max,
1163 './tests/worker-files/thread/testWorker.js',
1164 { workerChoiceStrategy }
1165 )
1166 expect(
1167 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1168 ).toStrictEqual({
1169 runTime: {
1170 aggregate: true,
1171 average: true,
1172 median: false
1173 },
1174 waitTime: {
1175 aggregate: false,
1176 average: false,
1177 median: false
1178 },
1179 elu: {
1180 aggregate: true,
1181 average: true,
1182 median: false
1183 }
1184 })
1185 // We need to clean up the resources after our test
1186 await pool.destroy()
1187 })
1188
1189 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1190 const pool = new FixedThreadPool(
1191 max,
1192 './tests/worker-files/thread/testWorker.js',
1193 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1194 )
1195 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1196 const promises = new Set()
1197 const maxMultiplier = 2
1198 for (let i = 0; i < max * maxMultiplier; i++) {
1199 promises.add(pool.execute())
1200 }
1201 await Promise.all(promises)
1202 for (const workerNode of pool.workerNodes) {
1203 expect(workerNode.usage).toStrictEqual({
1204 tasks: {
1205 executed: expect.any(Number),
1206 executing: 0,
1207 queued: 0,
1208 maxQueued: 0,
1209 stolen: 0,
1210 failed: 0
1211 },
1212 runTime: expect.objectContaining({
1213 history: expect.any(CircularArray)
1214 }),
1215 waitTime: {
1216 history: new CircularArray()
1217 },
1218 elu: expect.objectContaining({
1219 idle: expect.objectContaining({
1220 history: expect.any(CircularArray)
1221 }),
1222 active: expect.objectContaining({
1223 history: expect.any(CircularArray)
1224 })
1225 })
1226 })
1227 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1228 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1229 max * maxMultiplier
1230 )
1231 if (workerNode.usage.runTime.aggregate == null) {
1232 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1233 } else {
1234 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1235 }
1236 if (workerNode.usage.runTime.average == null) {
1237 expect(workerNode.usage.runTime.average).toBeUndefined()
1238 } else {
1239 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1240 }
1241 if (workerNode.usage.elu.active.aggregate == null) {
1242 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1243 } else {
1244 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1245 }
1246 if (workerNode.usage.elu.idle.aggregate == null) {
1247 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1248 } else {
1249 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1250 }
1251 if (workerNode.usage.elu.utilization == null) {
1252 expect(workerNode.usage.elu.utilization).toBeUndefined()
1253 } else {
1254 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1255 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1256 }
1257 }
1258 expect(
1259 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1260 pool.workerChoiceStrategyContext.workerChoiceStrategy
1261 ).nextWorkerNodeKey
1262 ).toEqual(expect.any(Number))
1263 expect(
1264 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1265 pool.workerChoiceStrategyContext.workerChoiceStrategy
1266 ).previousWorkerNodeKey
1267 ).toEqual(expect.any(Number))
1268 expect(
1269 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1270 pool.workerChoiceStrategyContext.workerChoiceStrategy
1271 ).workersVirtualTaskEndTimestamp.length
1272 ).toBe(pool.workerNodes.length)
1273 // We need to clean up the resources after our test
1274 await pool.destroy()
1275 })
1276
1277 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1278 const pool = new DynamicThreadPool(
1279 min,
1280 max,
1281 './tests/worker-files/thread/testWorker.js',
1282 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1283 )
1284 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1285 const promises = new Set()
1286 const maxMultiplier = 2
1287 for (let i = 0; i < max * maxMultiplier; i++) {
1288 promises.add(pool.execute())
1289 }
1290 await Promise.all(promises)
1291 for (const workerNode of pool.workerNodes) {
1292 expect(workerNode.usage).toStrictEqual({
1293 tasks: {
1294 executed: expect.any(Number),
1295 executing: 0,
1296 queued: 0,
1297 maxQueued: 0,
1298 stolen: 0,
1299 failed: 0
1300 },
1301 runTime: expect.objectContaining({
1302 history: expect.any(CircularArray)
1303 }),
1304 waitTime: {
1305 history: new CircularArray()
1306 },
1307 elu: expect.objectContaining({
1308 idle: expect.objectContaining({
1309 history: expect.any(CircularArray)
1310 }),
1311 active: expect.objectContaining({
1312 history: expect.any(CircularArray)
1313 })
1314 })
1315 })
1316 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1317 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1318 max * maxMultiplier
1319 )
1320 if (workerNode.usage.runTime.aggregate == null) {
1321 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1322 } else {
1323 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1324 }
1325 if (workerNode.usage.runTime.average == null) {
1326 expect(workerNode.usage.runTime.average).toBeUndefined()
1327 } else {
1328 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1329 }
1330 if (workerNode.usage.elu.active.aggregate == null) {
1331 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1332 } else {
1333 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1334 }
1335 if (workerNode.usage.elu.idle.aggregate == null) {
1336 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1337 } else {
1338 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1339 }
1340 if (workerNode.usage.elu.utilization == null) {
1341 expect(workerNode.usage.elu.utilization).toBeUndefined()
1342 } else {
1343 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1344 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1345 }
1346 }
1347 expect(
1348 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1349 pool.workerChoiceStrategyContext.workerChoiceStrategy
1350 ).nextWorkerNodeKey
1351 ).toEqual(expect.any(Number))
1352 expect(
1353 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1354 pool.workerChoiceStrategyContext.workerChoiceStrategy
1355 ).previousWorkerNodeKey
1356 ).toEqual(expect.any(Number))
1357 expect(
1358 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1359 pool.workerChoiceStrategyContext.workerChoiceStrategy
1360 ).workersVirtualTaskEndTimestamp.length
1361 ).toBe(pool.workerNodes.length)
1362 // We need to clean up the resources after our test
1363 await pool.destroy()
1364 })
1365
1366 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1367 const pool = new DynamicThreadPool(
1368 min,
1369 max,
1370 './tests/worker-files/thread/testWorker.js',
1371 {
1372 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1373 workerChoiceStrategyOptions: {
1374 runTime: { median: true }
1375 }
1376 }
1377 )
1378 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1379 const promises = new Set()
1380 const maxMultiplier = 2
1381 for (let i = 0; i < max * maxMultiplier; i++) {
1382 promises.add(pool.execute())
1383 }
1384 await Promise.all(promises)
1385 for (const workerNode of pool.workerNodes) {
1386 expect(workerNode.usage).toStrictEqual({
1387 tasks: {
1388 executed: expect.any(Number),
1389 executing: 0,
1390 queued: 0,
1391 maxQueued: 0,
1392 stolen: 0,
1393 failed: 0
1394 },
1395 runTime: expect.objectContaining({
1396 history: expect.any(CircularArray)
1397 }),
1398 waitTime: {
1399 history: new CircularArray()
1400 },
1401 elu: expect.objectContaining({
1402 idle: expect.objectContaining({
1403 history: expect.any(CircularArray)
1404 }),
1405 active: expect.objectContaining({
1406 history: expect.any(CircularArray)
1407 })
1408 })
1409 })
1410 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1411 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1412 max * maxMultiplier
1413 )
1414 if (workerNode.usage.runTime.aggregate == null) {
1415 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1416 } else {
1417 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1418 }
1419 if (workerNode.usage.runTime.median == null) {
1420 expect(workerNode.usage.runTime.median).toBeUndefined()
1421 } else {
1422 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1423 }
1424 if (workerNode.usage.elu.active.aggregate == null) {
1425 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1426 } else {
1427 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1428 }
1429 if (workerNode.usage.elu.idle.aggregate == null) {
1430 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1431 } else {
1432 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1433 }
1434 if (workerNode.usage.elu.utilization == null) {
1435 expect(workerNode.usage.elu.utilization).toBeUndefined()
1436 } else {
1437 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1438 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1439 }
1440 }
1441 expect(
1442 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1443 pool.workerChoiceStrategyContext.workerChoiceStrategy
1444 ).nextWorkerNodeKey
1445 ).toEqual(expect.any(Number))
1446 expect(
1447 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1448 pool.workerChoiceStrategyContext.workerChoiceStrategy
1449 ).previousWorkerNodeKey
1450 ).toEqual(expect.any(Number))
1451 expect(
1452 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1453 pool.workerChoiceStrategyContext.workerChoiceStrategy
1454 ).workersVirtualTaskEndTimestamp.length
1455 ).toBe(pool.workerNodes.length)
1456 // We need to clean up the resources after our test
1457 await pool.destroy()
1458 })
1459
1460 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1461 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1462 let pool = new FixedThreadPool(
1463 max,
1464 './tests/worker-files/thread/testWorker.js'
1465 )
1466 expect(
1467 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1468 workerChoiceStrategy
1469 ).workersVirtualTaskEndTimestamp
1470 ).toBeInstanceOf(Array)
1471 expect(
1472 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1473 workerChoiceStrategy
1474 ).workersVirtualTaskEndTimestamp.length
1475 ).toBe(0)
1476 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1477 workerChoiceStrategy
1478 ).workersVirtualTaskEndTimestamp[0] = performance.now()
1479 expect(
1480 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1481 workerChoiceStrategy
1482 ).workersVirtualTaskEndTimestamp.length
1483 ).toBe(1)
1484 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1485 expect(
1486 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1487 pool.workerChoiceStrategyContext.workerChoiceStrategy
1488 ).workersVirtualTaskEndTimestamp
1489 ).toBeInstanceOf(Array)
1490 expect(
1491 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1492 pool.workerChoiceStrategyContext.workerChoiceStrategy
1493 ).workersVirtualTaskEndTimestamp.length
1494 ).toBe(0)
1495 await pool.destroy()
1496 pool = new DynamicThreadPool(
1497 min,
1498 max,
1499 './tests/worker-files/thread/testWorker.js'
1500 )
1501 expect(
1502 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1503 workerChoiceStrategy
1504 ).workersVirtualTaskEndTimestamp
1505 ).toBeInstanceOf(Array)
1506 expect(
1507 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1508 workerChoiceStrategy
1509 ).workersVirtualTaskEndTimestamp.length
1510 ).toBe(0)
1511 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1512 workerChoiceStrategy
1513 ).workersVirtualTaskEndTimestamp[0] = performance.now()
1514 expect(
1515 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1516 workerChoiceStrategy
1517 ).workersVirtualTaskEndTimestamp.length
1518 ).toBe(1)
1519 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1520 expect(
1521 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1522 pool.workerChoiceStrategyContext.workerChoiceStrategy
1523 ).workersVirtualTaskEndTimestamp
1524 ).toBeInstanceOf(Array)
1525 expect(
1526 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1527 pool.workerChoiceStrategyContext.workerChoiceStrategy
1528 ).workersVirtualTaskEndTimestamp.length
1529 ).toBe(0)
1530 // We need to clean up the resources after our test
1531 await pool.destroy()
1532 })
1533
1534 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1535 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1536 let pool = new FixedThreadPool(
1537 max,
1538 './tests/worker-files/thread/testWorker.js',
1539 { workerChoiceStrategy }
1540 )
1541 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1542 dynamicWorkerUsage: false,
1543 dynamicWorkerReady: true
1544 })
1545 await pool.destroy()
1546 pool = new DynamicThreadPool(
1547 min,
1548 max,
1549 './tests/worker-files/thread/testWorker.js',
1550 { workerChoiceStrategy }
1551 )
1552 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1553 dynamicWorkerUsage: false,
1554 dynamicWorkerReady: true
1555 })
1556 // We need to clean up the resources after our test
1557 await pool.destroy()
1558 })
1559
1560 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1561 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1562 let pool = new FixedThreadPool(
1563 max,
1564 './tests/worker-files/thread/testWorker.js',
1565 { workerChoiceStrategy }
1566 )
1567 expect(
1568 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1569 ).toStrictEqual({
1570 runTime: {
1571 aggregate: true,
1572 average: true,
1573 median: false
1574 },
1575 waitTime: {
1576 aggregate: false,
1577 average: false,
1578 median: false
1579 },
1580 elu: {
1581 aggregate: false,
1582 average: false,
1583 median: false
1584 }
1585 })
1586 await pool.destroy()
1587 pool = new DynamicThreadPool(
1588 min,
1589 max,
1590 './tests/worker-files/thread/testWorker.js',
1591 { workerChoiceStrategy }
1592 )
1593 expect(
1594 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1595 ).toStrictEqual({
1596 runTime: {
1597 aggregate: true,
1598 average: true,
1599 median: false
1600 },
1601 waitTime: {
1602 aggregate: false,
1603 average: false,
1604 median: false
1605 },
1606 elu: {
1607 aggregate: false,
1608 average: false,
1609 median: false
1610 }
1611 })
1612 // We need to clean up the resources after our test
1613 await pool.destroy()
1614 })
1615
1616 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1617 const pool = new FixedThreadPool(
1618 max,
1619 './tests/worker-files/thread/testWorker.js',
1620 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1621 )
1622 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1623 const promises = new Set()
1624 const maxMultiplier = 2
1625 for (let i = 0; i < max * maxMultiplier; i++) {
1626 promises.add(pool.execute())
1627 }
1628 await Promise.all(promises)
1629 for (const workerNode of pool.workerNodes) {
1630 expect(workerNode.usage).toStrictEqual({
1631 tasks: {
1632 executed: expect.any(Number),
1633 executing: 0,
1634 queued: 0,
1635 maxQueued: 0,
1636 stolen: 0,
1637 failed: 0
1638 },
1639 runTime: expect.objectContaining({
1640 history: expect.any(CircularArray)
1641 }),
1642 waitTime: {
1643 history: new CircularArray()
1644 },
1645 elu: {
1646 idle: {
1647 history: new CircularArray()
1648 },
1649 active: {
1650 history: new CircularArray()
1651 }
1652 }
1653 })
1654 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1655 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1656 max * maxMultiplier
1657 )
1658 if (workerNode.usage.runTime.aggregate == null) {
1659 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1660 } else {
1661 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1662 }
1663 if (workerNode.usage.runTime.average == null) {
1664 expect(workerNode.usage.runTime.average).toBeUndefined()
1665 } else {
1666 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1667 }
1668 }
1669 expect(
1670 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1671 pool.workerChoiceStrategyContext.workerChoiceStrategy
1672 ).nextWorkerNodeKey
1673 ).toBe(0)
1674 expect(
1675 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1676 pool.workerChoiceStrategyContext.workerChoiceStrategy
1677 ).previousWorkerNodeKey
1678 ).toBe(0)
1679 expect(
1680 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1681 pool.workerChoiceStrategyContext.workerChoiceStrategy
1682 ).defaultWorkerWeight
1683 ).toBeGreaterThan(0)
1684 expect(
1685 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1686 pool.workerChoiceStrategyContext.workerChoiceStrategy
1687 ).workerVirtualTaskRunTime
1688 ).toBeGreaterThanOrEqual(0)
1689 // We need to clean up the resources after our test
1690 await pool.destroy()
1691 })
1692
1693 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1694 const pool = new DynamicThreadPool(
1695 min,
1696 max,
1697 './tests/worker-files/thread/testWorker.js',
1698 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1699 )
1700 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1701 const promises = new Set()
1702 const maxMultiplier = 2
1703 for (let i = 0; i < max * maxMultiplier; i++) {
1704 promises.add(pool.execute())
1705 }
1706 await Promise.all(promises)
1707 for (const workerNode of pool.workerNodes) {
1708 expect(workerNode.usage).toStrictEqual({
1709 tasks: {
1710 executed: expect.any(Number),
1711 executing: 0,
1712 queued: 0,
1713 maxQueued: 0,
1714 stolen: 0,
1715 failed: 0
1716 },
1717 runTime: expect.objectContaining({
1718 history: expect.any(CircularArray)
1719 }),
1720 waitTime: {
1721 history: new CircularArray()
1722 },
1723 elu: {
1724 idle: {
1725 history: new CircularArray()
1726 },
1727 active: {
1728 history: new CircularArray()
1729 }
1730 }
1731 })
1732 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1733 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1734 max * maxMultiplier
1735 )
1736 if (workerNode.usage.runTime.aggregate == null) {
1737 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1738 } else {
1739 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1740 }
1741 if (workerNode.usage.runTime.average == null) {
1742 expect(workerNode.usage.runTime.average).toBeUndefined()
1743 } else {
1744 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1745 }
1746 }
1747 expect(
1748 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1749 pool.workerChoiceStrategyContext.workerChoiceStrategy
1750 ).nextWorkerNodeKey
1751 ).toBe(0)
1752 expect(
1753 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1754 pool.workerChoiceStrategyContext.workerChoiceStrategy
1755 ).previousWorkerNodeKey
1756 ).toBe(0)
1757 expect(
1758 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1759 pool.workerChoiceStrategyContext.workerChoiceStrategy
1760 ).defaultWorkerWeight
1761 ).toBeGreaterThan(0)
1762 expect(
1763 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1764 pool.workerChoiceStrategyContext.workerChoiceStrategy
1765 ).workerVirtualTaskRunTime
1766 ).toBeGreaterThanOrEqual(0)
1767 // We need to clean up the resources after our test
1768 await pool.destroy()
1769 })
1770
1771 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1772 const pool = new DynamicThreadPool(
1773 min,
1774 max,
1775 './tests/worker-files/thread/testWorker.js',
1776 {
1777 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1778 workerChoiceStrategyOptions: {
1779 runTime: { median: true }
1780 }
1781 }
1782 )
1783 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1784 const promises = new Set()
1785 const maxMultiplier = 2
1786 for (let i = 0; i < max * maxMultiplier; i++) {
1787 promises.add(pool.execute())
1788 }
1789 await Promise.all(promises)
1790 for (const workerNode of pool.workerNodes) {
1791 expect(workerNode.usage).toStrictEqual({
1792 tasks: {
1793 executed: expect.any(Number),
1794 executing: 0,
1795 queued: 0,
1796 maxQueued: 0,
1797 stolen: 0,
1798 failed: 0
1799 },
1800 runTime: expect.objectContaining({
1801 history: expect.any(CircularArray)
1802 }),
1803 waitTime: {
1804 history: new CircularArray()
1805 },
1806 elu: {
1807 idle: {
1808 history: new CircularArray()
1809 },
1810 active: {
1811 history: new CircularArray()
1812 }
1813 }
1814 })
1815 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1816 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1817 max * maxMultiplier
1818 )
1819 if (workerNode.usage.runTime.aggregate == null) {
1820 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1821 } else {
1822 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1823 }
1824 if (workerNode.usage.runTime.median == null) {
1825 expect(workerNode.usage.runTime.median).toBeUndefined()
1826 } else {
1827 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1828 }
1829 }
1830 expect(
1831 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1832 pool.workerChoiceStrategyContext.workerChoiceStrategy
1833 ).nextWorkerNodeKey
1834 ).toBe(0)
1835 expect(
1836 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1837 pool.workerChoiceStrategyContext.workerChoiceStrategy
1838 ).previousWorkerNodeKey
1839 ).toBe(0)
1840 expect(
1841 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1842 pool.workerChoiceStrategyContext.workerChoiceStrategy
1843 ).defaultWorkerWeight
1844 ).toBeGreaterThan(0)
1845 expect(
1846 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1847 pool.workerChoiceStrategyContext.workerChoiceStrategy
1848 ).workerVirtualTaskRunTime
1849 ).toBeGreaterThanOrEqual(0)
1850 // We need to clean up the resources after our test
1851 await pool.destroy()
1852 })
1853
1854 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1855 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1856 let pool = new FixedThreadPool(
1857 max,
1858 './tests/worker-files/thread/testWorker.js'
1859 )
1860 expect(
1861 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1862 workerChoiceStrategy
1863 ).nextWorkerNodeKey
1864 ).toBeDefined()
1865 expect(
1866 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1867 workerChoiceStrategy
1868 ).previousWorkerNodeKey
1869 ).toBeDefined()
1870 expect(
1871 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1872 workerChoiceStrategy
1873 ).defaultWorkerWeight
1874 ).toBeDefined()
1875 expect(
1876 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1877 workerChoiceStrategy
1878 ).workerVirtualTaskRunTime
1879 ).toBeDefined()
1880 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1881 expect(
1882 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1883 pool.workerChoiceStrategyContext.workerChoiceStrategy
1884 ).nextWorkerNodeKey
1885 ).toBe(0)
1886 expect(
1887 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1888 pool.workerChoiceStrategyContext.workerChoiceStrategy
1889 ).previousWorkerNodeKey
1890 ).toBe(0)
1891 expect(
1892 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1893 pool.workerChoiceStrategyContext.workerChoiceStrategy
1894 ).defaultWorkerWeight
1895 ).toBeGreaterThan(0)
1896 expect(
1897 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1898 pool.workerChoiceStrategyContext.workerChoiceStrategy
1899 ).workerVirtualTaskRunTime
1900 ).toBe(0)
1901 await pool.destroy()
1902 pool = new DynamicThreadPool(
1903 min,
1904 max,
1905 './tests/worker-files/thread/testWorker.js'
1906 )
1907 expect(
1908 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1909 workerChoiceStrategy
1910 ).nextWorkerNodeKey
1911 ).toBeDefined()
1912 expect(
1913 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1914 workerChoiceStrategy
1915 ).previousWorkerNodeKey
1916 ).toBeDefined()
1917 expect(
1918 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1919 workerChoiceStrategy
1920 ).defaultWorkerWeight
1921 ).toBeDefined()
1922 expect(
1923 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1924 workerChoiceStrategy
1925 ).workerVirtualTaskRunTime
1926 ).toBeDefined()
1927 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1928 expect(
1929 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1930 pool.workerChoiceStrategyContext.workerChoiceStrategy
1931 ).nextWorkerNodeKey
1932 ).toBe(0)
1933 expect(
1934 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1935 pool.workerChoiceStrategyContext.workerChoiceStrategy
1936 ).previousWorkerNodeKey
1937 ).toBe(0)
1938 expect(
1939 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1940 pool.workerChoiceStrategyContext.workerChoiceStrategy
1941 ).defaultWorkerWeight
1942 ).toBeGreaterThan(0)
1943 expect(
1944 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1945 pool.workerChoiceStrategyContext.workerChoiceStrategy
1946 ).workerVirtualTaskRunTime
1947 ).toBe(0)
1948 // We need to clean up the resources after our test
1949 await pool.destroy()
1950 })
1951
1952 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1953 const workerChoiceStrategy =
1954 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1955 let pool = new FixedThreadPool(
1956 max,
1957 './tests/worker-files/thread/testWorker.js',
1958 { workerChoiceStrategy }
1959 )
1960 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1961 dynamicWorkerUsage: false,
1962 dynamicWorkerReady: true
1963 })
1964 await pool.destroy()
1965 pool = new DynamicThreadPool(
1966 min,
1967 max,
1968 './tests/worker-files/thread/testWorker.js',
1969 { workerChoiceStrategy }
1970 )
1971 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1972 dynamicWorkerUsage: false,
1973 dynamicWorkerReady: true
1974 })
1975 // We need to clean up the resources after our test
1976 await pool.destroy()
1977 })
1978
1979 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1980 const workerChoiceStrategy =
1981 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1982 let pool = new FixedThreadPool(
1983 max,
1984 './tests/worker-files/thread/testWorker.js',
1985 { workerChoiceStrategy }
1986 )
1987 expect(
1988 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1989 ).toStrictEqual({
1990 runTime: {
1991 aggregate: true,
1992 average: true,
1993 median: false
1994 },
1995 waitTime: {
1996 aggregate: false,
1997 average: false,
1998 median: false
1999 },
2000 elu: {
2001 aggregate: false,
2002 average: false,
2003 median: false
2004 }
2005 })
2006 await pool.destroy()
2007 pool = new DynamicThreadPool(
2008 min,
2009 max,
2010 './tests/worker-files/thread/testWorker.js',
2011 { workerChoiceStrategy }
2012 )
2013 expect(
2014 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
2015 ).toStrictEqual({
2016 runTime: {
2017 aggregate: true,
2018 average: true,
2019 median: false
2020 },
2021 waitTime: {
2022 aggregate: false,
2023 average: false,
2024 median: false
2025 },
2026 elu: {
2027 aggregate: false,
2028 average: false,
2029 median: false
2030 }
2031 })
2032 // We need to clean up the resources after our test
2033 await pool.destroy()
2034 })
2035
2036 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
2037 const pool = new FixedThreadPool(
2038 max,
2039 './tests/worker-files/thread/testWorker.js',
2040 {
2041 workerChoiceStrategy:
2042 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2043 }
2044 )
2045 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
2046 const promises = new Set()
2047 const maxMultiplier = 2
2048 for (let i = 0; i < max * maxMultiplier; i++) {
2049 promises.add(pool.execute())
2050 }
2051 await Promise.all(promises)
2052 for (const workerNode of pool.workerNodes) {
2053 expect(workerNode.usage).toStrictEqual({
2054 tasks: {
2055 executed: expect.any(Number),
2056 executing: 0,
2057 queued: 0,
2058 maxQueued: 0,
2059 stolen: 0,
2060 failed: 0
2061 },
2062 runTime: expect.objectContaining({
2063 history: expect.any(CircularArray)
2064 }),
2065 waitTime: {
2066 history: new CircularArray()
2067 },
2068 elu: {
2069 idle: {
2070 history: new CircularArray()
2071 },
2072 active: {
2073 history: new CircularArray()
2074 }
2075 }
2076 })
2077 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2078 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2079 max * maxMultiplier
2080 )
2081 }
2082 expect(
2083 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2084 pool.workerChoiceStrategyContext.workerChoiceStrategy
2085 ).defaultWorkerWeight
2086 ).toBeGreaterThan(0)
2087 expect(
2088 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2089 pool.workerChoiceStrategyContext.workerChoiceStrategy
2090 ).roundId
2091 ).toBe(0)
2092 expect(
2093 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2094 pool.workerChoiceStrategyContext.workerChoiceStrategy
2095 ).workerNodeId
2096 ).toBe(0)
2097 expect(
2098 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2099 pool.workerChoiceStrategyContext.workerChoiceStrategy
2100 ).nextWorkerNodeKey
2101 ).toBe(0)
2102 expect(
2103 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2104 pool.workerChoiceStrategyContext.workerChoiceStrategy
2105 ).previousWorkerNodeKey
2106 ).toEqual(expect.any(Number))
2107 expect(
2108 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2109 pool.workerChoiceStrategyContext.workerChoiceStrategy
2110 ).roundWeights
2111 ).toStrictEqual([
2112 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2113 pool.workerChoiceStrategyContext.workerChoiceStrategy
2114 ).defaultWorkerWeight
2115 ])
2116 // We need to clean up the resources after our test
2117 await pool.destroy()
2118 })
2119
2120 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
2121 const pool = new DynamicThreadPool(
2122 min,
2123 max,
2124 './tests/worker-files/thread/testWorker.js',
2125 {
2126 workerChoiceStrategy:
2127 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2128 }
2129 )
2130 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
2131 const promises = new Set()
2132 const maxMultiplier = 2
2133 for (let i = 0; i < max * maxMultiplier; i++) {
2134 promises.add(pool.execute())
2135 }
2136 await Promise.all(promises)
2137 for (const workerNode of pool.workerNodes) {
2138 expect(workerNode.usage).toStrictEqual({
2139 tasks: {
2140 executed: expect.any(Number),
2141 executing: 0,
2142 queued: 0,
2143 maxQueued: 0,
2144 stolen: 0,
2145 failed: 0
2146 },
2147 runTime: expect.objectContaining({
2148 history: expect.any(CircularArray)
2149 }),
2150 waitTime: {
2151 history: new CircularArray()
2152 },
2153 elu: {
2154 idle: {
2155 history: new CircularArray()
2156 },
2157 active: {
2158 history: new CircularArray()
2159 }
2160 }
2161 })
2162 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2163 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2164 max * maxMultiplier
2165 )
2166 }
2167 expect(
2168 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2169 pool.workerChoiceStrategyContext.workerChoiceStrategy
2170 ).defaultWorkerWeight
2171 ).toBeGreaterThan(0)
2172 expect(
2173 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2174 pool.workerChoiceStrategyContext.workerChoiceStrategy
2175 ).roundId
2176 ).toBe(0)
2177 expect(
2178 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2179 pool.workerChoiceStrategyContext.workerChoiceStrategy
2180 ).workerNodeId
2181 ).toBe(0)
2182 expect(
2183 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2184 pool.workerChoiceStrategyContext.workerChoiceStrategy
2185 ).nextWorkerNodeKey
2186 ).toBe(0)
2187 expect(
2188 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2189 pool.workerChoiceStrategyContext.workerChoiceStrategy
2190 ).previousWorkerNodeKey
2191 ).toEqual(expect.any(Number))
2192 expect(
2193 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2194 pool.workerChoiceStrategyContext.workerChoiceStrategy
2195 ).roundWeights
2196 ).toStrictEqual([
2197 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2198 pool.workerChoiceStrategyContext.workerChoiceStrategy
2199 ).defaultWorkerWeight
2200 ])
2201 // We need to clean up the resources after our test
2202 await pool.destroy()
2203 })
2204
2205 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
2206 const workerChoiceStrategy =
2207 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2208 let pool = new FixedThreadPool(
2209 max,
2210 './tests/worker-files/thread/testWorker.js'
2211 )
2212 expect(
2213 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2214 workerChoiceStrategy
2215 ).roundId
2216 ).toBeDefined()
2217 expect(
2218 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2219 workerChoiceStrategy
2220 ).workerNodeId
2221 ).toBeDefined()
2222 expect(
2223 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2224 workerChoiceStrategy
2225 ).nextWorkerNodeKey
2226 ).toBeDefined()
2227 expect(
2228 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2229 workerChoiceStrategy
2230 ).previousWorkerNodeKey
2231 ).toBeDefined()
2232 expect(
2233 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2234 workerChoiceStrategy
2235 ).defaultWorkerWeight
2236 ).toBeDefined()
2237 expect(
2238 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2239 workerChoiceStrategy
2240 ).roundWeights
2241 ).toBeDefined()
2242 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2243 expect(
2244 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2245 pool.workerChoiceStrategyContext.workerChoiceStrategy
2246 ).roundId
2247 ).toBe(0)
2248 expect(
2249 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2250 pool.workerChoiceStrategyContext.workerChoiceStrategy
2251 ).workerNodeId
2252 ).toBe(0)
2253 expect(
2254 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2255 pool.workerChoiceStrategyContext.workerChoiceStrategy
2256 ).nextWorkerNodeKey
2257 ).toBe(0)
2258 expect(
2259 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2260 pool.workerChoiceStrategyContext.workerChoiceStrategy
2261 ).previousWorkerNodeKey
2262 ).toBe(0)
2263 expect(
2264 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2265 pool.workerChoiceStrategyContext.workerChoiceStrategy
2266 ).defaultWorkerWeight
2267 ).toBeGreaterThan(0)
2268 expect(
2269 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2270 pool.workerChoiceStrategyContext.workerChoiceStrategy
2271 ).roundWeights
2272 ).toStrictEqual([
2273 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2274 pool.workerChoiceStrategyContext.workerChoiceStrategy
2275 ).defaultWorkerWeight
2276 ])
2277 await pool.destroy()
2278 pool = new DynamicThreadPool(
2279 min,
2280 max,
2281 './tests/worker-files/thread/testWorker.js'
2282 )
2283 expect(
2284 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2285 workerChoiceStrategy
2286 ).roundId
2287 ).toBeDefined()
2288 expect(
2289 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2290 workerChoiceStrategy
2291 ).workerNodeId
2292 ).toBeDefined()
2293 expect(
2294 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2295 workerChoiceStrategy
2296 ).nextWorkerNodeKey
2297 ).toBeDefined()
2298 expect(
2299 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2300 workerChoiceStrategy
2301 ).previousWorkerNodeKey
2302 ).toBeDefined()
2303 expect(
2304 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2305 workerChoiceStrategy
2306 ).defaultWorkerWeight
2307 ).toBeDefined()
2308 expect(
2309 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2310 workerChoiceStrategy
2311 ).roundWeights
2312 ).toBeDefined()
2313 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2314 expect(
2315 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2316 pool.workerChoiceStrategyContext.workerChoiceStrategy
2317 ).roundId
2318 ).toBe(0)
2319 expect(
2320 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2321 pool.workerChoiceStrategyContext.workerChoiceStrategy
2322 ).workerNodeId
2323 ).toBe(0)
2324 expect(
2325 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2326 pool.workerChoiceStrategyContext.workerChoiceStrategy
2327 ).nextWorkerNodeKey
2328 ).toBe(0)
2329 expect(
2330 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2331 pool.workerChoiceStrategyContext.workerChoiceStrategy
2332 ).previousWorkerNodeKey
2333 ).toBe(0)
2334 expect(
2335 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2336 pool.workerChoiceStrategyContext.workerChoiceStrategy
2337 ).defaultWorkerWeight
2338 ).toBeGreaterThan(0)
2339 expect(
2340 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2341 pool.workerChoiceStrategyContext.workerChoiceStrategy
2342 ).roundWeights
2343 ).toStrictEqual([
2344 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2345 pool.workerChoiceStrategyContext.workerChoiceStrategy
2346 ).defaultWorkerWeight
2347 ])
2348 // We need to clean up the resources after our test
2349 await pool.destroy()
2350 })
2351
2352 it('Verify unknown strategy throw error', () => {
2353 expect(
2354 () =>
2355 new DynamicThreadPool(
2356 min,
2357 max,
2358 './tests/worker-files/thread/testWorker.js',
2359 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2360 )
2361 ).toThrowError("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
2362 })
2363 })