feat: continuous task stealing
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicThreadPool,
4 FixedClusterPool,
5 FixedThreadPool,
6 WorkerChoiceStrategies
7 } = require('../../../lib')
8 const { CircularArray } = require('../../../lib/circular-array')
9
10 describe('Selection strategies test suite', () => {
11 const min = 0
12 const max = 3
13
it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
  // Every enumeration member is expected to hold its own name as a string.
  const strategyNames = [
    'ROUND_ROBIN',
    'LEAST_USED',
    'LEAST_BUSY',
    'LEAST_ELU',
    'FAIR_SHARE',
    'WEIGHTED_ROUND_ROBIN',
    'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
  ]
  for (const strategyName of strategyNames) {
    expect(WorkerChoiceStrategies[strategyName]).toBe(strategyName)
  }
})
27
it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
  // No worker choice strategy option is passed: the pool default is checked.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    expect(pool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.ROUND_ROBIN
    )
  } finally {
    // Clean up the pool resources even when the assertion above throws
    await pool.destroy()
  }
})
40
it('Verify available strategies are taken at pool creation', async () => {
  // One pool per strategy: the option must be reflected both in the pool
  // options and in the worker choice strategy context.
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new FixedThreadPool(
      max,
      './tests/worker-files/thread/testWorker.js',
      { workerChoiceStrategy }
    )
    try {
      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        workerChoiceStrategy
      )
    } finally {
      // Clean up the pool resources even when an assertion above throws
      await pool.destroy()
    }
  }
})
55
it('Verify available strategies can be set after pool creation', async () => {
  // One pool per strategy: the strategy is switched after construction and
  // must be reflected in the pool options and in the strategy context.
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js'
    )
    try {
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        workerChoiceStrategy
      )
    } finally {
      // Clean up the pool resources even when an assertion above throws
      await pool.destroy()
    }
  }
})
71
it('Verify available strategies default internals at pool creation', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js'
  )
  // Looks up the strategy instance registered in the pool context.
  const strategyFor = workerChoiceStrategy =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategy
    )
  try {
    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
      // Only the strategies below have internals checked here.
      switch (workerChoiceStrategy) {
        case WorkerChoiceStrategies.ROUND_ROBIN:
          expect(strategyFor(workerChoiceStrategy).nextWorkerNodeKey).toBe(0)
          break
        case WorkerChoiceStrategies.FAIR_SHARE:
          expect(
            strategyFor(workerChoiceStrategy).workersVirtualTaskEndTimestamp
          ).toBeInstanceOf(Array)
          expect(
            strategyFor(workerChoiceStrategy).workersVirtualTaskEndTimestamp
              .length
          ).toBe(0)
          break
        case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
          expect(strategyFor(workerChoiceStrategy).nextWorkerNodeKey).toBe(0)
          expect(
            strategyFor(workerChoiceStrategy).defaultWorkerWeight
          ).toBeGreaterThan(0)
          expect(
            strategyFor(workerChoiceStrategy).workerVirtualTaskRunTime
          ).toBe(0)
          break
        default:
          break
      }
    }
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
117
it('Verify ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Pools are built lazily so that only one is alive at a time:
  // the fixed pool first, then the dynamic one.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
143
it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same requirements are expected for the fixed and the dynamic pool.
  const expectedRequirements = {
    runTime: { aggregate: false, average: false, median: false },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: false, average: false, median: false }
  }
  // Pools are built lazily so that only one is alive at a time.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
