feat: optimize worker choice strategies implementation
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 WorkerChoiceStrategies
8 } = require('../../../lib')
9 const { CircularArray } = require('../../../lib/circular-array')
10
11 describe('Selection strategies test suite', () => {
12 const min = 0
13 const max = 3
14
15 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
16 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
17 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
18 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
19 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
20 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
21 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
22 'WEIGHTED_ROUND_ROBIN'
23 )
24 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
25 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
26 )
27 })
28
29 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
30 const pool = new DynamicThreadPool(
31 min,
32 max,
33 './tests/worker-files/thread/testWorker.js'
34 )
35 expect(pool.opts.workerChoiceStrategy).toBe(
36 WorkerChoiceStrategies.ROUND_ROBIN
37 )
38 // We need to clean up the resources after our test
39 await pool.destroy()
40 })
41
42 it('Verify available strategies are taken at pool creation', async () => {
43 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
44 const pool = new FixedThreadPool(
45 max,
46 './tests/worker-files/thread/testWorker.js',
47 { workerChoiceStrategy }
48 )
49 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
50 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
51 workerChoiceStrategy
52 )
53 await pool.destroy()
54 }
55 })
56
57 it('Verify available strategies can be set after pool creation', async () => {
58 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
59 const pool = new DynamicThreadPool(
60 min,
61 max,
62 './tests/worker-files/thread/testWorker.js'
63 )
64 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
65 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
66 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
67 workerChoiceStrategy
68 )
69 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
70 retries: 6,
71 runTime: { median: false },
72 waitTime: { median: false },
73 elu: { median: false }
74 })
75 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
76 retries: 6,
77 runTime: { median: false },
78 waitTime: { median: false },
79 elu: { median: false }
80 })
81 await pool.destroy()
82 }
83 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
84 const pool = new DynamicClusterPool(
85 min,
86 max,
87 './tests/worker-files/cluster/testWorker.js'
88 )
89 pool.setWorkerChoiceStrategy(workerChoiceStrategy, { retries: 3 })
90 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
91 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
92 workerChoiceStrategy
93 )
94 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
95 retries: 3,
96 runTime: { median: false },
97 waitTime: { median: false },
98 elu: { median: false }
99 })
100 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
101 retries: 3,
102 runTime: { median: false },
103 waitTime: { median: false },
104 elu: { median: false }
105 })
106 await pool.destroy()
107 }
108 })
109
110 it('Verify available strategies default internals at pool creation', async () => {
111 const pool = new FixedThreadPool(
112 max,
113 './tests/worker-files/thread/testWorker.js'
114 )
115 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
116 expect(
117 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
118 workerChoiceStrategy
119 ).nextWorkerNodeKey
120 ).toBe(0)
121 expect(
122 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
123 workerChoiceStrategy
124 ).previousWorkerNodeKey
125 ).toBe(0)
126 if (
127 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
128 ) {
129 expect(
130 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
131 workerChoiceStrategy
132 ).defaultWorkerWeight
133 ).toBeGreaterThan(0)
134 expect(
135 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
136 workerChoiceStrategy
137 ).workerNodeVirtualTaskRunTime
138 ).toBe(0)
139 } else if (
140 workerChoiceStrategy ===
141 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
142 ) {
143 expect(
144 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
145 workerChoiceStrategy
146 ).defaultWorkerWeight
147 ).toBeGreaterThan(0)
148 expect(
149 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
150 workerChoiceStrategy
151 ).workerNodeVirtualTaskRunTime
152 ).toBe(0)
153 expect(
154 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
155 workerChoiceStrategy
156 ).roundId
157 ).toBe(0)
158 expect(
159 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
160 workerChoiceStrategy
161 ).workerNodeId
162 ).toBe(0)
163 expect(
164 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
165 workerChoiceStrategy
166 ).roundWeights
167 ).toStrictEqual([
168 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
169 workerChoiceStrategy
170 ).defaultWorkerWeight
171 ])
172 }
173 }
174 await pool.destroy()
175 })
176
177 it('Verify ROUND_ROBIN strategy default policy', async () => {
178 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
179 let pool = new FixedThreadPool(
180 max,
181 './tests/worker-files/thread/testWorker.js',
182 { workerChoiceStrategy }
183 )
184 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
185 dynamicWorkerUsage: false,
186 dynamicWorkerReady: true
187 })
188 await pool.destroy()
189 pool = new DynamicThreadPool(
190 min,
191 max,
192 './tests/worker-files/thread/testWorker.js',
193 { workerChoiceStrategy }
194 )
195 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
196 dynamicWorkerUsage: false,
197 dynamicWorkerReady: true
198 })
199 // We need to clean up the resources after our test
200 await pool.destroy()
201 })
202
203 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
204 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
205 let pool = new FixedThreadPool(
206 max,
207 './tests/worker-files/thread/testWorker.js',
208 { workerChoiceStrategy }
209 )
210 expect(
211 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
212 ).toStrictEqual({
213 runTime: {
214 aggregate: false,
215 average: false,
216 median: false
217 },
218 waitTime: {
219 aggregate: false,
220 average: false,
221 median: false
222 },
223 elu: {
224 aggregate: false,
225 average: false,
226 median: false
227 }
228 })
229 await pool.destroy()
230 pool = new DynamicThreadPool(
231 min,
232 max,
233 './tests/worker-files/thread/testWorker.js',
234 { workerChoiceStrategy }
235 )
236 expect(
237 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
238 ).toStrictEqual({
239 runTime: {
240 aggregate: false,
241 average: false,
242 median: false
243 },
244 waitTime: {
245 aggregate: false,
246 average: false,
247 median: false
248 },
249 elu: {
250 aggregate: false,
251 average: false,
252 median: false
253 }
254 })
255 // We need to clean up the resources after our test
256 await pool.destroy()
257 })
258
259 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
260 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
261 const pool = new FixedThreadPool(
262 max,
263 './tests/worker-files/thread/testWorker.js',
264 { workerChoiceStrategy }
265 )
266 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
267 const promises = new Set()
268 const maxMultiplier = 2
269 for (let i = 0; i < max * maxMultiplier; i++) {
270 promises.add(pool.execute())
271 }
272 await Promise.all(promises)
273 for (const workerNode of pool.workerNodes) {
274 expect(workerNode.usage).toStrictEqual({
275 tasks: {
276 executed: maxMultiplier,
277 executing: 0,
278 queued: 0,
279 maxQueued: 0,
280 stolen: 0,
281 failed: 0
282 },
283 runTime: {
284 history: new CircularArray()
285 },
286 waitTime: {
287 history: new CircularArray()
288 },
289 elu: {
290 idle: {
291 history: new CircularArray()
292 },
293 active: {
294 history: new CircularArray()
295 }
296 }
297 })
298 }
299 expect(
300 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
301 pool.workerChoiceStrategyContext.workerChoiceStrategy
302 ).nextWorkerNodeKey
303 ).toBe(0)
304 expect(
305 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
306 pool.workerChoiceStrategyContext.workerChoiceStrategy
307 ).previousWorkerNodeKey
308 ).toBe(pool.workerNodes.length - 1)
309 // We need to clean up the resources after our test
310 await pool.destroy()
311 })
312
313 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
314 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
315 const pool = new DynamicThreadPool(
316 min,
317 max,
318 './tests/worker-files/thread/testWorker.js',
319 { workerChoiceStrategy }
320 )
321 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
322 const promises = new Set()
323 const maxMultiplier = 2
324 for (let i = 0; i < max * maxMultiplier; i++) {
325 promises.add(pool.execute())
326 }
327 await Promise.all(promises)
328 for (const workerNode of pool.workerNodes) {
329 expect(workerNode.usage).toStrictEqual({
330 tasks: {
331 executed: expect.any(Number),
332 executing: 0,
333 queued: 0,
334 maxQueued: 0,
335 stolen: 0,
336 failed: 0
337 },
338 runTime: {
339 history: new CircularArray()
340 },
341 waitTime: {
342 history: new CircularArray()
343 },
344 elu: {
345 idle: {
346 history: new CircularArray()
347 },
348 active: {
349 history: new CircularArray()
350 }
351 }
352 })
353 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
354 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
355 max * maxMultiplier
356 )
357 }
358 expect(
359 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
360 pool.workerChoiceStrategyContext.workerChoiceStrategy
361 ).nextWorkerNodeKey
362 ).toBe(0)
363 expect(
364 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
365 pool.workerChoiceStrategyContext.workerChoiceStrategy
366 ).previousWorkerNodeKey
367 ).toBe(pool.workerNodes.length - 1)
368 // We need to clean up the resources after our test
369 await pool.destroy()
370 })
371
372 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
373 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
374 let pool = new FixedClusterPool(
375 max,
376 './tests/worker-files/cluster/testWorker.js',
377 { workerChoiceStrategy }
378 )
379 let results = new Set()
380 for (let i = 0; i < max; i++) {
381 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.id)
382 }
383 expect(results.size).toBe(max)
384 await pool.destroy()
385 pool = new FixedThreadPool(
386 max,
387 './tests/worker-files/thread/testWorker.js',
388 { workerChoiceStrategy }
389 )
390 results = new Set()
391 for (let i = 0; i < max; i++) {
392 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.threadId)
393 }
394 expect(results.size).toBe(max)
395 await pool.destroy()
396 })
397
398 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
399 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
400 let pool = new FixedThreadPool(
401 max,
402 './tests/worker-files/thread/testWorker.js',
403 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
404 )
405 expect(
406 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
407 pool.workerChoiceStrategyContext.workerChoiceStrategy
408 ).nextWorkerNodeKey
409 ).toBeDefined()
410 expect(
411 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
412 pool.workerChoiceStrategyContext.workerChoiceStrategy
413 ).previousWorkerNodeKey
414 ).toBeDefined()
415 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
416 expect(
417 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
418 pool.workerChoiceStrategyContext.workerChoiceStrategy
419 ).nextWorkerNodeKey
420 ).toBe(0)
421 expect(
422 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
423 pool.workerChoiceStrategyContext.workerChoiceStrategy
424 ).previousWorkerNodeKey
425 ).toBe(0)
426 await pool.destroy()
427 pool = new DynamicThreadPool(
428 min,
429 max,
430 './tests/worker-files/thread/testWorker.js',
431 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
432 )
433 expect(
434 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
435 pool.workerChoiceStrategyContext.workerChoiceStrategy
436 ).nextWorkerNodeKey
437 ).toBeDefined()
438 expect(
439 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
440 pool.workerChoiceStrategyContext.workerChoiceStrategy
441 ).previousWorkerNodeKey
442 ).toBeDefined()
443 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
444 expect(
445 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
446 pool.workerChoiceStrategyContext.workerChoiceStrategy
447 ).nextWorkerNodeKey
448 ).toBe(0)
449 expect(
450 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
451 pool.workerChoiceStrategyContext.workerChoiceStrategy
452 ).previousWorkerNodeKey
453 ).toBe(0)
454 // We need to clean up the resources after our test
455 await pool.destroy()
456 })
457
458 it('Verify LEAST_USED strategy default policy', async () => {
459 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
460 let pool = new FixedThreadPool(
461 max,
462 './tests/worker-files/thread/testWorker.js',
463 { workerChoiceStrategy }
464 )
465 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
466 dynamicWorkerUsage: false,
467 dynamicWorkerReady: true
468 })
469 await pool.destroy()
470 pool = new DynamicThreadPool(
471 min,
472 max,
473 './tests/worker-files/thread/testWorker.js',
474 { workerChoiceStrategy }
475 )
476 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
477 dynamicWorkerUsage: false,
478 dynamicWorkerReady: true
479 })
480 // We need to clean up the resources after our test
481 await pool.destroy()
482 })
483
484 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
485 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
486 let pool = new FixedThreadPool(
487 max,
488 './tests/worker-files/thread/testWorker.js',
489 { workerChoiceStrategy }
490 )
491 expect(
492 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
493 ).toStrictEqual({
494 runTime: {
495 aggregate: false,
496 average: false,
497 median: false
498 },
499 waitTime: {
500 aggregate: false,
501 average: false,
502 median: false
503 },
504 elu: {
505 aggregate: false,
506 average: false,
507 median: false
508 }
509 })
510 await pool.destroy()
511 pool = new DynamicThreadPool(
512 min,
513 max,
514 './tests/worker-files/thread/testWorker.js',
515 { workerChoiceStrategy }
516 )
517 expect(
518 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
519 ).toStrictEqual({
520 runTime: {
521 aggregate: false,
522 average: false,
523 median: false
524 },
525 waitTime: {
526 aggregate: false,
527 average: false,
528 median: false
529 },
530 elu: {
531 aggregate: false,
532 average: false,
533 median: false
534 }
535 })
536 // We need to clean up the resources after our test
537 await pool.destroy()
538 })
539
540 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
541 const pool = new FixedThreadPool(
542 max,
543 './tests/worker-files/thread/testWorker.js',
544 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
545 )
546 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
547 const promises = new Set()
548 const maxMultiplier = 2
549 for (let i = 0; i < max * maxMultiplier; i++) {
550 promises.add(pool.execute())
551 }
552 await Promise.all(promises)
553 for (const workerNode of pool.workerNodes) {
554 expect(workerNode.usage).toStrictEqual({
555 tasks: {
556 executed: expect.any(Number),
557 executing: 0,
558 queued: 0,
559 maxQueued: 0,
560 stolen: 0,
561 failed: 0
562 },
563 runTime: {
564 history: new CircularArray()
565 },
566 waitTime: {
567 history: new CircularArray()
568 },
569 elu: {
570 idle: {
571 history: new CircularArray()
572 },
573 active: {
574 history: new CircularArray()
575 }
576 }
577 })
578 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
579 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
580 max * maxMultiplier
581 )
582 }
583 expect(
584 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
585 pool.workerChoiceStrategyContext.workerChoiceStrategy
586 ).nextWorkerNodeKey
587 ).toEqual(expect.any(Number))
588 expect(
589 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
590 pool.workerChoiceStrategyContext.workerChoiceStrategy
591 ).previousWorkerNodeKey
592 ).toEqual(expect.any(Number))
593 // We need to clean up the resources after our test
594 await pool.destroy()
595 })
596
597 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
598 const pool = new DynamicThreadPool(
599 min,
600 max,
601 './tests/worker-files/thread/testWorker.js',
602 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
603 )
604 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
605 const promises = new Set()
606 const maxMultiplier = 2
607 for (let i = 0; i < max * maxMultiplier; i++) {
608 promises.add(pool.execute())
609 }
610 await Promise.all(promises)
611 for (const workerNode of pool.workerNodes) {
612 expect(workerNode.usage).toStrictEqual({
613 tasks: {
614 executed: expect.any(Number),
615 executing: 0,
616 queued: 0,
617 maxQueued: 0,
618 stolen: 0,
619 failed: 0
620 },
621 runTime: {
622 history: new CircularArray()
623 },
624 waitTime: {
625 history: new CircularArray()
626 },
627 elu: {
628 idle: {
629 history: new CircularArray()
630 },
631 active: {
632 history: new CircularArray()
633 }
634 }
635 })
636 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
637 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
638 max * maxMultiplier
639 )
640 }
641 expect(
642 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
643 pool.workerChoiceStrategyContext.workerChoiceStrategy
644 ).nextWorkerNodeKey
645 ).toEqual(expect.any(Number))
646 expect(
647 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
648 pool.workerChoiceStrategyContext.workerChoiceStrategy
649 ).previousWorkerNodeKey
650 ).toEqual(expect.any(Number))
651 // We need to clean up the resources after our test
652 await pool.destroy()
653 })
654
655 it('Verify LEAST_BUSY strategy default policy', async () => {
656 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
657 let pool = new FixedThreadPool(
658 max,
659 './tests/worker-files/thread/testWorker.js',
660 { workerChoiceStrategy }
661 )
662 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
663 dynamicWorkerUsage: false,
664 dynamicWorkerReady: true
665 })
666 await pool.destroy()
667 pool = new DynamicThreadPool(
668 min,
669 max,
670 './tests/worker-files/thread/testWorker.js',
671 { workerChoiceStrategy }
672 )
673 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
674 dynamicWorkerUsage: false,
675 dynamicWorkerReady: true
676 })
677 // We need to clean up the resources after our test
678 await pool.destroy()
679 })
680
681 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
682 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
683 let pool = new FixedThreadPool(
684 max,
685 './tests/worker-files/thread/testWorker.js',
686 { workerChoiceStrategy }
687 )
688 expect(
689 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
690 ).toStrictEqual({
691 runTime: {
692 aggregate: true,
693 average: false,
694 median: false
695 },
696 waitTime: {
697 aggregate: true,
698 average: false,
699 median: false
700 },
701 elu: {
702 aggregate: false,
703 average: false,
704 median: false
705 }
706 })
707 await pool.destroy()
708 pool = new DynamicThreadPool(
709 min,
710 max,
711 './tests/worker-files/thread/testWorker.js',
712 { workerChoiceStrategy }
713 )
714 expect(
715 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
716 ).toStrictEqual({
717 runTime: {
718 aggregate: true,
719 average: false,
720 median: false
721 },
722 waitTime: {
723 aggregate: true,
724 average: false,
725 median: false
726 },
727 elu: {
728 aggregate: false,
729 average: false,
730 median: false
731 }
732 })
733 // We need to clean up the resources after our test
734 await pool.destroy()
735 })
736
737 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
738 const pool = new FixedThreadPool(
739 max,
740 './tests/worker-files/thread/testWorker.js',
741 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
742 )
743 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
744 const promises = new Set()
745 const maxMultiplier = 2
746 for (let i = 0; i < max * maxMultiplier; i++) {
747 promises.add(pool.execute())
748 }
749 await Promise.all(promises)
750 for (const workerNode of pool.workerNodes) {
751 expect(workerNode.usage).toStrictEqual({
752 tasks: {
753 executed: expect.any(Number),
754 executing: 0,
755 queued: 0,
756 maxQueued: 0,
757 stolen: 0,
758 failed: 0
759 },
760 runTime: expect.objectContaining({
761 history: expect.any(CircularArray)
762 }),
763 waitTime: expect.objectContaining({
764 history: expect.any(CircularArray)
765 }),
766 elu: {
767 idle: {
768 history: new CircularArray()
769 },
770 active: {
771 history: new CircularArray()
772 }
773 }
774 })
775 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
776 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
777 max * maxMultiplier
778 )
779 if (workerNode.usage.runTime.aggregate == null) {
780 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
781 } else {
782 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
783 }
784 if (workerNode.usage.waitTime.aggregate == null) {
785 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
786 } else {
787 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
788 }
789 }
790 expect(
791 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
792 pool.workerChoiceStrategyContext.workerChoiceStrategy
793 ).nextWorkerNodeKey
794 ).toEqual(expect.any(Number))
795 expect(
796 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
797 pool.workerChoiceStrategyContext.workerChoiceStrategy
798 ).previousWorkerNodeKey
799 ).toEqual(expect.any(Number))
800 // We need to clean up the resources after our test
801 await pool.destroy()
802 })
803
804 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
805 const pool = new DynamicThreadPool(
806 min,
807 max,
808 './tests/worker-files/thread/testWorker.js',
809 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
810 )
811 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
812 const promises = new Set()
813 const maxMultiplier = 2
814 for (let i = 0; i < max * maxMultiplier; i++) {
815 promises.add(pool.execute())
816 }
817 await Promise.all(promises)
818 for (const workerNode of pool.workerNodes) {
819 expect(workerNode.usage).toStrictEqual({
820 tasks: {
821 executed: expect.any(Number),
822 executing: 0,
823 queued: 0,
824 maxQueued: 0,
825 stolen: 0,
826 failed: 0
827 },
828 runTime: expect.objectContaining({
829 history: expect.any(CircularArray)
830 }),
831 waitTime: expect.objectContaining({
832 history: expect.any(CircularArray)
833 }),
834 elu: {
835 idle: {
836 history: new CircularArray()
837 },
838 active: {
839 history: new CircularArray()
840 }
841 }
842 })
843 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
844 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
845 max * maxMultiplier
846 )
847 if (workerNode.usage.runTime.aggregate == null) {
848 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
849 } else {
850 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
851 }
852 if (workerNode.usage.waitTime.aggregate == null) {
853 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
854 } else {
855 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
856 }
857 }
858 expect(
859 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
860 pool.workerChoiceStrategyContext.workerChoiceStrategy
861 ).nextWorkerNodeKey
862 ).toEqual(expect.any(Number))
863 expect(
864 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
865 pool.workerChoiceStrategyContext.workerChoiceStrategy
866 ).previousWorkerNodeKey
867 ).toEqual(expect.any(Number))
868 // We need to clean up the resources after our test
869 await pool.destroy()
870 })
871
872 it('Verify LEAST_ELU strategy default policy', async () => {
873 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
874 let pool = new FixedThreadPool(
875 max,
876 './tests/worker-files/thread/testWorker.js',
877 { workerChoiceStrategy }
878 )
879 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
880 dynamicWorkerUsage: false,
881 dynamicWorkerReady: true
882 })
883 await pool.destroy()
884 pool = new DynamicThreadPool(
885 min,
886 max,
887 './tests/worker-files/thread/testWorker.js',
888 { workerChoiceStrategy }
889 )
890 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
891 dynamicWorkerUsage: false,
892 dynamicWorkerReady: true
893 })
894 // We need to clean up the resources after our test
895 await pool.destroy()
896 })
897
898 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
899 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
900 let pool = new FixedThreadPool(
901 max,
902 './tests/worker-files/thread/testWorker.js',
903 { workerChoiceStrategy }
904 )
905 expect(
906 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
907 ).toStrictEqual({
908 runTime: {
909 aggregate: false,
910 average: false,
911 median: false
912 },
913 waitTime: {
914 aggregate: false,
915 average: false,
916 median: false
917 },
918 elu: {
919 aggregate: true,
920 average: false,
921 median: false
922 }
923 })
924 await pool.destroy()
925 pool = new DynamicThreadPool(
926 min,
927 max,
928 './tests/worker-files/thread/testWorker.js',
929 { workerChoiceStrategy }
930 )
931 expect(
932 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
933 ).toStrictEqual({
934 runTime: {
935 aggregate: false,
936 average: false,
937 median: false
938 },
939 waitTime: {
940 aggregate: false,
941 average: false,
942 median: false
943 },
944 elu: {
945 aggregate: true,
946 average: false,
947 median: false
948 }
949 })
950 // We need to clean up the resources after our test
951 await pool.destroy()
952 })
953
954 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
955 const pool = new FixedThreadPool(
956 max,
957 './tests/worker-files/thread/testWorker.js',
958 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
959 )
960 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
961 const promises = new Set()
962 const maxMultiplier = 2
963 for (let i = 0; i < max * maxMultiplier; i++) {
964 promises.add(pool.execute())
965 }
966 await Promise.all(promises)
967 for (const workerNode of pool.workerNodes) {
968 expect(workerNode.usage).toStrictEqual({
969 tasks: {
970 executed: expect.any(Number),
971 executing: 0,
972 queued: 0,
973 maxQueued: 0,
974 stolen: 0,
975 failed: 0
976 },
977 runTime: {
978 history: new CircularArray()
979 },
980 waitTime: {
981 history: new CircularArray()
982 },
983 elu: expect.objectContaining({
984 idle: expect.objectContaining({
985 history: expect.any(CircularArray)
986 }),
987 active: expect.objectContaining({
988 history: expect.any(CircularArray)
989 })
990 })
991 })
992 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
993 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
994 max * maxMultiplier
995 )
996 if (workerNode.usage.elu.active.aggregate == null) {
997 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
998 } else {
999 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1000 }
1001 if (workerNode.usage.elu.idle.aggregate == null) {
1002 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1003 } else {
1004 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1005 }
1006 if (workerNode.usage.elu.utilization == null) {
1007 expect(workerNode.usage.elu.utilization).toBeUndefined()
1008 } else {
1009 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1010 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1011 }
1012 }
1013 expect(
1014 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1015 pool.workerChoiceStrategyContext.workerChoiceStrategy
1016 ).nextWorkerNodeKey
1017 ).toEqual(expect.any(Number))
1018 expect(
1019 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1020 pool.workerChoiceStrategyContext.workerChoiceStrategy
1021 ).previousWorkerNodeKey
1022 ).toEqual(expect.any(Number))
1023 // We need to clean up the resources after our test
1024 await pool.destroy()
1025 })
1026
1027 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
1028 const pool = new DynamicThreadPool(
1029 min,
1030 max,
1031 './tests/worker-files/thread/testWorker.js',
1032 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
1033 )
1034 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
1035 const promises = new Set()
1036 const maxMultiplier = 2
1037 for (let i = 0; i < max * maxMultiplier; i++) {
1038 promises.add(pool.execute())
1039 }
1040 await Promise.all(promises)
1041 for (const workerNode of pool.workerNodes) {
1042 expect(workerNode.usage).toStrictEqual({
1043 tasks: {
1044 executed: expect.any(Number),
1045 executing: 0,
1046 queued: 0,
1047 maxQueued: 0,
1048 stolen: 0,
1049 failed: 0
1050 },
1051 runTime: {
1052 history: new CircularArray()
1053 },
1054 waitTime: {
1055 history: new CircularArray()
1056 },
1057 elu: expect.objectContaining({
1058 idle: expect.objectContaining({
1059 history: expect.any(CircularArray)
1060 }),
1061 active: expect.objectContaining({
1062 history: expect.any(CircularArray)
1063 })
1064 })
1065 })
1066 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1067 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1068 max * maxMultiplier
1069 )
1070 if (workerNode.usage.elu.active.aggregate == null) {
1071 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1072 } else {
1073 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1074 }
1075 if (workerNode.usage.elu.idle.aggregate == null) {
1076 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1077 } else {
1078 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1079 }
1080 if (workerNode.usage.elu.utilization == null) {
1081 expect(workerNode.usage.elu.utilization).toBeUndefined()
1082 } else {
1083 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1084 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1085 }
1086 }
1087 expect(
1088 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1089 pool.workerChoiceStrategyContext.workerChoiceStrategy
1090 ).nextWorkerNodeKey
1091 ).toEqual(expect.any(Number))
1092 expect(
1093 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1094 pool.workerChoiceStrategyContext.workerChoiceStrategy
1095 ).previousWorkerNodeKey
1096 ).toEqual(expect.any(Number))
1097 // We need to clean up the resources after our test
1098 await pool.destroy()
1099 })
1100
1101 it('Verify FAIR_SHARE strategy default policy', async () => {
1102 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1103 let pool = new FixedThreadPool(
1104 max,
1105 './tests/worker-files/thread/testWorker.js',
1106 { workerChoiceStrategy }
1107 )
1108 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1109 dynamicWorkerUsage: false,
1110 dynamicWorkerReady: true
1111 })
1112 await pool.destroy()
1113 pool = new DynamicThreadPool(
1114 min,
1115 max,
1116 './tests/worker-files/thread/testWorker.js',
1117 { workerChoiceStrategy }
1118 )
1119 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1120 dynamicWorkerUsage: false,
1121 dynamicWorkerReady: true
1122 })
1123 // We need to clean up the resources after our test
1124 await pool.destroy()
1125 })
1126
1127 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1128 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1129 let pool = new FixedThreadPool(
1130 max,
1131 './tests/worker-files/thread/testWorker.js',
1132 { workerChoiceStrategy }
1133 )
1134 expect(
1135 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1136 ).toStrictEqual({
1137 runTime: {
1138 aggregate: true,
1139 average: true,
1140 median: false
1141 },
1142 waitTime: {
1143 aggregate: false,
1144 average: false,
1145 median: false
1146 },
1147 elu: {
1148 aggregate: true,
1149 average: true,
1150 median: false
1151 }
1152 })
1153 await pool.destroy()
1154 pool = new DynamicThreadPool(
1155 min,
1156 max,
1157 './tests/worker-files/thread/testWorker.js',
1158 { workerChoiceStrategy }
1159 )
1160 expect(
1161 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1162 ).toStrictEqual({
1163 runTime: {
1164 aggregate: true,
1165 average: true,
1166 median: false
1167 },
1168 waitTime: {
1169 aggregate: false,
1170 average: false,
1171 median: false
1172 },
1173 elu: {
1174 aggregate: true,
1175 average: true,
1176 median: false
1177 }
1178 })
1179 // We need to clean up the resources after our test
1180 await pool.destroy()
1181 })
1182
1183 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1184 const pool = new FixedThreadPool(
1185 max,
1186 './tests/worker-files/thread/testWorker.js',
1187 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1188 )
1189 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1190 const promises = new Set()
1191 const maxMultiplier = 2
1192 for (let i = 0; i < max * maxMultiplier; i++) {
1193 promises.add(pool.execute())
1194 }
1195 await Promise.all(promises)
1196 for (const workerNode of pool.workerNodes) {
1197 expect(workerNode.usage).toStrictEqual({
1198 tasks: {
1199 executed: expect.any(Number),
1200 executing: 0,
1201 queued: 0,
1202 maxQueued: 0,
1203 stolen: 0,
1204 failed: 0
1205 },
1206 runTime: expect.objectContaining({
1207 history: expect.any(CircularArray)
1208 }),
1209 waitTime: {
1210 history: new CircularArray()
1211 },
1212 elu: expect.objectContaining({
1213 idle: expect.objectContaining({
1214 history: expect.any(CircularArray)
1215 }),
1216 active: expect.objectContaining({
1217 history: expect.any(CircularArray)
1218 })
1219 })
1220 })
1221 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1222 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1223 max * maxMultiplier
1224 )
1225 if (workerNode.usage.runTime.aggregate == null) {
1226 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1227 } else {
1228 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1229 }
1230 if (workerNode.usage.runTime.average == null) {
1231 expect(workerNode.usage.runTime.average).toBeUndefined()
1232 } else {
1233 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1234 }
1235 if (workerNode.usage.elu.active.aggregate == null) {
1236 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1237 } else {
1238 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1239 }
1240 if (workerNode.usage.elu.idle.aggregate == null) {
1241 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1242 } else {
1243 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1244 }
1245 if (workerNode.usage.elu.utilization == null) {
1246 expect(workerNode.usage.elu.utilization).toBeUndefined()
1247 } else {
1248 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1249 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1250 }
1251 }
1252 expect(
1253 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1254 pool.workerChoiceStrategyContext.workerChoiceStrategy
1255 ).nextWorkerNodeKey
1256 ).toEqual(expect.any(Number))
1257 expect(
1258 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1259 pool.workerChoiceStrategyContext.workerChoiceStrategy
1260 ).previousWorkerNodeKey
1261 ).toEqual(expect.any(Number))
1262 // We need to clean up the resources after our test
1263 await pool.destroy()
1264 })
1265
1266 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1267 const pool = new DynamicThreadPool(
1268 min,
1269 max,
1270 './tests/worker-files/thread/testWorker.js',
1271 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1272 )
1273 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1274 const promises = new Set()
1275 const maxMultiplier = 2
1276 for (let i = 0; i < max * maxMultiplier; i++) {
1277 promises.add(pool.execute())
1278 }
1279 await Promise.all(promises)
1280 for (const workerNode of pool.workerNodes) {
1281 expect(workerNode.usage).toStrictEqual({
1282 tasks: {
1283 executed: expect.any(Number),
1284 executing: 0,
1285 queued: 0,
1286 maxQueued: 0,
1287 stolen: 0,
1288 failed: 0
1289 },
1290 runTime: expect.objectContaining({
1291 history: expect.any(CircularArray)
1292 }),
1293 waitTime: {
1294 history: new CircularArray()
1295 },
1296 elu: expect.objectContaining({
1297 idle: expect.objectContaining({
1298 history: expect.any(CircularArray)
1299 }),
1300 active: expect.objectContaining({
1301 history: expect.any(CircularArray)
1302 })
1303 })
1304 })
1305 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1306 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1307 max * maxMultiplier
1308 )
1309 if (workerNode.usage.runTime.aggregate == null) {
1310 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1311 } else {
1312 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1313 }
1314 if (workerNode.usage.runTime.average == null) {
1315 expect(workerNode.usage.runTime.average).toBeUndefined()
1316 } else {
1317 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1318 }
1319 if (workerNode.usage.elu.active.aggregate == null) {
1320 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1321 } else {
1322 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1323 }
1324 if (workerNode.usage.elu.idle.aggregate == null) {
1325 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1326 } else {
1327 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1328 }
1329 if (workerNode.usage.elu.utilization == null) {
1330 expect(workerNode.usage.elu.utilization).toBeUndefined()
1331 } else {
1332 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1333 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1334 }
1335 }
1336 expect(
1337 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1338 pool.workerChoiceStrategyContext.workerChoiceStrategy
1339 ).nextWorkerNodeKey
1340 ).toEqual(expect.any(Number))
1341 expect(
1342 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1343 pool.workerChoiceStrategyContext.workerChoiceStrategy
1344 ).previousWorkerNodeKey
1345 ).toEqual(expect.any(Number))
1346 // We need to clean up the resources after our test
1347 await pool.destroy()
1348 })
1349
1350 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1351 const pool = new DynamicThreadPool(
1352 min,
1353 max,
1354 './tests/worker-files/thread/testWorker.js',
1355 {
1356 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1357 workerChoiceStrategyOptions: {
1358 runTime: { median: true }
1359 }
1360 }
1361 )
1362 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1363 const promises = new Set()
1364 const maxMultiplier = 2
1365 for (let i = 0; i < max * maxMultiplier; i++) {
1366 promises.add(pool.execute())
1367 }
1368 await Promise.all(promises)
1369 for (const workerNode of pool.workerNodes) {
1370 expect(workerNode.usage).toStrictEqual({
1371 tasks: {
1372 executed: expect.any(Number),
1373 executing: 0,
1374 queued: 0,
1375 maxQueued: 0,
1376 stolen: 0,
1377 failed: 0
1378 },
1379 runTime: expect.objectContaining({
1380 history: expect.any(CircularArray)
1381 }),
1382 waitTime: {
1383 history: new CircularArray()
1384 },
1385 elu: expect.objectContaining({
1386 idle: expect.objectContaining({
1387 history: expect.any(CircularArray)
1388 }),
1389 active: expect.objectContaining({
1390 history: expect.any(CircularArray)
1391 })
1392 })
1393 })
1394 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1395 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1396 max * maxMultiplier
1397 )
1398 if (workerNode.usage.runTime.aggregate == null) {
1399 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1400 } else {
1401 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1402 }
1403 if (workerNode.usage.runTime.median == null) {
1404 expect(workerNode.usage.runTime.median).toBeUndefined()
1405 } else {
1406 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1407 }
1408 if (workerNode.usage.elu.active.aggregate == null) {
1409 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1410 } else {
1411 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1412 }
1413 if (workerNode.usage.elu.idle.aggregate == null) {
1414 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1415 } else {
1416 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1417 }
1418 if (workerNode.usage.elu.utilization == null) {
1419 expect(workerNode.usage.elu.utilization).toBeUndefined()
1420 } else {
1421 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1422 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1423 }
1424 }
1425 expect(
1426 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1427 pool.workerChoiceStrategyContext.workerChoiceStrategy
1428 ).nextWorkerNodeKey
1429 ).toEqual(expect.any(Number))
1430 expect(
1431 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1432 pool.workerChoiceStrategyContext.workerChoiceStrategy
1433 ).previousWorkerNodeKey
1434 ).toEqual(expect.any(Number))
1435 // We need to clean up the resources after our test
1436 await pool.destroy()
1437 })
1438
1439 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1440 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1441 let pool = new FixedThreadPool(
1442 max,
1443 './tests/worker-files/thread/testWorker.js'
1444 )
1445 for (const workerNode of pool.workerNodes) {
1446 workerNode.strategyData = {
1447 virtualTaskEndTimestamp: performance.now()
1448 }
1449 }
1450 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1451 for (const workerNode of pool.workerNodes) {
1452 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1453 }
1454 await pool.destroy()
1455 pool = new DynamicThreadPool(
1456 min,
1457 max,
1458 './tests/worker-files/thread/testWorker.js'
1459 )
1460 for (const workerNode of pool.workerNodes) {
1461 workerNode.strategyData = {
1462 virtualTaskEndTimestamp: performance.now()
1463 }
1464 }
1465 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1466 for (const workerNode of pool.workerNodes) {
1467 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1468 }
1469 // We need to clean up the resources after our test
1470 await pool.destroy()
1471 })
1472
1473 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1474 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1475 let pool = new FixedThreadPool(
1476 max,
1477 './tests/worker-files/thread/testWorker.js',
1478 { workerChoiceStrategy }
1479 )
1480 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1481 dynamicWorkerUsage: false,
1482 dynamicWorkerReady: true
1483 })
1484 await pool.destroy()
1485 pool = new DynamicThreadPool(
1486 min,
1487 max,
1488 './tests/worker-files/thread/testWorker.js',
1489 { workerChoiceStrategy }
1490 )
1491 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1492 dynamicWorkerUsage: false,
1493 dynamicWorkerReady: true
1494 })
1495 // We need to clean up the resources after our test
1496 await pool.destroy()
1497 })
1498
1499 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1500 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1501 let pool = new FixedThreadPool(
1502 max,
1503 './tests/worker-files/thread/testWorker.js',
1504 { workerChoiceStrategy }
1505 )
1506 expect(
1507 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1508 ).toStrictEqual({
1509 runTime: {
1510 aggregate: true,
1511 average: true,
1512 median: false
1513 },
1514 waitTime: {
1515 aggregate: false,
1516 average: false,
1517 median: false
1518 },
1519 elu: {
1520 aggregate: false,
1521 average: false,
1522 median: false
1523 }
1524 })
1525 await pool.destroy()
1526 pool = new DynamicThreadPool(
1527 min,
1528 max,
1529 './tests/worker-files/thread/testWorker.js',
1530 { workerChoiceStrategy }
1531 )
1532 expect(
1533 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1534 ).toStrictEqual({
1535 runTime: {
1536 aggregate: true,
1537 average: true,
1538 median: false
1539 },
1540 waitTime: {
1541 aggregate: false,
1542 average: false,
1543 median: false
1544 },
1545 elu: {
1546 aggregate: false,
1547 average: false,
1548 median: false
1549 }
1550 })
1551 // We need to clean up the resources after our test
1552 await pool.destroy()
1553 })
1554
1555 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1556 const pool = new FixedThreadPool(
1557 max,
1558 './tests/worker-files/thread/testWorker.js',
1559 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1560 )
1561 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1562 const promises = new Set()
1563 const maxMultiplier = 2
1564 for (let i = 0; i < max * maxMultiplier; i++) {
1565 promises.add(pool.execute())
1566 }
1567 await Promise.all(promises)
1568 for (const workerNode of pool.workerNodes) {
1569 expect(workerNode.usage).toStrictEqual({
1570 tasks: {
1571 executed: expect.any(Number),
1572 executing: 0,
1573 queued: 0,
1574 maxQueued: 0,
1575 stolen: 0,
1576 failed: 0
1577 },
1578 runTime: expect.objectContaining({
1579 history: expect.any(CircularArray)
1580 }),
1581 waitTime: {
1582 history: new CircularArray()
1583 },
1584 elu: {
1585 idle: {
1586 history: new CircularArray()
1587 },
1588 active: {
1589 history: new CircularArray()
1590 }
1591 }
1592 })
1593 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1594 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1595 max * maxMultiplier
1596 )
1597 if (workerNode.usage.runTime.aggregate == null) {
1598 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1599 } else {
1600 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1601 }
1602 if (workerNode.usage.runTime.average == null) {
1603 expect(workerNode.usage.runTime.average).toBeUndefined()
1604 } else {
1605 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1606 }
1607 }
1608 expect(
1609 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1610 pool.workerChoiceStrategyContext.workerChoiceStrategy
1611 ).nextWorkerNodeKey
1612 ).toBe(0)
1613 expect(
1614 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1615 pool.workerChoiceStrategyContext.workerChoiceStrategy
1616 ).previousWorkerNodeKey
1617 ).toBe(0)
1618 expect(
1619 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1620 pool.workerChoiceStrategyContext.workerChoiceStrategy
1621 ).defaultWorkerWeight
1622 ).toBeGreaterThan(0)
1623 expect(
1624 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1625 pool.workerChoiceStrategyContext.workerChoiceStrategy
1626 ).workerNodeVirtualTaskRunTime
1627 ).toBeGreaterThanOrEqual(0)
1628 // We need to clean up the resources after our test
1629 await pool.destroy()
1630 })
1631
1632 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1633 const pool = new DynamicThreadPool(
1634 min,
1635 max,
1636 './tests/worker-files/thread/testWorker.js',
1637 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1638 )
1639 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1640 const promises = new Set()
1641 const maxMultiplier = 2
1642 for (let i = 0; i < max * maxMultiplier; i++) {
1643 promises.add(pool.execute())
1644 }
1645 await Promise.all(promises)
1646 for (const workerNode of pool.workerNodes) {
1647 expect(workerNode.usage).toStrictEqual({
1648 tasks: {
1649 executed: expect.any(Number),
1650 executing: 0,
1651 queued: 0,
1652 maxQueued: 0,
1653 stolen: 0,
1654 failed: 0
1655 },
1656 runTime: expect.objectContaining({
1657 history: expect.any(CircularArray)
1658 }),
1659 waitTime: {
1660 history: new CircularArray()
1661 },
1662 elu: {
1663 idle: {
1664 history: new CircularArray()
1665 },
1666 active: {
1667 history: new CircularArray()
1668 }
1669 }
1670 })
1671 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1672 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1673 max * maxMultiplier
1674 )
1675 if (workerNode.usage.runTime.aggregate == null) {
1676 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1677 } else {
1678 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1679 }
1680 if (workerNode.usage.runTime.average == null) {
1681 expect(workerNode.usage.runTime.average).toBeUndefined()
1682 } else {
1683 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1684 }
1685 }
1686 expect(
1687 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1688 pool.workerChoiceStrategyContext.workerChoiceStrategy
1689 ).nextWorkerNodeKey
1690 ).toBe(0)
1691 expect(
1692 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1693 pool.workerChoiceStrategyContext.workerChoiceStrategy
1694 ).previousWorkerNodeKey
1695 ).toBe(0)
1696 expect(
1697 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1698 pool.workerChoiceStrategyContext.workerChoiceStrategy
1699 ).defaultWorkerWeight
1700 ).toBeGreaterThan(0)
1701 expect(
1702 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1703 pool.workerChoiceStrategyContext.workerChoiceStrategy
1704 ).workerNodeVirtualTaskRunTime
1705 ).toBeGreaterThanOrEqual(0)
1706 // We need to clean up the resources after our test
1707 await pool.destroy()
1708 })
1709
1710 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1711 const pool = new DynamicThreadPool(
1712 min,
1713 max,
1714 './tests/worker-files/thread/testWorker.js',
1715 {
1716 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1717 workerChoiceStrategyOptions: {
1718 runTime: { median: true }
1719 }
1720 }
1721 )
1722 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1723 const promises = new Set()
1724 const maxMultiplier = 2
1725 for (let i = 0; i < max * maxMultiplier; i++) {
1726 promises.add(pool.execute())
1727 }
1728 await Promise.all(promises)
1729 for (const workerNode of pool.workerNodes) {
1730 expect(workerNode.usage).toStrictEqual({
1731 tasks: {
1732 executed: expect.any(Number),
1733 executing: 0,
1734 queued: 0,
1735 maxQueued: 0,
1736 stolen: 0,
1737 failed: 0
1738 },
1739 runTime: expect.objectContaining({
1740 history: expect.any(CircularArray)
1741 }),
1742 waitTime: {
1743 history: new CircularArray()
1744 },
1745 elu: {
1746 idle: {
1747 history: new CircularArray()
1748 },
1749 active: {
1750 history: new CircularArray()
1751 }
1752 }
1753 })
1754 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1755 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1756 max * maxMultiplier
1757 )
1758 if (workerNode.usage.runTime.aggregate == null) {
1759 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1760 } else {
1761 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1762 }
1763 if (workerNode.usage.runTime.median == null) {
1764 expect(workerNode.usage.runTime.median).toBeUndefined()
1765 } else {
1766 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1767 }
1768 }
1769 expect(
1770 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1771 pool.workerChoiceStrategyContext.workerChoiceStrategy
1772 ).nextWorkerNodeKey
1773 ).toBe(0)
1774 expect(
1775 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1776 pool.workerChoiceStrategyContext.workerChoiceStrategy
1777 ).previousWorkerNodeKey
1778 ).toBe(0)
1779 expect(
1780 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1781 pool.workerChoiceStrategyContext.workerChoiceStrategy
1782 ).defaultWorkerWeight
1783 ).toBeGreaterThan(0)
1784 expect(
1785 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1786 pool.workerChoiceStrategyContext.workerChoiceStrategy
1787 ).workerNodeVirtualTaskRunTime
1788 ).toBeGreaterThanOrEqual(0)
1789 // We need to clean up the resources after our test
1790 await pool.destroy()
1791 })
1792
1793 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1794 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1795 let pool = new FixedThreadPool(
1796 max,
1797 './tests/worker-files/thread/testWorker.js'
1798 )
1799 expect(
1800 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1801 workerChoiceStrategy
1802 ).nextWorkerNodeKey
1803 ).toBeDefined()
1804 expect(
1805 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1806 workerChoiceStrategy
1807 ).previousWorkerNodeKey
1808 ).toBeDefined()
1809 expect(
1810 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1811 workerChoiceStrategy
1812 ).defaultWorkerWeight
1813 ).toBeDefined()
1814 expect(
1815 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1816 workerChoiceStrategy
1817 ).workerNodeVirtualTaskRunTime
1818 ).toBeDefined()
1819 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1820 expect(
1821 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1822 pool.workerChoiceStrategyContext.workerChoiceStrategy
1823 ).nextWorkerNodeKey
1824 ).toBe(0)
1825 expect(
1826 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1827 pool.workerChoiceStrategyContext.workerChoiceStrategy
1828 ).previousWorkerNodeKey
1829 ).toBe(0)
1830 expect(
1831 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1832 pool.workerChoiceStrategyContext.workerChoiceStrategy
1833 ).defaultWorkerWeight
1834 ).toBeGreaterThan(0)
1835 expect(
1836 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1837 pool.workerChoiceStrategyContext.workerChoiceStrategy
1838 ).workerNodeVirtualTaskRunTime
1839 ).toBe(0)
1840 await pool.destroy()
1841 pool = new DynamicThreadPool(
1842 min,
1843 max,
1844 './tests/worker-files/thread/testWorker.js'
1845 )
1846 expect(
1847 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1848 workerChoiceStrategy
1849 ).nextWorkerNodeKey
1850 ).toBeDefined()
1851 expect(
1852 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1853 workerChoiceStrategy
1854 ).previousWorkerNodeKey
1855 ).toBeDefined()
1856 expect(
1857 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1858 workerChoiceStrategy
1859 ).defaultWorkerWeight
1860 ).toBeDefined()
1861 expect(
1862 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1863 workerChoiceStrategy
1864 ).workerNodeVirtualTaskRunTime
1865 ).toBeDefined()
1866 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1867 expect(
1868 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1869 pool.workerChoiceStrategyContext.workerChoiceStrategy
1870 ).nextWorkerNodeKey
1871 ).toBe(0)
1872 expect(
1873 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1874 pool.workerChoiceStrategyContext.workerChoiceStrategy
1875 ).previousWorkerNodeKey
1876 ).toBe(0)
1877 expect(
1878 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1879 pool.workerChoiceStrategyContext.workerChoiceStrategy
1880 ).defaultWorkerWeight
1881 ).toBeGreaterThan(0)
1882 expect(
1883 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1884 pool.workerChoiceStrategyContext.workerChoiceStrategy
1885 ).workerNodeVirtualTaskRunTime
1886 ).toBe(0)
1887 // We need to clean up the resources after our test
1888 await pool.destroy()
1889 })
1890
1891 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1892 const workerChoiceStrategy =
1893 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1894 let pool = new FixedThreadPool(
1895 max,
1896 './tests/worker-files/thread/testWorker.js',
1897 { workerChoiceStrategy }
1898 )
1899 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1900 dynamicWorkerUsage: false,
1901 dynamicWorkerReady: true
1902 })
1903 await pool.destroy()
1904 pool = new DynamicThreadPool(
1905 min,
1906 max,
1907 './tests/worker-files/thread/testWorker.js',
1908 { workerChoiceStrategy }
1909 )
1910 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1911 dynamicWorkerUsage: false,
1912 dynamicWorkerReady: true
1913 })
1914 // We need to clean up the resources after our test
1915 await pool.destroy()
1916 })
1917
1918 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1919 const workerChoiceStrategy =
1920 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1921 let pool = new FixedThreadPool(
1922 max,
1923 './tests/worker-files/thread/testWorker.js',
1924 { workerChoiceStrategy }
1925 )
1926 expect(
1927 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1928 ).toStrictEqual({
1929 runTime: {
1930 aggregate: true,
1931 average: true,
1932 median: false
1933 },
1934 waitTime: {
1935 aggregate: false,
1936 average: false,
1937 median: false
1938 },
1939 elu: {
1940 aggregate: false,
1941 average: false,
1942 median: false
1943 }
1944 })
1945 await pool.destroy()
1946 pool = new DynamicThreadPool(
1947 min,
1948 max,
1949 './tests/worker-files/thread/testWorker.js',
1950 { workerChoiceStrategy }
1951 )
1952 expect(
1953 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1954 ).toStrictEqual({
1955 runTime: {
1956 aggregate: true,
1957 average: true,
1958 median: false
1959 },
1960 waitTime: {
1961 aggregate: false,
1962 average: false,
1963 median: false
1964 },
1965 elu: {
1966 aggregate: false,
1967 average: false,
1968 median: false
1969 }
1970 })
1971 // We need to clean up the resources after our test
1972 await pool.destroy()
1973 })
1974
1975 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1976 const pool = new FixedThreadPool(
1977 max,
1978 './tests/worker-files/thread/testWorker.js',
1979 {
1980 workerChoiceStrategy:
1981 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1982 }
1983 )
1984 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1985 const promises = new Set()
1986 const maxMultiplier = 2
1987 for (let i = 0; i < max * maxMultiplier; i++) {
1988 promises.add(pool.execute())
1989 }
1990 await Promise.all(promises)
1991 for (const workerNode of pool.workerNodes) {
1992 expect(workerNode.usage).toStrictEqual({
1993 tasks: {
1994 executed: expect.any(Number),
1995 executing: 0,
1996 queued: 0,
1997 maxQueued: 0,
1998 stolen: 0,
1999 failed: 0
2000 },
2001 runTime: expect.objectContaining({
2002 history: expect.any(CircularArray)
2003 }),
2004 waitTime: {
2005 history: new CircularArray()
2006 },
2007 elu: {
2008 idle: {
2009 history: new CircularArray()
2010 },
2011 active: {
2012 history: new CircularArray()
2013 }
2014 }
2015 })
2016 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2017 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2018 max * maxMultiplier
2019 )
2020 }
2021 expect(
2022 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2023 pool.workerChoiceStrategyContext.workerChoiceStrategy
2024 ).defaultWorkerWeight
2025 ).toBeGreaterThan(0)
2026 expect(
2027 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2028 pool.workerChoiceStrategyContext.workerChoiceStrategy
2029 ).roundId
2030 ).toBe(0)
2031 expect(
2032 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2033 pool.workerChoiceStrategyContext.workerChoiceStrategy
2034 ).workerNodeId
2035 ).toBe(0)
2036 expect(
2037 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2038 pool.workerChoiceStrategyContext.workerChoiceStrategy
2039 ).nextWorkerNodeKey
2040 ).toBe(0)
2041 expect(
2042 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2043 pool.workerChoiceStrategyContext.workerChoiceStrategy
2044 ).previousWorkerNodeKey
2045 ).toEqual(expect.any(Number))
2046 expect(
2047 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2048 pool.workerChoiceStrategyContext.workerChoiceStrategy
2049 ).roundWeights
2050 ).toStrictEqual([
2051 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2052 pool.workerChoiceStrategyContext.workerChoiceStrategy
2053 ).defaultWorkerWeight
2054 ])
2055 // We need to clean up the resources after our test
2056 await pool.destroy()
2057 })
2058
2059 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
2060 const pool = new DynamicThreadPool(
2061 min,
2062 max,
2063 './tests/worker-files/thread/testWorker.js',
2064 {
2065 workerChoiceStrategy:
2066 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2067 }
2068 )
2069 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
2070 const promises = new Set()
2071 const maxMultiplier = 2
2072 for (let i = 0; i < max * maxMultiplier; i++) {
2073 promises.add(pool.execute())
2074 }
2075 await Promise.all(promises)
2076 for (const workerNode of pool.workerNodes) {
2077 expect(workerNode.usage).toStrictEqual({
2078 tasks: {
2079 executed: expect.any(Number),
2080 executing: 0,
2081 queued: 0,
2082 maxQueued: 0,
2083 stolen: 0,
2084 failed: 0
2085 },
2086 runTime: expect.objectContaining({
2087 history: expect.any(CircularArray)
2088 }),
2089 waitTime: {
2090 history: new CircularArray()
2091 },
2092 elu: {
2093 idle: {
2094 history: new CircularArray()
2095 },
2096 active: {
2097 history: new CircularArray()
2098 }
2099 }
2100 })
2101 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2102 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2103 max * maxMultiplier
2104 )
2105 }
2106 expect(
2107 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2108 pool.workerChoiceStrategyContext.workerChoiceStrategy
2109 ).defaultWorkerWeight
2110 ).toBeGreaterThan(0)
2111 expect(
2112 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2113 pool.workerChoiceStrategyContext.workerChoiceStrategy
2114 ).roundId
2115 ).toBe(0)
2116 expect(
2117 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2118 pool.workerChoiceStrategyContext.workerChoiceStrategy
2119 ).workerNodeId
2120 ).toBe(0)
2121 expect(
2122 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2123 pool.workerChoiceStrategyContext.workerChoiceStrategy
2124 ).nextWorkerNodeKey
2125 ).toBe(0)
2126 expect(
2127 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2128 pool.workerChoiceStrategyContext.workerChoiceStrategy
2129 ).previousWorkerNodeKey
2130 ).toEqual(expect.any(Number))
2131 expect(
2132 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2133 pool.workerChoiceStrategyContext.workerChoiceStrategy
2134 ).roundWeights
2135 ).toStrictEqual([
2136 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2137 pool.workerChoiceStrategyContext.workerChoiceStrategy
2138 ).defaultWorkerWeight
2139 ])
2140 // We need to clean up the resources after our test
2141 await pool.destroy()
2142 })
2143
2144 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
2145 const workerChoiceStrategy =
2146 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2147 let pool = new FixedThreadPool(
2148 max,
2149 './tests/worker-files/thread/testWorker.js'
2150 )
2151 expect(
2152 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2153 workerChoiceStrategy
2154 ).roundId
2155 ).toBeDefined()
2156 expect(
2157 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2158 workerChoiceStrategy
2159 ).workerNodeId
2160 ).toBeDefined()
2161 expect(
2162 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2163 workerChoiceStrategy
2164 ).nextWorkerNodeKey
2165 ).toBeDefined()
2166 expect(
2167 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2168 workerChoiceStrategy
2169 ).previousWorkerNodeKey
2170 ).toBeDefined()
2171 expect(
2172 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2173 workerChoiceStrategy
2174 ).defaultWorkerWeight
2175 ).toBeDefined()
2176 expect(
2177 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2178 workerChoiceStrategy
2179 ).roundWeights
2180 ).toBeDefined()
2181 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2182 expect(
2183 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2184 pool.workerChoiceStrategyContext.workerChoiceStrategy
2185 ).roundId
2186 ).toBe(0)
2187 expect(
2188 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2189 pool.workerChoiceStrategyContext.workerChoiceStrategy
2190 ).workerNodeId
2191 ).toBe(0)
2192 expect(
2193 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2194 pool.workerChoiceStrategyContext.workerChoiceStrategy
2195 ).nextWorkerNodeKey
2196 ).toBe(0)
2197 expect(
2198 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2199 pool.workerChoiceStrategyContext.workerChoiceStrategy
2200 ).previousWorkerNodeKey
2201 ).toBe(0)
2202 expect(
2203 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2204 pool.workerChoiceStrategyContext.workerChoiceStrategy
2205 ).defaultWorkerWeight
2206 ).toBeGreaterThan(0)
2207 expect(
2208 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2209 pool.workerChoiceStrategyContext.workerChoiceStrategy
2210 ).roundWeights
2211 ).toStrictEqual([
2212 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2213 pool.workerChoiceStrategyContext.workerChoiceStrategy
2214 ).defaultWorkerWeight
2215 ])
2216 await pool.destroy()
2217 pool = new DynamicThreadPool(
2218 min,
2219 max,
2220 './tests/worker-files/thread/testWorker.js'
2221 )
2222 expect(
2223 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2224 workerChoiceStrategy
2225 ).roundId
2226 ).toBeDefined()
2227 expect(
2228 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2229 workerChoiceStrategy
2230 ).workerNodeId
2231 ).toBeDefined()
2232 expect(
2233 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2234 workerChoiceStrategy
2235 ).nextWorkerNodeKey
2236 ).toBeDefined()
2237 expect(
2238 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2239 workerChoiceStrategy
2240 ).previousWorkerNodeKey
2241 ).toBeDefined()
2242 expect(
2243 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2244 workerChoiceStrategy
2245 ).defaultWorkerWeight
2246 ).toBeDefined()
2247 expect(
2248 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2249 workerChoiceStrategy
2250 ).roundWeights
2251 ).toBeDefined()
2252 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2253 expect(
2254 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2255 pool.workerChoiceStrategyContext.workerChoiceStrategy
2256 ).roundId
2257 ).toBe(0)
2258 expect(
2259 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2260 pool.workerChoiceStrategyContext.workerChoiceStrategy
2261 ).workerNodeId
2262 ).toBe(0)
2263 expect(
2264 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2265 pool.workerChoiceStrategyContext.workerChoiceStrategy
2266 ).nextWorkerNodeKey
2267 ).toBe(0)
2268 expect(
2269 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2270 pool.workerChoiceStrategyContext.workerChoiceStrategy
2271 ).previousWorkerNodeKey
2272 ).toBe(0)
2273 expect(
2274 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2275 pool.workerChoiceStrategyContext.workerChoiceStrategy
2276 ).defaultWorkerWeight
2277 ).toBeGreaterThan(0)
2278 expect(
2279 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2280 pool.workerChoiceStrategyContext.workerChoiceStrategy
2281 ).roundWeights
2282 ).toStrictEqual([
2283 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2284 pool.workerChoiceStrategyContext.workerChoiceStrategy
2285 ).defaultWorkerWeight
2286 ])
2287 // We need to clean up the resources after our test
2288 await pool.destroy()
2289 })
2290
2291 it('Verify unknown strategy throw error', () => {
2292 expect(
2293 () =>
2294 new DynamicThreadPool(
2295 min,
2296 max,
2297 './tests/worker-files/thread/testWorker.js',
2298 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2299 )
2300 ).toThrowError("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
2301 })
2302 })