refactor: untangle worker eligibility from previous worker node
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
... / ...
CommitLineData
1const { expect } = require('expect')
2const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 WorkerChoiceStrategies
8} = require('../../../lib')
9const { CircularArray } = require('../../../lib/circular-array')
10
11describe('Selection strategies test suite', () => {
12 const min = 0
13 const max = 3
14
it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
  // Every enumeration member is expected to evaluate to its own name
  const strategyNames = [
    'ROUND_ROBIN',
    'LEAST_USED',
    'LEAST_BUSY',
    'LEAST_ELU',
    'FAIR_SHARE',
    'WEIGHTED_ROUND_ROBIN',
    'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
  ]
  for (const strategyName of strategyNames) {
    expect(WorkerChoiceStrategies[strategyName]).toBe(strategyName)
  }
})
28
it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
  // The pool is built without options, so the strategy read back is the default one
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const pool = new DynamicThreadPool(min, max, workerFile)
  const expectedStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  expect(pool.opts.workerChoiceStrategy).toBe(expectedStrategy)
  // Release the pool resources once the check is done
  await pool.destroy()
})
41
it('Verify available strategies are taken at pool creation', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const strategies = Object.values(WorkerChoiceStrategies)
  for (const workerChoiceStrategy of strategies) {
    // A fresh options object per pool: the strategy is handed over at construction
    const pool = new FixedThreadPool(max, workerFile, { workerChoiceStrategy })
    const context = pool.workerChoiceStrategyContext
    expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
    expect(context.workerChoiceStrategy).toBe(workerChoiceStrategy)
    await pool.destroy()
  }
})
56
it('Verify available strategies can be set after pool creation', async () => {
  // Builds the strategy options expected for the given retries count
  const expectedOptions = retries => ({
    retries,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  })
  // Checks that both the pool options and the strategy context reflect the change
  const expectStrategyApplied = (pool, workerChoiceStrategy, retries) => {
    expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
      workerChoiceStrategy
    )
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions(retries)
    )
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOptions(retries)
    )
  }
  // Thread pool: the strategy is set without any strategy options
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const threadPool = new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js'
    )
    threadPool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expectStrategyApplied(threadPool, workerChoiceStrategy, 6)
    await threadPool.destroy()
  }
  // Cluster pool: the strategy is set together with a custom retries count
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const clusterPool = new DynamicClusterPool(
      min,
      max,
      './tests/worker-files/cluster/testWorker.js'
    )
    clusterPool.setWorkerChoiceStrategy(workerChoiceStrategy, { retries: 3 })
    expectStrategyApplied(clusterPool, workerChoiceStrategy, 3)
    await clusterPool.destroy()
  }
})
109
it('Verify available strategies default internals at pool creation', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js'
  )
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    // Looks the strategy instance up again on every call, as each assertion needs it
    const strategy = () =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        workerChoiceStrategy
      )
    // Internals common to every strategy
    expect(strategy().nextWorkerNodeKey).toBe(0)
    expect(strategy().previousWorkerNodeKey).toBe(0)
    // Strategy specific internals
    switch (workerChoiceStrategy) {
      case WorkerChoiceStrategies.FAIR_SHARE:
        expect(strategy().workersVirtualTaskEndTimestamp).toBeInstanceOf(Array)
        expect(strategy().workersVirtualTaskEndTimestamp.length).toBe(0)
        break
      case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
        expect(strategy().defaultWorkerWeight).toBeGreaterThan(0)
        expect(strategy().workerVirtualTaskRunTime).toBe(0)
        break
      case WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN:
        expect(strategy().defaultWorkerWeight).toBeGreaterThan(0)
        expect(strategy().workerVirtualTaskRunTime).toBe(0)
        expect(strategy().roundId).toBe(0)
        expect(strategy().workerNodeId).toBe(0)
        expect(strategy().roundWeights).toStrictEqual([
          strategy().defaultWorkerWeight
        ])
        break
      default:
        break
    }
  }
  await pool.destroy()
})
187
it('Verify ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Policy expected for both the fixed and the dynamic pool
  const expectedPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: true
  }
  const fixedPool = new FixedThreadPool(max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    fixedPool.workerChoiceStrategyContext.getStrategyPolicy()
  ).toStrictEqual(expectedPolicy)
  await fixedPool.destroy()
  const dynamicPool = new DynamicThreadPool(min, max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    dynamicPool.workerChoiceStrategyContext.getStrategyPolicy()
  ).toStrictEqual(expectedPolicy)
  // Release the pool resources once the checks are done
  await dynamicPool.destroy()
})
213
it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // No measurement is expected to be required for this strategy
  const disabled = { aggregate: false, average: false, median: false }
  const expectedRequirements = {
    runTime: { ...disabled },
    waitTime: { ...disabled },
    elu: { ...disabled }
  }
  const fixedPool = new FixedThreadPool(max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    fixedPool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual(expectedRequirements)
  await fixedPool.destroy()
  const dynamicPool = new DynamicThreadPool(min, max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    dynamicPool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual(expectedRequirements)
  // Release the pool resources once the checks are done
  await dynamicPool.destroy()
})
269
it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy }
  )
  // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  // Submit max * maxMultiplier tasks and wait for all of them to settle
  const pendingTasks = Array.from({ length: max * maxMultiplier }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // Each worker node is expected to have executed exactly maxMultiplier tasks
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: maxMultiplier,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: { history: new CircularArray() },
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
  }
  // Strategy currently selected in the pool context
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().nextWorkerNodeKey).toBe(0)
  expect(activeStrategy().previousWorkerNodeKey).toBe(
    pool.workerNodes.length - 1
  )
  // Release the pool resources once the checks are done
  await pool.destroy()
})
323
it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy }
  )
  // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  const pendingTasks = []
  while (pendingTasks.length < taskCount) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: { history: new CircularArray() },
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
    // The per node executed count is only bounded, not pinned to a value
    expect(node.usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(taskCount)
  }
  // Strategy currently selected in the pool context
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().nextWorkerNodeKey).toBe(0)
  expect(activeStrategy().previousWorkerNodeKey).toBe(
    pool.workerNodes.length - 1
  )
  // Release the pool resources once the checks are done
  await pool.destroy()
})
382
it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // Cluster pool: max consecutive choices must yield max distinct worker ids
  const clusterPool = new FixedClusterPool(
    max,
    './tests/worker-files/cluster/testWorker.js',
    { workerChoiceStrategy }
  )
  const chosenWorkerIds = new Set()
  for (let round = 0; round < max; round++) {
    const chosenNode = clusterPool.workerNodes[clusterPool.chooseWorkerNode()]
    chosenWorkerIds.add(chosenNode.worker.id)
  }
  expect(chosenWorkerIds.size).toBe(max)
  await clusterPool.destroy()
  // Thread pool: same expectation, keyed on the worker thread ids
  const threadPool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy }
  )
  const chosenThreadIds = new Set()
  for (let round = 0; round < max; round++) {
    const chosenNode = threadPool.workerNodes[threadPool.chooseWorkerNode()]
    chosenThreadIds.add(chosenNode.worker.threadId)
  }
  expect(chosenThreadIds.size).toBe(max)
  await threadPool.destroy()
})
408
it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // A fresh options object is built for each pool so none is shared between pools
  const initialOptions = () => ({
    workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  })
  // Strategy currently selected in the given pool context
  const activeStrategy = pool =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  // Fixed pool first, then dynamic pool
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, initialOptions()),
    () => new DynamicThreadPool(min, max, workerFile, initialOptions())
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    // Before the switch the internals only need to exist
    expect(activeStrategy(pool).nextWorkerNodeKey).toBeDefined()
    expect(activeStrategy(pool).previousWorkerNodeKey).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    // After the switch the internals must be back to zero
    expect(activeStrategy(pool).nextWorkerNodeKey).toBe(0)
    expect(activeStrategy(pool).previousWorkerNodeKey).toBe(0)
    // Release the pool resources before moving on
    await pool.destroy()
  }
})
468
it('Verify LEAST_USED strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Policy expected for both the fixed and the dynamic pool
  const expectedPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: true
  }
  const fixedPool = new FixedThreadPool(max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    fixedPool.workerChoiceStrategyContext.getStrategyPolicy()
  ).toStrictEqual(expectedPolicy)
  await fixedPool.destroy()
  const dynamicPool = new DynamicThreadPool(min, max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    dynamicPool.workerChoiceStrategyContext.getStrategyPolicy()
  ).toStrictEqual(expectedPolicy)
  // Release the pool resources once the checks are done
  await dynamicPool.destroy()
})
494
it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // No measurement is expected to be required for this strategy
  const disabled = { aggregate: false, average: false, median: false }
  const expectedRequirements = {
    runTime: { ...disabled },
    waitTime: { ...disabled },
    elu: { ...disabled }
  }
  const fixedPool = new FixedThreadPool(max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    fixedPool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual(expectedRequirements)
  await fixedPool.destroy()
  const dynamicPool = new DynamicThreadPool(min, max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    dynamicPool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual(expectedRequirements)
  // Release the pool resources once the checks are done
  await dynamicPool.destroy()
})
550
it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to settle
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: { history: new CircularArray() },
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
    // The per node executed count is only bounded, not pinned to a value
    expect(node.usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(taskCount)
  }
  // Release the pool resources once the checks are done
  await pool.destroy()
})
597
it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  const pendingTasks = []
  while (pendingTasks.length < taskCount) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: { history: new CircularArray() },
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
    // The per node executed count is only bounded, not pinned to a value
    expect(node.usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(taskCount)
  }
  // Release the pool resources once the checks are done
  await pool.destroy()
})
645
it('Verify LEAST_BUSY strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Policy expected for both the fixed and the dynamic pool
  const expectedPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: true
  }
  const fixedPool = new FixedThreadPool(max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    fixedPool.workerChoiceStrategyContext.getStrategyPolicy()
  ).toStrictEqual(expectedPolicy)
  await fixedPool.destroy()
  const dynamicPool = new DynamicThreadPool(min, max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    dynamicPool.workerChoiceStrategyContext.getStrategyPolicy()
  ).toStrictEqual(expectedPolicy)
  // Release the pool resources once the checks are done
  await dynamicPool.destroy()
})
671
it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Only the run time and wait time aggregates are expected to be required
  const aggregateOnly = { aggregate: true, average: false, median: false }
  const disabled = { aggregate: false, average: false, median: false }
  const expectedRequirements = {
    runTime: { ...aggregateOnly },
    waitTime: { ...aggregateOnly },
    elu: { ...disabled }
  }
  const fixedPool = new FixedThreadPool(max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    fixedPool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual(expectedRequirements)
  await fixedPool.destroy()
  const dynamicPool = new DynamicThreadPool(min, max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    dynamicPool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual(expectedRequirements)
  // Release the pool resources once the checks are done
  await dynamicPool.destroy()
})
727
it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to settle
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
    expect(node.usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    // An aggregate is either strictly positive or not set at all
    if (node.usage.runTime.aggregate != null) {
      expect(node.usage.runTime.aggregate).toBeGreaterThan(0)
    } else {
      expect(node.usage.runTime.aggregate).toBeUndefined()
    }
    if (node.usage.waitTime.aggregate != null) {
      expect(node.usage.waitTime.aggregate).toBeGreaterThan(0)
    } else {
      expect(node.usage.waitTime.aggregate).toBeUndefined()
    }
  }
  // Release the pool resources once the checks are done
  await pool.destroy()
})
784
it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  const pendingTasks = []
  while (pendingTasks.length < taskCount) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
    expect(node.usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    // An aggregate is either strictly positive or not set at all
    if (node.usage.runTime.aggregate != null) {
      expect(node.usage.runTime.aggregate).toBeGreaterThan(0)
    } else {
      expect(node.usage.runTime.aggregate).toBeUndefined()
    }
    if (node.usage.waitTime.aggregate != null) {
      expect(node.usage.waitTime.aggregate).toBeGreaterThan(0)
    } else {
      expect(node.usage.waitTime.aggregate).toBeUndefined()
    }
  }
  // Release the pool resources once the checks are done
  await pool.destroy()
})
842
it('Verify LEAST_ELU strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Policy expected for both the fixed and the dynamic pool
  const expectedPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: true
  }
  const fixedPool = new FixedThreadPool(max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    fixedPool.workerChoiceStrategyContext.getStrategyPolicy()
  ).toStrictEqual(expectedPolicy)
  await fixedPool.destroy()
  const dynamicPool = new DynamicThreadPool(min, max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    dynamicPool.workerChoiceStrategyContext.getStrategyPolicy()
  ).toStrictEqual(expectedPolicy)
  // Release the pool resources once the checks are done
  await dynamicPool.destroy()
})
868
it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Only the ELU aggregate is expected to be required
  const disabled = { aggregate: false, average: false, median: false }
  const expectedRequirements = {
    runTime: { ...disabled },
    waitTime: { ...disabled },
    elu: { aggregate: true, average: false, median: false }
  }
  const fixedPool = new FixedThreadPool(max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    fixedPool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual(expectedRequirements)
  await fixedPool.destroy()
  const dynamicPool = new DynamicThreadPool(min, max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    dynamicPool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual(expectedRequirements)
  // Release the pool resources once the checks are done
  await dynamicPool.destroy()
})
924
it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  // Submit every task up front, then wait for all of them to settle
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: { history: new CircularArray() },
      waitTime: { history: new CircularArray() },
      elu: expect.objectContaining({
        idle: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        active: expect.objectContaining({
          history: expect.any(CircularArray)
        })
      })
    })
    expect(node.usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    // Each ELU figure is either within its valid range or not set at all
    if (node.usage.elu.active.aggregate != null) {
      expect(node.usage.elu.active.aggregate).toBeGreaterThan(0)
    } else {
      expect(node.usage.elu.active.aggregate).toBeUndefined()
    }
    if (node.usage.elu.idle.aggregate != null) {
      expect(node.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
    } else {
      expect(node.usage.elu.idle.aggregate).toBeUndefined()
    }
    if (node.usage.elu.utilization != null) {
      expect(node.usage.elu.utilization).toBeGreaterThanOrEqual(0)
      expect(node.usage.elu.utilization).toBeLessThanOrEqual(1)
    } else {
      expect(node.usage.elu.utilization).toBeUndefined()
    }
  }
  // Release the pool resources once the checks are done
  await pool.destroy()
})
987
it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const taskCount = max * maxMultiplier
  const pendingTasks = []
  while (pendingTasks.length < taskCount) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: { history: new CircularArray() },
      waitTime: { history: new CircularArray() },
      elu: expect.objectContaining({
        idle: expect.objectContaining({
          history: expect.any(CircularArray)
        }),
        active: expect.objectContaining({
          history: expect.any(CircularArray)
        })
      })
    })
    expect(node.usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    // Each ELU figure is either within its valid range or not set at all
    if (node.usage.elu.active.aggregate != null) {
      expect(node.usage.elu.active.aggregate).toBeGreaterThan(0)
    } else {
      expect(node.usage.elu.active.aggregate).toBeUndefined()
    }
    if (node.usage.elu.idle.aggregate != null) {
      expect(node.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
    } else {
      expect(node.usage.elu.idle.aggregate).toBeUndefined()
    }
    if (node.usage.elu.utilization != null) {
      expect(node.usage.elu.utilization).toBeGreaterThanOrEqual(0)
      expect(node.usage.elu.utilization).toBeLessThanOrEqual(1)
    } else {
      expect(node.usage.elu.utilization).toBeUndefined()
    }
  }
  // Release the pool resources once the checks are done
  await pool.destroy()
})
1051
it('Verify FAIR_SHARE strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Policy expected for both the fixed and the dynamic pool
  const expectedPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: true
  }
  const fixedPool = new FixedThreadPool(max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    fixedPool.workerChoiceStrategyContext.getStrategyPolicy()
  ).toStrictEqual(expectedPolicy)
  await fixedPool.destroy()
  const dynamicPool = new DynamicThreadPool(min, max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    dynamicPool.workerChoiceStrategyContext.getStrategyPolicy()
  ).toStrictEqual(expectedPolicy)
  // Release the pool resources once the checks are done
  await dynamicPool.destroy()
})
1077
it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Run time and ELU need aggregate and average; wait time needs nothing
  const aggregateAndAverage = { aggregate: true, average: true, median: false }
  const disabled = { aggregate: false, average: false, median: false }
  const expectedRequirements = {
    runTime: { ...aggregateAndAverage },
    waitTime: { ...disabled },
    elu: { ...aggregateAndAverage }
  }
  const fixedPool = new FixedThreadPool(max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    fixedPool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual(expectedRequirements)
  await fixedPool.destroy()
  const dynamicPool = new DynamicThreadPool(min, max, workerFile, {
    workerChoiceStrategy
  })
  expect(
    dynamicPool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual(expectedRequirements)
  // Release the pool resources once the checks are done
  await dynamicPool.destroy()
})
1133
it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
  const maxMultiplier = 2
  const submittedTasks = max * maxMultiplier
  // A statistic may be missing, in which case it has to be undefined (not
  // null); otherwise it must pass the supplied range check.
  const expectUnsetOr = (statistic, assertRange) => {
    if (statistic != null) {
      assertRange(statistic)
    } else {
      expect(statistic).toBeUndefined()
    }
  }
  const expectPositive = value => expect(value).toBeGreaterThan(0)
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    const anyHistory = expect.objectContaining({
      history: expect.any(CircularArray)
    })
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: anyHistory,
      waitTime: {
        history: new CircularArray()
      },
      elu: expect.objectContaining({
        idle: anyHistory,
        active: anyHistory
      })
    })
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectUnsetOr(usage.runTime.aggregate, expectPositive)
    expectUnsetOr(usage.runTime.average, expectPositive)
    expectUnsetOr(usage.elu.active.aggregate, expectPositive)
    expectUnsetOr(usage.elu.idle.aggregate, value =>
      expect(value).toBeGreaterThanOrEqual(0)
    )
    expectUnsetOr(usage.elu.utilization, value => {
      expect(value).toBeGreaterThanOrEqual(0)
      expect(value).toBeLessThanOrEqual(1)
    })
  }
  // The strategy in use must hold one virtual task end timestamp per worker node
  const { workerChoiceStrategyContext } = pool
  const strategyInUse = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(strategyInUse.workersVirtualTaskEndTimestamp.length).toBe(
    pool.workerNodes.length
  )
  // Destroy the pool so its resources are released once the test is done
  await pool.destroy()
})
1211
it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
  const maxMultiplier = 2
  const submittedTasks = max * maxMultiplier
  // A statistic may be missing, in which case it has to be undefined (not
  // null); otherwise it must pass the supplied range check.
  const expectUnsetOr = (statistic, assertRange) => {
    if (statistic != null) {
      assertRange(statistic)
    } else {
      expect(statistic).toBeUndefined()
    }
  }
  const expectPositive = value => expect(value).toBeGreaterThan(0)
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    const anyHistory = expect.objectContaining({
      history: expect.any(CircularArray)
    })
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: anyHistory,
      waitTime: {
        history: new CircularArray()
      },
      elu: expect.objectContaining({
        idle: anyHistory,
        active: anyHistory
      })
    })
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectUnsetOr(usage.runTime.aggregate, expectPositive)
    expectUnsetOr(usage.runTime.average, expectPositive)
    expectUnsetOr(usage.elu.active.aggregate, expectPositive)
    expectUnsetOr(usage.elu.idle.aggregate, value =>
      expect(value).toBeGreaterThanOrEqual(0)
    )
    expectUnsetOr(usage.elu.utilization, value => {
      expect(value).toBeGreaterThanOrEqual(0)
      expect(value).toBeLessThanOrEqual(1)
    })
  }
  // The strategy in use must hold one virtual task end timestamp per worker node
  const { workerChoiceStrategyContext } = pool
  const strategyInUse = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(strategyInUse.workersVirtualTaskEndTimestamp.length).toBe(
    pool.workerNodes.length
  )
  // Destroy the pool so its resources are released once the test is done
  await pool.destroy()
})
1290
it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
      workerChoiceStrategyOptions: {
        runTime: { median: true }
      }
    }
  )
  // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
  const maxMultiplier = 2
  const submittedTasks = max * maxMultiplier
  // A statistic may be missing, in which case it has to be undefined (not
  // null); otherwise it must pass the supplied range check.
  const expectUnsetOr = (statistic, assertRange) => {
    if (statistic != null) {
      assertRange(statistic)
    } else {
      expect(statistic).toBeUndefined()
    }
  }
  const expectPositive = value => expect(value).toBeGreaterThan(0)
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    const anyHistory = expect.objectContaining({
      history: expect.any(CircularArray)
    })
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: anyHistory,
      waitTime: {
        history: new CircularArray()
      },
      elu: expect.objectContaining({
        idle: anyHistory,
        active: anyHistory
      })
    })
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectUnsetOr(usage.runTime.aggregate, expectPositive)
    // The median is checked here rather than the average
    expectUnsetOr(usage.runTime.median, expectPositive)
    expectUnsetOr(usage.elu.active.aggregate, expectPositive)
    expectUnsetOr(usage.elu.idle.aggregate, value =>
      expect(value).toBeGreaterThanOrEqual(0)
    )
    expectUnsetOr(usage.elu.utilization, value => {
      expect(value).toBeGreaterThanOrEqual(0)
      expect(value).toBeLessThanOrEqual(1)
    })
  }
  // The strategy in use must hold one virtual task end timestamp per worker node
  const { workerChoiceStrategyContext } = pool
  const strategyInUse = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(strategyInUse.workersVirtualTaskEndTimestamp.length).toBe(
    pool.workerNodes.length
  )
  // Destroy the pool so its resources are released once the test is done
  await pool.destroy()
})
1374
it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Looks a strategy instance up in the pool's strategy context by its key
  const strategyOf = (pool, strategyKey) =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(strategyKey)
  // Dirties the strategy internals, sets the strategy on the pool and checks
  // that the internals are empty again; destroys the pool afterwards.
  const verifyInternalsReset = async pool => {
    const beforeSet = strategyOf(pool, workerChoiceStrategy)
    expect(beforeSet.workersVirtualTaskEndTimestamp).toBeInstanceOf(Array)
    expect(beforeSet.workersVirtualTaskEndTimestamp.length).toBe(0)
    beforeSet.workersVirtualTaskEndTimestamp[0] = performance.now()
    expect(beforeSet.workersVirtualTaskEndTimestamp.length).toBe(1)
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    // Looked up again, this time through the strategy the pool reports as set
    const afterSet = strategyOf(
      pool,
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(afterSet.workersVirtualTaskEndTimestamp).toBeInstanceOf(Array)
    expect(afterSet.workersVirtualTaskEndTimestamp.length).toBe(0)
    await pool.destroy()
  }
  await verifyInternalsReset(new FixedThreadPool(max, workerFile))
  await verifyInternalsReset(new DynamicThreadPool(min, max, workerFile))
})
1448
it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Policy expected from the strategy context, whatever the pool type
  const expectedPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: true
  }
  // Checks the policy reported by a pool, then destroys that pool
  const verifyPolicy = async pool => {
    const policy = pool.workerChoiceStrategyContext.getStrategyPolicy()
    expect(policy).toStrictEqual(expectedPolicy)
    await pool.destroy()
  }
  await verifyPolicy(
    new FixedThreadPool(max, workerFile, { workerChoiceStrategy })
  )
  await verifyPolicy(
    new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  )
})
1474
it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Pools are created lazily, one at a time: the fixed pool first, then the
  // dynamic one. Each pool receives its own options object.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual({
      runTime: { aggregate: true, average: true, median: false },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: false, average: false, median: false }
    })
    // Destroy the pool so its resources are released before moving on
    await pool.destroy()
  }
})
1530
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const submittedTasks = max * maxMultiplier
  // A run time statistic is either missing (then it must be undefined, not
  // null) or strictly positive.
  const expectUnsetOrPositive = statistic => {
    if (statistic != null) {
      expect(statistic).toBeGreaterThan(0)
    } else {
      expect(statistic).toBeUndefined()
    }
  }
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectUnsetOrPositive(usage.runTime.aggregate)
    expectUnsetOrPositive(usage.runTime.average)
  }
  // Inspect the internals of the strategy the pool is currently using
  const { workerChoiceStrategyContext } = pool
  const strategyInUse = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(strategyInUse.nextWorkerNodeKey).toBe(0)
  expect(strategyInUse.previousWorkerNodeKey).toBe(0)
  expect(strategyInUse.defaultWorkerWeight).toBeGreaterThan(0)
  expect(strategyInUse.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // Destroy the pool so its resources are released once the test is done
  await pool.destroy()
})
1607
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const submittedTasks = max * maxMultiplier
  // A run time statistic is either missing (then it must be undefined, not
  // null) or strictly positive.
  const expectUnsetOrPositive = statistic => {
    if (statistic != null) {
      expect(statistic).toBeGreaterThan(0)
    } else {
      expect(statistic).toBeUndefined()
    }
  }
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectUnsetOrPositive(usage.runTime.aggregate)
    expectUnsetOrPositive(usage.runTime.average)
  }
  // Inspect the internals of the strategy the pool is currently using
  const { workerChoiceStrategyContext } = pool
  const strategyInUse = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(strategyInUse.nextWorkerNodeKey).toBe(0)
  expect(strategyInUse.previousWorkerNodeKey).toBe(0)
  expect(strategyInUse.defaultWorkerWeight).toBeGreaterThan(0)
  expect(strategyInUse.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // Destroy the pool so its resources are released once the test is done
  await pool.destroy()
})
1685
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
      workerChoiceStrategyOptions: {
        runTime: { median: true }
      }
    }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const submittedTasks = max * maxMultiplier
  // A run time statistic is either missing (then it must be undefined, not
  // null) or strictly positive.
  const expectUnsetOrPositive = statistic => {
    if (statistic != null) {
      expect(statistic).toBeGreaterThan(0)
    } else {
      expect(statistic).toBeUndefined()
    }
  }
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectUnsetOrPositive(usage.runTime.aggregate)
    // The median is checked here rather than the average
    expectUnsetOrPositive(usage.runTime.median)
  }
  // Inspect the internals of the strategy the pool is currently using
  const { workerChoiceStrategyContext } = pool
  const strategyInUse = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(strategyInUse.nextWorkerNodeKey).toBe(0)
  expect(strategyInUse.previousWorkerNodeKey).toBe(0)
  expect(strategyInUse.defaultWorkerWeight).toBeGreaterThan(0)
  expect(strategyInUse.workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // Destroy the pool so its resources are released once the test is done
  await pool.destroy()
})
1768
it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Looks a strategy instance up in the pool's strategy context by its key
  const strategyOf = (pool, strategyKey) =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(strategyKey)
  // Checks the internals exist, sets the strategy on the pool and checks the
  // internals hold their reset values; destroys the pool afterwards.
  const verifyInternalsReset = async pool => {
    const beforeSet = strategyOf(pool, workerChoiceStrategy)
    expect(beforeSet.nextWorkerNodeKey).toBeDefined()
    expect(beforeSet.previousWorkerNodeKey).toBeDefined()
    expect(beforeSet.defaultWorkerWeight).toBeDefined()
    expect(beforeSet.workerVirtualTaskRunTime).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    // Looked up again, this time through the strategy the pool reports as set
    const afterSet = strategyOf(
      pool,
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
    expect(afterSet.nextWorkerNodeKey).toBe(0)
    expect(afterSet.previousWorkerNodeKey).toBe(0)
    expect(afterSet.defaultWorkerWeight).toBeGreaterThan(0)
    expect(afterSet.workerVirtualTaskRunTime).toBe(0)
    await pool.destroy()
  }
  await verifyInternalsReset(new FixedThreadPool(max, workerFile))
  await verifyInternalsReset(new DynamicThreadPool(min, max, workerFile))
})
1866
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Pools are created lazily, one at a time: the fixed pool first, then the
  // dynamic one. Each pool receives its own options object.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // Destroy the pool so its resources are released before moving on
    await pool.destroy()
  }
})
1893
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Requirements expected from the strategy context, whatever the pool type
  const expectedRequirements = {
    runTime: { aggregate: true, average: true, median: false },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: false, average: false, median: false }
  }
  // Checks the requirements reported by a pool, then destroys that pool
  const verifyRequirements = async pool => {
    const requirements =
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    expect(requirements).toStrictEqual(expectedRequirements)
    await pool.destroy()
  }
  await verifyRequirements(
    new FixedThreadPool(max, workerFile, { workerChoiceStrategy })
  )
  await verifyRequirements(
    new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  )
})
1950
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const submittedTasks = max * maxMultiplier
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
  }
  // Inspect the internals of the strategy the pool is currently using
  const { workerChoiceStrategyContext } = pool
  const strategyInUse = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(strategyInUse.defaultWorkerWeight).toBeGreaterThan(0)
  expect(strategyInUse.roundId).toBe(0)
  expect(strategyInUse.workerNodeId).toBe(0)
  expect(strategyInUse.nextWorkerNodeKey).toBe(0)
  expect(strategyInUse.previousWorkerNodeKey).toEqual(expect.any(Number))
  // The only expected round weight is the default worker weight
  expect(strategyInUse.roundWeights).toStrictEqual([
    strategyInUse.defaultWorkerWeight
  ])
  // Destroy the pool so its resources are released once the test is done
  await pool.destroy()
})
2034
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const submittedTasks = max * maxMultiplier
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: expect.objectContaining({
        history: expect.any(CircularArray)
      }),
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
  }
  // Inspect the internals of the strategy the pool is currently using
  const { workerChoiceStrategyContext } = pool
  const strategyInUse = workerChoiceStrategyContext.workerChoiceStrategies.get(
    workerChoiceStrategyContext.workerChoiceStrategy
  )
  expect(strategyInUse.defaultWorkerWeight).toBeGreaterThan(0)
  expect(strategyInUse.roundId).toBe(0)
  expect(strategyInUse.workerNodeId).toBe(0)
  expect(strategyInUse.nextWorkerNodeKey).toBe(0)
  expect(strategyInUse.previousWorkerNodeKey).toEqual(expect.any(Number))
  // The only expected round weight is the default worker weight
  expect(strategyInUse.roundWeights).toStrictEqual([
    strategyInUse.defaultWorkerWeight
  ])
  // Destroy the pool so its resources are released once the test is done
  await pool.destroy()
})
2119
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Internal fields of the strategy that must exist before it is selected.
  const strategyInternals = [
    'roundId',
    'workerNodeId',
    'nextWorkerNodeKey',
    'previousWorkerNodeKey',
    'defaultWorkerWeight',
    'roundWeights'
  ]
  // The same scenario is exercised against a fixed and a dynamic thread pool,
  // both created without an explicit worker choice strategy option.
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile),
    () => new DynamicThreadPool(min, max, workerFile)
  ]
  // Looks up a strategy instance in the pool's worker choice strategy context.
  const getStrategy = (pool, strategyName) =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(strategyName)

  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      // Before selection: the strategy is registered and its internals exist.
      const registeredStrategy = getStrategy(pool, workerChoiceStrategy)
      for (const internal of strategyInternals) {
        expect(registeredStrategy[internal]).toBeDefined()
      }

      pool.setWorkerChoiceStrategy(workerChoiceStrategy)

      // After selection: resolve the strategy through the context's current
      // strategy name, so the lookup only succeeds if the switch took effect.
      const activeStrategy = getStrategy(
        pool,
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      )
      expect(activeStrategy.roundId).toBe(0)
      expect(activeStrategy.workerNodeId).toBe(0)
      expect(activeStrategy.nextWorkerNodeKey).toBe(0)
      expect(activeStrategy.previousWorkerNodeKey).toBe(0)
      expect(activeStrategy.defaultWorkerWeight).toBeGreaterThan(0)
      expect(activeStrategy.roundWeights).toStrictEqual([
        activeStrategy.defaultWorkerWeight
      ])
    } finally {
      // We need to clean up the resources after our test, even when an
      // assertion above throws, otherwise the pool's workers are leaked.
      await pool.destroy()
    }
  }
})
2266
it('Verify unknown strategy throw error', () => {
  // Building a pool with a strategy name that is not part of
  // WorkerChoiceStrategies is expected to throw from the constructor.
  const createPoolWithUnknownStrategy = () => {
    const poolOptions = { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
    return new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js',
      poolOptions
    )
  }
  expect(createPoolWithUnknownStrategy).toThrowError(
    "Invalid worker choice strategy 'UNKNOWN_STRATEGY'"
  )
})
2278})