199
it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    // Submit every task up front, then await them together.
    await Promise.all(
      Array.from({ length: max * maxMultiplier }, () => pool.execute())
    )
    for (const { usage } of pool.workerNodes) {
      // Each worker node is expected to have run exactly maxMultiplier tasks.
      expect(usage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
    }
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        WorkerChoiceStrategies.ROUND_ROBIN
      ).nextWorkerNodeKey
    ).toBe(0)
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
247
it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    // Submit every task up front, then await them together.
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      // The per-worker count is only bounded by the total number of tasks.
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
    }
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        WorkerChoiceStrategies.ROUND_ROBIN
      ).nextWorkerNodeKey
    ).toBe(0)
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
300
it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // Cluster workers are identified by `id`, thread workers by `threadId`.
  // Pools are built lazily so that only one is alive at a time.
  const scenarios = [
    {
      createPool: () =>
        new FixedClusterPool(
          max,
          './tests/worker-files/cluster/testWorker.js',
          { workerChoiceStrategy }
        ),
      workerIdOf: worker => worker.id
    },
    {
      createPool: () =>
        new FixedThreadPool(
          max,
          './tests/worker-files/thread/testWorker.js',
          { workerChoiceStrategy }
        ),
      workerIdOf: worker => worker.threadId
    }
  ]
  for (const { createPool, workerIdOf } of scenarios) {
    const pool = createPool()
    try {
      // `max` consecutive choices must yield `max` distinct workers.
      const results = new Set()
      for (let i = 0; i < max; i++) {
        results.add(
          workerIdOf(pool.workerNodes[pool.chooseWorkerNode()].worker)
        )
      }
      expect(results.size).toBe(max)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
326
it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Both pools start with WEIGHTED_ROUND_ROBIN and are then switched to
  // ROUND_ROBIN. Pools are built lazily so that only one is alive at a time.
  const initialOptions = () => ({
    workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  })
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, initialOptions()),
    () => new DynamicThreadPool(min, max, workerFile, initialOptions())
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        ).nextWorkerNodeKey
      ).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      // After the switch, the active strategy must start from node key 0.
      expect(
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          pool.workerChoiceStrategyContext.workerChoiceStrategy
        ).nextWorkerNodeKey
      ).toBe(0)
    } finally {
      // Clean up the pool resources even when an assertion above throws
      await pool.destroy()
    }
  }
})
366
it('Verify LEAST_USED strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Pools are built lazily so that only one is alive at a time:
  // the fixed pool first, then the dynamic one.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
392
it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same requirements are expected for the fixed and the dynamic pool.
  const expectedRequirements = {
    runTime: { aggregate: false, average: false, median: false },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: false, average: false, median: false }
  }
  // Pools are built lazily so that only one is alive at a time.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
448
it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    // Submit every task up front, then await them together.
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      // The per-worker count is only bounded by the total number of tasks.
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
    }
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
495
it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    // Submit every task up front, then await them together.
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    for (const { usage } of pool.workerNodes) {
      expect(usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      // The per-worker count is only bounded by the total number of tasks.
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
    }
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
543
it('Verify LEAST_BUSY strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Pools are built lazily so that only one is alive at a time:
  // the fixed pool first, then the dynamic one.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
569
it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same requirements are expected for the fixed and the dynamic pool:
  // only the aggregate run time and wait time are required.
  const expectedRequirements = {
    runTime: { aggregate: true, average: false, median: false },
    waitTime: { aggregate: true, average: false, median: false },
    elu: { aggregate: false, average: false, median: false }
  }
  // Pools are built lazily so that only one is alive at a time.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
625
it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  // A statistic is either not set (undefined) or strictly positive.
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    // Submit every task up front, then await them together.
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    for (const { usage } of pool.workerNodes) {
      expect(usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      // The per-worker count is only bounded by the total number of tasks.
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
      expectUndefinedOrPositive(usage.runTime.aggregate)
      expectUndefinedOrPositive(usage.waitTime.aggregate)
    }
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
682
it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  // A statistic is either not set (undefined) or strictly positive.
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    // Submit every task up front, then await them together.
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    for (const { usage } of pool.workerNodes) {
      expect(usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      // The per-worker count is only bounded by the total number of tasks.
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
      expectUndefinedOrPositive(usage.runTime.aggregate)
      expectUndefinedOrPositive(usage.waitTime.aggregate)
    }
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
740
it('Verify LEAST_ELU strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Pools are built lazily so that only one is alive at a time:
  // the fixed pool first, then the dynamic one.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
766
it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same requirements are expected for the fixed and the dynamic pool:
  // only the aggregate ELU is required.
  const expectedRequirements = {
    runTime: { aggregate: false, average: false, median: false },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: true, average: false, median: false }
  }
  // Pools are built lazily so that only one is alive at a time.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
822
it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    // Submit every task up front, then await them together.
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    for (const { usage } of pool.workerNodes) {
      expect(usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      // The per-worker count is only bounded by the total number of tasks.
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
      // ELU utilization is either not set or a ratio within [0, 1].
      if (usage.elu.utilization == null) {
        expect(usage.elu.utilization).toBeUndefined()
      } else {
        expect(usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(usage.elu.utilization).toBeLessThanOrEqual(1)
      }
    }
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
875
it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    // Submit every task up front, then await them together.
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    for (const { usage } of pool.workerNodes) {
      expect(usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      // The per-worker count is only bounded by the total number of tasks.
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
      // ELU utilization is either not set or a ratio within [0, 1].
      if (usage.elu.utilization == null) {
        expect(usage.elu.utilization).toBeUndefined()
      } else {
        expect(usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(usage.elu.utilization).toBeLessThanOrEqual(1)
      }
    }
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
929
it('Verify FAIR_SHARE strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Pools are built lazily so that only one is alive at a time:
  // the fixed pool first, then the dynamic one.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
955
it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // The same requirements are expected for the fixed and the dynamic pool:
  // aggregate and average are required for both run time and ELU.
  const expectedRequirements = {
    runTime: { aggregate: true, average: true, median: false },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: true, average: true, median: false }
  }
  // Pools are built lazily so that only one is alive at a time.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual(expectedRequirements)
    } finally {
      // Clean up the pool resources even when the assertion above throws
      await pool.destroy()
    }
  }
})
1011
it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // A statistic is either not set (undefined) or strictly positive.
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    // Submit every task up front, then await them together.
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    for (const { usage } of pool.workerNodes) {
      expect(usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      // The per-worker count is only bounded by the total number of tasks.
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
      expectUndefinedOrPositive(usage.runTime.aggregate)
      expectUndefinedOrPositive(usage.runTime.average)
      // ELU utilization is either not set or a ratio within [0, 1].
      if (usage.elu.utilization == null) {
        expect(usage.elu.utilization).toBeUndefined()
      } else {
        expect(usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(usage.elu.utilization).toBeLessThanOrEqual(1)
      }
    }
    // One virtual task end timestamp entry is expected per worker node.
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp.length
    ).toBe(pool.workerNodes.length)
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
1079
it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // A statistic is either not set (undefined) or strictly positive.
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    // Submit every task up front, then await them together.
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    for (const { usage } of pool.workerNodes) {
      expect(usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      // The per-worker count is only bounded by the total number of tasks.
      expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
      expectUndefinedOrPositive(usage.runTime.aggregate)
      expectUndefinedOrPositive(usage.runTime.average)
      // ELU utilization is either not set or a ratio within [0, 1].
      if (usage.elu.utilization == null) {
        expect(usage.elu.utilization).toBeUndefined()
      } else {
        expect(usage.elu.utilization).toBeGreaterThanOrEqual(0)
        expect(usage.elu.utilization).toBeLessThanOrEqual(1)
      }
    }
    // One virtual task end timestamp entry is expected per worker node.
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp.length
    ).toBe(pool.workerNodes.length)
  } finally {
    // Clean up the pool resources even when an assertion above throws
    await pool.destroy()
  }
})
1148
// Same scenario as the plain FAIR_SHARE dynamic pool run, but with the median
// runtime statistic enabled: the median (instead of the average) is checked.
it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
  const poolOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
    workerChoiceStrategyOptions: {
      runTime: { median: true }
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    poolOptions
  )
  // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  // A statistic is accepted when it is either undefined or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const historyMatcher = expect.any(CircularArray)
  for (const workerNode of pool.workerNodes) {
    const { usage } = workerNode
    expect(usage).toMatchObject({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: { history: historyMatcher },
      waitTime: { history: historyMatcher },
      elu: {
        idle: { history: historyMatcher },
        active: { history: historyMatcher }
      }
    })
    const { executed } = usage.tasks
    expect(executed).toBeGreaterThanOrEqual(0)
    expect(executed).toBeLessThanOrEqual(taskCount)
    expectUndefinedOrPositive(usage.runTime.aggregate)
    expectUndefinedOrPositive(usage.runTime.median)
    const { utilization } = usage.elu
    if (utilization == null) {
      expect(utilization).toBeUndefined()
    } else {
      // When present, the utilization must lie within [0, 1]
      expect(utilization).toBeGreaterThanOrEqual(0)
      expect(utilization).toBeLessThanOrEqual(1)
    }
  }
  const { workerChoiceStrategyContext } = pool
  const activeStrategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  // One virtual task end timestamp entry is expected per worker node
  expect(activeStrategy.workersVirtualTaskEndTimestamp.length).toBe(
    pool.workerNodes.length
  )
  // We need to clean up the resources after our test
  await pool.destroy()
})
1222
// Checks, for a fixed and then a dynamic thread pool, that setting the
// FAIR_SHARE strategy empties its `workersVirtualTaskEndTimestamp` array.
it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const verifyTimestampsReset = async pool => {
    // Always re-read the array from the strategy: it is looked up again after
    // the strategy has been set
    const readTimestamps = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp
    const expectEmptyArray = () => {
      expect(readTimestamps()).toBeInstanceOf(Array)
      expect(readTimestamps().length).toBe(0)
    }
    expectEmptyArray()
    // Dirty the internal state so that the reset becomes observable
    readTimestamps()[0] = performance.now()
    expect(readTimestamps().length).toBe(1)
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expectEmptyArray()
    // We need to clean up the resources after our test
    await pool.destroy()
  }
  await verifyTimestampsReset(
    new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js')
  )
  await verifyTimestampsReset(
    new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js'
    )
  )
})
1296
// Checks the strategy policy reported by the worker choice strategy context
// for WEIGHTED_ROUND_ROBIN, first on a fixed then on a dynamic thread pool.
it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual(
      {
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      }
    )
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1322
// Checks the task statistics requirements reported for WEIGHTED_ROUND_ROBIN:
// only the runtime aggregate and average are expected to be required, on a
// fixed thread pool as well as on a dynamic one.
it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual({
      runTime: {
        aggregate: true,
        average: true,
        median: false
      },
      waitTime: {
        aggregate: false,
        average: false,
        median: false
      },
      elu: {
        aggregate: false,
        average: false,
        median: false
      }
    })
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1378
// Runs max * maxMultiplier tasks on a fixed thread pool configured with
// WEIGHTED_ROUND_ROBIN, then inspects every worker node's usage statistics and
// the strategy's default worker weight and virtual task run time.
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  // A statistic is accepted when it is either undefined or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const historyMatcher = expect.any(CircularArray)
  for (const workerNode of pool.workerNodes) {
    const { usage } = workerNode
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      // runTime may carry extra statistics besides its history
      runTime: expect.objectContaining({ history: historyMatcher }),
      waitTime: { history: historyMatcher },
      elu: {
        idle: { history: historyMatcher },
        active: { history: historyMatcher }
      }
    })
    const { executed } = usage.tasks
    expect(executed).toBeGreaterThanOrEqual(0)
    expect(executed).toBeLessThanOrEqual(taskCount)
    expectUndefinedOrPositive(usage.runTime.aggregate)
    expectUndefinedOrPositive(usage.runTime.average)
  }
  const { workerChoiceStrategyContext } = pool
  const activeStrategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1445
// Runs max * maxMultiplier tasks on a dynamic thread pool configured with
// WEIGHTED_ROUND_ROBIN, then inspects every worker node's usage statistics and
// the strategy's default worker weight and virtual task run time.
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  // A statistic is accepted when it is either undefined or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const historyMatcher = expect.any(CircularArray)
  for (const workerNode of pool.workerNodes) {
    const { usage } = workerNode
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      // runTime may carry extra statistics besides its history
      runTime: expect.objectContaining({ history: historyMatcher }),
      waitTime: { history: historyMatcher },
      elu: {
        idle: { history: historyMatcher },
        active: { history: historyMatcher }
      }
    })
    const { executed } = usage.tasks
    expect(executed).toBeGreaterThanOrEqual(0)
    expect(executed).toBeLessThanOrEqual(taskCount)
    expectUndefinedOrPositive(usage.runTime.aggregate)
    expectUndefinedOrPositive(usage.runTime.average)
  }
  const { workerChoiceStrategyContext } = pool
  const activeStrategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1513
// Same scenario as the plain WEIGHTED_ROUND_ROBIN dynamic pool run, but with
// the median runtime statistic enabled: the median (instead of the average)
// is checked.
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
  const poolOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
    workerChoiceStrategyOptions: {
      runTime: { median: true }
    }
  }
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    poolOptions
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  // A statistic is accepted when it is either undefined or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const historyMatcher = expect.any(CircularArray)
  for (const workerNode of pool.workerNodes) {
    const { usage } = workerNode
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      // runTime may carry extra statistics besides its history
      runTime: expect.objectContaining({ history: historyMatcher }),
      waitTime: { history: historyMatcher },
      elu: {
        idle: { history: historyMatcher },
        active: { history: historyMatcher }
      }
    })
    const { executed } = usage.tasks
    expect(executed).toBeGreaterThanOrEqual(0)
    expect(executed).toBeLessThanOrEqual(taskCount)
    expectUndefinedOrPositive(usage.runTime.aggregate)
    expectUndefinedOrPositive(usage.runTime.median)
  }
  const { workerChoiceStrategyContext } = pool
  const activeStrategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // We need to clean up the resources after our test
  await pool.destroy()
})
1586
// Checks, for a fixed and then a dynamic thread pool, the values of the
// WEIGHTED_ROUND_ROBIN strategy internals once the strategy has been set.
it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const verifyInternalsReset = async pool => {
    const strategies = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies
    // Strategy instance registered under the WEIGHTED_ROUND_ROBIN key
    const strategyByKey = () => strategies().get(workerChoiceStrategy)
    // Strategy instance currently selected by the context
    const currentStrategy = () =>
      strategies().get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
    expect(strategyByKey().nextWorkerNodeKey).toBeDefined()
    expect(strategyByKey().defaultWorkerWeight).toBeDefined()
    expect(strategyByKey().workerVirtualTaskRunTime).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(currentStrategy().nextWorkerNodeKey).toBe(0)
    expect(currentStrategy().defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategyByKey().workerVirtualTaskRunTime).toBe(0)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
  await verifyInternalsReset(
    new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js')
  )
  await verifyInternalsReset(
    new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js'
    )
  )
})
1664
// Checks the strategy policy reported by the worker choice strategy context
// for INTERLEAVED_WEIGHTED_ROUND_ROBIN, first on a fixed then on a dynamic
// thread pool.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual(
      {
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      }
    )
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1691
// Checks the task statistics requirements reported for
// INTERLEAVED_WEIGHTED_ROUND_ROBIN: no statistic is expected to be required,
// on a fixed thread pool as well as on a dynamic one.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual({
      runTime: {
        aggregate: false,
        average: false,
        median: false
      },
      waitTime: {
        aggregate: false,
        average: false,
        median: false
      },
      elu: {
        aggregate: false,
        average: false,
        median: false
      }
    })
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1748
// Runs max * maxMultiplier tasks on a fixed thread pool configured with
// INTERLEAVED_WEIGHTED_ROUND_ROBIN and expects every worker node to have
// executed exactly maxMultiplier tasks, then inspects the strategy internals.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  const historyMatcher = expect.any(CircularArray)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual({
      tasks: {
        executed: maxMultiplier,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: { history: historyMatcher },
      waitTime: { history: historyMatcher },
      elu: {
        idle: { history: historyMatcher },
        active: { history: historyMatcher }
      }
    })
  }
  const { workerChoiceStrategyContext } = pool
  const activeStrategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy.roundId).toBe(0)
  expect(activeStrategy.nextWorkerNodeKey).toBe(0)
  // The only round weight expected is the default worker weight
  expect(activeStrategy.roundWeights).toStrictEqual([
    activeStrategy.defaultWorkerWeight
  ])
  // We need to clean up the resources after our test
  await pool.destroy()
})
1818
// Runs max * maxMultiplier tasks on a dynamic thread pool configured with
// INTERLEAVED_WEIGHTED_ROUND_ROBIN, then inspects every worker node's usage
// statistics and the strategy internals.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  const historyMatcher = expect.any(CircularArray)
  for (const workerNode of pool.workerNodes) {
    const { usage } = workerNode
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: { history: historyMatcher },
      waitTime: { history: historyMatcher },
      elu: {
        idle: { history: historyMatcher },
        active: { history: historyMatcher }
      }
    })
    const { executed } = usage.tasks
    expect(executed).toBeGreaterThanOrEqual(0)
    expect(executed).toBeLessThanOrEqual(taskCount)
  }
  const { workerChoiceStrategyContext } = pool
  const activeStrategy = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy.roundId).toBe(0)
  expect(activeStrategy.nextWorkerNodeKey).toBe(0)
  // The only round weight expected is the default worker weight
  expect(activeStrategy.roundWeights).toStrictEqual([
    activeStrategy.defaultWorkerWeight
  ])
  // We need to clean up the resources after our test
  await pool.destroy()
})
1893
// Checks, for a fixed and then a dynamic thread pool, the values of the
// INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals once the strategy has
// been set. Both pool types go through the same assertions, including the
// `roundId` check after the reset.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const verifyInternalsReset = async pool => {
    const strategies = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies
    // Strategy instance registered under the INTERLEAVED_WEIGHTED_ROUND_ROBIN key
    const strategyByKey = () => strategies().get(workerChoiceStrategy)
    // Strategy instance currently selected by the context
    const currentStrategy = () =>
      strategies().get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
    expect(strategyByKey().roundId).toBeDefined()
    expect(strategyByKey().nextWorkerNodeKey).toBeDefined()
    expect(strategyByKey().defaultWorkerWeight).toBeDefined()
    expect(strategyByKey().roundWeights).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(strategyByKey().roundId).toBe(0)
    expect(currentStrategy().nextWorkerNodeKey).toBe(0)
    expect(currentStrategy().defaultWorkerWeight).toBeGreaterThan(0)
    // The only round weight expected is the default worker weight
    expect(strategyByKey().roundWeights).toStrictEqual([
      currentStrategy().defaultWorkerWeight
    ])
    // We need to clean up the resources after our test
    await pool.destroy()
  }
  await verifyInternalsReset(
    new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js')
  )
  await verifyInternalsReset(
    new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js'
    )
  )
})
1995
// Building a pool with a worker choice strategy name that is not part of
// WorkerChoiceStrategies is expected to throw an error carrying the message
// below. `toThrow` is used rather than its legacy `toThrowError` alias, which
// newer major versions of `expect` no longer provide.
it('Verify unknown strategy throw error', () => {
  const createPoolWithUnknownStrategy = () =>
    new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js',
      { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
    )
  expect(createPoolWithUnknownStrategy).toThrow(
    "Invalid worker choice strategy 'UNKNOWN_STRATEGY'"
  )
})
2007 })