test: code cleanup
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 WorkerChoiceStrategies
8 } = require('../../../lib')
9 const { CircularArray } = require('../../../lib/circular-array')
10
11 describe('Selection strategies test suite', () => {
12 const min = 0
13 const max = 3
14
15 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
16 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
17 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
18 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
19 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
20 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
21 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
22 'WEIGHTED_ROUND_ROBIN'
23 )
24 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
25 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
26 )
27 })
28
29 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
30 const pool = new DynamicThreadPool(
31 min,
32 max,
33 './tests/worker-files/thread/testWorker.js'
34 )
35 expect(pool.opts.workerChoiceStrategy).toBe(
36 WorkerChoiceStrategies.ROUND_ROBIN
37 )
38 // We need to clean up the resources after our test
39 await pool.destroy()
40 })
41
42 it('Verify available strategies are taken at pool creation', async () => {
43 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
44 const pool = new FixedThreadPool(
45 max,
46 './tests/worker-files/thread/testWorker.js',
47 { workerChoiceStrategy }
48 )
49 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
50 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
51 workerChoiceStrategy
52 )
53 await pool.destroy()
54 }
55 })
56
57 it('Verify available strategies can be set after pool creation', async () => {
58 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
59 const pool = new DynamicThreadPool(
60 min,
61 max,
62 './tests/worker-files/thread/testWorker.js'
63 )
64 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
65 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
66 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
67 workerChoiceStrategy
68 )
69 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
70 retries: 6,
71 runTime: { median: false },
72 waitTime: { median: false },
73 elu: { median: false }
74 })
75 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
76 retries: 6,
77 runTime: { median: false },
78 waitTime: { median: false },
79 elu: { median: false }
80 })
81 await pool.destroy()
82 }
83 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
84 const pool = new DynamicClusterPool(
85 min,
86 max,
87 './tests/worker-files/cluster/testWorker.js'
88 )
89 pool.setWorkerChoiceStrategy(workerChoiceStrategy, { retries: 3 })
90 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
91 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
92 workerChoiceStrategy
93 )
94 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
95 retries: 3,
96 runTime: { median: false },
97 waitTime: { median: false },
98 elu: { median: false }
99 })
100 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
101 retries: 3,
102 runTime: { median: false },
103 waitTime: { median: false },
104 elu: { median: false }
105 })
106 await pool.destroy()
107 }
108 })
109
110 it('Verify available strategies default internals at pool creation', async () => {
111 const pool = new FixedThreadPool(
112 max,
113 './tests/worker-files/thread/testWorker.js'
114 )
115 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
116 expect(
117 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
118 workerChoiceStrategy
119 ).nextWorkerNodeKey
120 ).toBe(0)
121 expect(
122 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
123 workerChoiceStrategy
124 ).previousWorkerNodeKey
125 ).toBe(0)
126 if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
127 expect(
128 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
129 workerChoiceStrategy
130 ).workersVirtualTaskEndTimestamp
131 ).toBeInstanceOf(Array)
132 expect(
133 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
134 workerChoiceStrategy
135 ).workersVirtualTaskEndTimestamp.length
136 ).toBe(0)
137 } else if (
138 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
139 ) {
140 expect(
141 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
142 workerChoiceStrategy
143 ).defaultWorkerWeight
144 ).toBeGreaterThan(0)
145 expect(
146 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
147 workerChoiceStrategy
148 ).workerVirtualTaskRunTime
149 ).toBe(0)
150 } else if (
151 workerChoiceStrategy ===
152 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
153 ) {
154 expect(
155 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
156 workerChoiceStrategy
157 ).defaultWorkerWeight
158 ).toBeGreaterThan(0)
159 expect(
160 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
161 workerChoiceStrategy
162 ).workerVirtualTaskRunTime
163 ).toBe(0)
164 expect(
165 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
166 workerChoiceStrategy
167 ).roundId
168 ).toBe(0)
169 expect(
170 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
171 workerChoiceStrategy
172 ).workerNodeId
173 ).toBe(0)
174 expect(
175 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
176 workerChoiceStrategy
177 ).roundWeights
178 ).toStrictEqual([
179 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
180 workerChoiceStrategy
181 ).defaultWorkerWeight
182 ])
183 }
184 }
185 await pool.destroy()
186 })
187
188 it('Verify ROUND_ROBIN strategy default policy', async () => {
189 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
190 let pool = new FixedThreadPool(
191 max,
192 './tests/worker-files/thread/testWorker.js',
193 { workerChoiceStrategy }
194 )
195 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
196 dynamicWorkerUsage: false,
197 dynamicWorkerReady: true
198 })
199 await pool.destroy()
200 pool = new DynamicThreadPool(
201 min,
202 max,
203 './tests/worker-files/thread/testWorker.js',
204 { workerChoiceStrategy }
205 )
206 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
207 dynamicWorkerUsage: false,
208 dynamicWorkerReady: true
209 })
210 // We need to clean up the resources after our test
211 await pool.destroy()
212 })
213
214 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
215 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
216 let pool = new FixedThreadPool(
217 max,
218 './tests/worker-files/thread/testWorker.js',
219 { workerChoiceStrategy }
220 )
221 expect(
222 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
223 ).toStrictEqual({
224 runTime: {
225 aggregate: false,
226 average: false,
227 median: false
228 },
229 waitTime: {
230 aggregate: false,
231 average: false,
232 median: false
233 },
234 elu: {
235 aggregate: false,
236 average: false,
237 median: false
238 }
239 })
240 await pool.destroy()
241 pool = new DynamicThreadPool(
242 min,
243 max,
244 './tests/worker-files/thread/testWorker.js',
245 { workerChoiceStrategy }
246 )
247 expect(
248 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
249 ).toStrictEqual({
250 runTime: {
251 aggregate: false,
252 average: false,
253 median: false
254 },
255 waitTime: {
256 aggregate: false,
257 average: false,
258 median: false
259 },
260 elu: {
261 aggregate: false,
262 average: false,
263 median: false
264 }
265 })
266 // We need to clean up the resources after our test
267 await pool.destroy()
268 })
269
270 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
271 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
272 const pool = new FixedThreadPool(
273 max,
274 './tests/worker-files/thread/testWorker.js',
275 { workerChoiceStrategy }
276 )
277 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
278 const promises = new Set()
279 const maxMultiplier = 2
280 for (let i = 0; i < max * maxMultiplier; i++) {
281 promises.add(pool.execute())
282 }
283 await Promise.all(promises)
284 for (const workerNode of pool.workerNodes) {
285 expect(workerNode.usage).toStrictEqual({
286 tasks: {
287 executed: maxMultiplier,
288 executing: 0,
289 queued: 0,
290 maxQueued: 0,
291 stolen: 0,
292 failed: 0
293 },
294 runTime: {
295 history: new CircularArray()
296 },
297 waitTime: {
298 history: new CircularArray()
299 },
300 elu: {
301 idle: {
302 history: new CircularArray()
303 },
304 active: {
305 history: new CircularArray()
306 }
307 }
308 })
309 }
310 expect(
311 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
312 pool.workerChoiceStrategyContext.workerChoiceStrategy
313 ).nextWorkerNodeKey
314 ).toBe(0)
315 expect(
316 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
317 pool.workerChoiceStrategyContext.workerChoiceStrategy
318 ).previousWorkerNodeKey
319 ).toBe(pool.workerNodes.length - 1)
320 // We need to clean up the resources after our test
321 await pool.destroy()
322 })
323
324 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
325 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
326 const pool = new DynamicThreadPool(
327 min,
328 max,
329 './tests/worker-files/thread/testWorker.js',
330 { workerChoiceStrategy }
331 )
332 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
333 const promises = new Set()
334 const maxMultiplier = 2
335 for (let i = 0; i < max * maxMultiplier; i++) {
336 promises.add(pool.execute())
337 }
338 await Promise.all(promises)
339 for (const workerNode of pool.workerNodes) {
340 expect(workerNode.usage).toStrictEqual({
341 tasks: {
342 executed: expect.any(Number),
343 executing: 0,
344 queued: 0,
345 maxQueued: 0,
346 stolen: 0,
347 failed: 0
348 },
349 runTime: {
350 history: new CircularArray()
351 },
352 waitTime: {
353 history: new CircularArray()
354 },
355 elu: {
356 idle: {
357 history: new CircularArray()
358 },
359 active: {
360 history: new CircularArray()
361 }
362 }
363 })
364 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
365 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
366 max * maxMultiplier
367 )
368 }
369 expect(
370 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
371 pool.workerChoiceStrategyContext.workerChoiceStrategy
372 ).nextWorkerNodeKey
373 ).toBe(0)
374 expect(
375 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
376 pool.workerChoiceStrategyContext.workerChoiceStrategy
377 ).previousWorkerNodeKey
378 ).toBe(pool.workerNodes.length - 1)
379 // We need to clean up the resources after our test
380 await pool.destroy()
381 })
382
383 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
384 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
385 let pool = new FixedClusterPool(
386 max,
387 './tests/worker-files/cluster/testWorker.js',
388 { workerChoiceStrategy }
389 )
390 let results = new Set()
391 for (let i = 0; i < max; i++) {
392 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.id)
393 }
394 expect(results.size).toBe(max)
395 await pool.destroy()
396 pool = new FixedThreadPool(
397 max,
398 './tests/worker-files/thread/testWorker.js',
399 { workerChoiceStrategy }
400 )
401 results = new Set()
402 for (let i = 0; i < max; i++) {
403 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.threadId)
404 }
405 expect(results.size).toBe(max)
406 await pool.destroy()
407 })
408
409 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
410 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
411 let pool = new FixedThreadPool(
412 max,
413 './tests/worker-files/thread/testWorker.js',
414 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
415 )
416 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
417 expect(
418 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
419 pool.workerChoiceStrategyContext.workerChoiceStrategy
420 ).nextWorkerNodeKey
421 ).toBe(0)
422 expect(
423 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
424 pool.workerChoiceStrategyContext.workerChoiceStrategy
425 ).previousWorkerNodeKey
426 ).toBe(0)
427 await pool.destroy()
428 pool = new DynamicThreadPool(
429 min,
430 max,
431 './tests/worker-files/thread/testWorker.js',
432 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
433 )
434 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
435 expect(
436 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
437 pool.workerChoiceStrategyContext.workerChoiceStrategy
438 ).nextWorkerNodeKey
439 ).toBe(0)
440 expect(
441 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
442 pool.workerChoiceStrategyContext.workerChoiceStrategy
443 ).previousWorkerNodeKey
444 ).toBe(0)
445 // We need to clean up the resources after our test
446 await pool.destroy()
447 })
448
449 it('Verify LEAST_USED strategy default policy', async () => {
450 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
451 let pool = new FixedThreadPool(
452 max,
453 './tests/worker-files/thread/testWorker.js',
454 { workerChoiceStrategy }
455 )
456 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
457 dynamicWorkerUsage: false,
458 dynamicWorkerReady: true
459 })
460 await pool.destroy()
461 pool = new DynamicThreadPool(
462 min,
463 max,
464 './tests/worker-files/thread/testWorker.js',
465 { workerChoiceStrategy }
466 )
467 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
468 dynamicWorkerUsage: false,
469 dynamicWorkerReady: true
470 })
471 // We need to clean up the resources after our test
472 await pool.destroy()
473 })
474
475 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
476 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
477 let pool = new FixedThreadPool(
478 max,
479 './tests/worker-files/thread/testWorker.js',
480 { workerChoiceStrategy }
481 )
482 expect(
483 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
484 ).toStrictEqual({
485 runTime: {
486 aggregate: false,
487 average: false,
488 median: false
489 },
490 waitTime: {
491 aggregate: false,
492 average: false,
493 median: false
494 },
495 elu: {
496 aggregate: false,
497 average: false,
498 median: false
499 }
500 })
501 await pool.destroy()
502 pool = new DynamicThreadPool(
503 min,
504 max,
505 './tests/worker-files/thread/testWorker.js',
506 { workerChoiceStrategy }
507 )
508 expect(
509 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
510 ).toStrictEqual({
511 runTime: {
512 aggregate: false,
513 average: false,
514 median: false
515 },
516 waitTime: {
517 aggregate: false,
518 average: false,
519 median: false
520 },
521 elu: {
522 aggregate: false,
523 average: false,
524 median: false
525 }
526 })
527 // We need to clean up the resources after our test
528 await pool.destroy()
529 })
530
531 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
532 const pool = new FixedThreadPool(
533 max,
534 './tests/worker-files/thread/testWorker.js',
535 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
536 )
537 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
538 const promises = new Set()
539 const maxMultiplier = 2
540 for (let i = 0; i < max * maxMultiplier; i++) {
541 promises.add(pool.execute())
542 }
543 await Promise.all(promises)
544 for (const workerNode of pool.workerNodes) {
545 expect(workerNode.usage).toStrictEqual({
546 tasks: {
547 executed: expect.any(Number),
548 executing: 0,
549 queued: 0,
550 maxQueued: 0,
551 stolen: 0,
552 failed: 0
553 },
554 runTime: {
555 history: new CircularArray()
556 },
557 waitTime: {
558 history: new CircularArray()
559 },
560 elu: {
561 idle: {
562 history: new CircularArray()
563 },
564 active: {
565 history: new CircularArray()
566 }
567 }
568 })
569 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
570 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
571 max * maxMultiplier
572 )
573 }
574 // We need to clean up the resources after our test
575 await pool.destroy()
576 })
577
578 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
579 const pool = new DynamicThreadPool(
580 min,
581 max,
582 './tests/worker-files/thread/testWorker.js',
583 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
584 )
585 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
586 const promises = new Set()
587 const maxMultiplier = 2
588 for (let i = 0; i < max * maxMultiplier; i++) {
589 promises.add(pool.execute())
590 }
591 await Promise.all(promises)
592 for (const workerNode of pool.workerNodes) {
593 expect(workerNode.usage).toStrictEqual({
594 tasks: {
595 executed: expect.any(Number),
596 executing: 0,
597 queued: 0,
598 maxQueued: 0,
599 stolen: 0,
600 failed: 0
601 },
602 runTime: {
603 history: new CircularArray()
604 },
605 waitTime: {
606 history: new CircularArray()
607 },
608 elu: {
609 idle: {
610 history: new CircularArray()
611 },
612 active: {
613 history: new CircularArray()
614 }
615 }
616 })
617 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
618 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
619 max * maxMultiplier
620 )
621 }
622 // We need to clean up the resources after our test
623 await pool.destroy()
624 })
625
626 it('Verify LEAST_BUSY strategy default policy', async () => {
627 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
628 let pool = new FixedThreadPool(
629 max,
630 './tests/worker-files/thread/testWorker.js',
631 { workerChoiceStrategy }
632 )
633 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
634 dynamicWorkerUsage: false,
635 dynamicWorkerReady: true
636 })
637 await pool.destroy()
638 pool = new DynamicThreadPool(
639 min,
640 max,
641 './tests/worker-files/thread/testWorker.js',
642 { workerChoiceStrategy }
643 )
644 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
645 dynamicWorkerUsage: false,
646 dynamicWorkerReady: true
647 })
648 // We need to clean up the resources after our test
649 await pool.destroy()
650 })
651
652 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
653 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
654 let pool = new FixedThreadPool(
655 max,
656 './tests/worker-files/thread/testWorker.js',
657 { workerChoiceStrategy }
658 )
659 expect(
660 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
661 ).toStrictEqual({
662 runTime: {
663 aggregate: true,
664 average: false,
665 median: false
666 },
667 waitTime: {
668 aggregate: true,
669 average: false,
670 median: false
671 },
672 elu: {
673 aggregate: false,
674 average: false,
675 median: false
676 }
677 })
678 await pool.destroy()
679 pool = new DynamicThreadPool(
680 min,
681 max,
682 './tests/worker-files/thread/testWorker.js',
683 { workerChoiceStrategy }
684 )
685 expect(
686 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
687 ).toStrictEqual({
688 runTime: {
689 aggregate: true,
690 average: false,
691 median: false
692 },
693 waitTime: {
694 aggregate: true,
695 average: false,
696 median: false
697 },
698 elu: {
699 aggregate: false,
700 average: false,
701 median: false
702 }
703 })
704 // We need to clean up the resources after our test
705 await pool.destroy()
706 })
707
708 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
709 const pool = new FixedThreadPool(
710 max,
711 './tests/worker-files/thread/testWorker.js',
712 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
713 )
714 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
715 const promises = new Set()
716 const maxMultiplier = 2
717 for (let i = 0; i < max * maxMultiplier; i++) {
718 promises.add(pool.execute())
719 }
720 await Promise.all(promises)
721 for (const workerNode of pool.workerNodes) {
722 expect(workerNode.usage).toStrictEqual({
723 tasks: {
724 executed: expect.any(Number),
725 executing: 0,
726 queued: 0,
727 maxQueued: 0,
728 stolen: 0,
729 failed: 0
730 },
731 runTime: expect.objectContaining({
732 history: expect.any(CircularArray)
733 }),
734 waitTime: expect.objectContaining({
735 history: expect.any(CircularArray)
736 }),
737 elu: {
738 idle: {
739 history: new CircularArray()
740 },
741 active: {
742 history: new CircularArray()
743 }
744 }
745 })
746 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
747 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
748 max * maxMultiplier
749 )
750 if (workerNode.usage.runTime.aggregate == null) {
751 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
752 } else {
753 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
754 }
755 if (workerNode.usage.waitTime.aggregate == null) {
756 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
757 } else {
758 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
759 }
760 }
761 // We need to clean up the resources after our test
762 await pool.destroy()
763 })
764
765 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
766 const pool = new DynamicThreadPool(
767 min,
768 max,
769 './tests/worker-files/thread/testWorker.js',
770 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
771 )
772 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
773 const promises = new Set()
774 const maxMultiplier = 2
775 for (let i = 0; i < max * maxMultiplier; i++) {
776 promises.add(pool.execute())
777 }
778 await Promise.all(promises)
779 for (const workerNode of pool.workerNodes) {
780 expect(workerNode.usage).toStrictEqual({
781 tasks: {
782 executed: expect.any(Number),
783 executing: 0,
784 queued: 0,
785 maxQueued: 0,
786 stolen: 0,
787 failed: 0
788 },
789 runTime: expect.objectContaining({
790 history: expect.any(CircularArray)
791 }),
792 waitTime: expect.objectContaining({
793 history: expect.any(CircularArray)
794 }),
795 elu: {
796 idle: {
797 history: new CircularArray()
798 },
799 active: {
800 history: new CircularArray()
801 }
802 }
803 })
804 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
805 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
806 max * maxMultiplier
807 )
808 if (workerNode.usage.runTime.aggregate == null) {
809 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
810 } else {
811 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
812 }
813 if (workerNode.usage.waitTime.aggregate == null) {
814 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
815 } else {
816 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
817 }
818 }
819 // We need to clean up the resources after our test
820 await pool.destroy()
821 })
822
823 it('Verify LEAST_ELU strategy default policy', async () => {
824 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
825 let pool = new FixedThreadPool(
826 max,
827 './tests/worker-files/thread/testWorker.js',
828 { workerChoiceStrategy }
829 )
830 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
831 dynamicWorkerUsage: false,
832 dynamicWorkerReady: true
833 })
834 await pool.destroy()
835 pool = new DynamicThreadPool(
836 min,
837 max,
838 './tests/worker-files/thread/testWorker.js',
839 { workerChoiceStrategy }
840 )
841 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
842 dynamicWorkerUsage: false,
843 dynamicWorkerReady: true
844 })
845 // We need to clean up the resources after our test
846 await pool.destroy()
847 })
848
849 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
850 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
851 let pool = new FixedThreadPool(
852 max,
853 './tests/worker-files/thread/testWorker.js',
854 { workerChoiceStrategy }
855 )
856 expect(
857 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
858 ).toStrictEqual({
859 runTime: {
860 aggregate: false,
861 average: false,
862 median: false
863 },
864 waitTime: {
865 aggregate: false,
866 average: false,
867 median: false
868 },
869 elu: {
870 aggregate: true,
871 average: false,
872 median: false
873 }
874 })
875 await pool.destroy()
876 pool = new DynamicThreadPool(
877 min,
878 max,
879 './tests/worker-files/thread/testWorker.js',
880 { workerChoiceStrategy }
881 )
882 expect(
883 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
884 ).toStrictEqual({
885 runTime: {
886 aggregate: false,
887 average: false,
888 median: false
889 },
890 waitTime: {
891 aggregate: false,
892 average: false,
893 median: false
894 },
895 elu: {
896 aggregate: true,
897 average: false,
898 median: false
899 }
900 })
901 // We need to clean up the resources after our test
902 await pool.destroy()
903 })
904
905 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
906 const pool = new FixedThreadPool(
907 max,
908 './tests/worker-files/thread/testWorker.js',
909 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
910 )
911 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
912 const promises = new Set()
913 const maxMultiplier = 2
914 for (let i = 0; i < max * maxMultiplier; i++) {
915 promises.add(pool.execute())
916 }
917 await Promise.all(promises)
918 for (const workerNode of pool.workerNodes) {
919 expect(workerNode.usage).toStrictEqual({
920 tasks: {
921 executed: expect.any(Number),
922 executing: 0,
923 queued: 0,
924 maxQueued: 0,
925 stolen: 0,
926 failed: 0
927 },
928 runTime: {
929 history: new CircularArray()
930 },
931 waitTime: {
932 history: new CircularArray()
933 },
934 elu: expect.objectContaining({
935 idle: expect.objectContaining({
936 history: expect.any(CircularArray)
937 }),
938 active: expect.objectContaining({
939 history: expect.any(CircularArray)
940 })
941 })
942 })
943 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
944 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
945 max * maxMultiplier
946 )
947 if (workerNode.usage.elu.active.aggregate == null) {
948 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
949 } else {
950 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
951 }
952 if (workerNode.usage.elu.idle.aggregate == null) {
953 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
954 } else {
955 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
956 }
957 if (workerNode.usage.elu.utilization == null) {
958 expect(workerNode.usage.elu.utilization).toBeUndefined()
959 } else {
960 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
961 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
962 }
963 }
964 // We need to clean up the resources after our test
965 await pool.destroy()
966 })
967
968 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
969 const pool = new DynamicThreadPool(
970 min,
971 max,
972 './tests/worker-files/thread/testWorker.js',
973 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
974 )
975 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
976 const promises = new Set()
977 const maxMultiplier = 2
978 for (let i = 0; i < max * maxMultiplier; i++) {
979 promises.add(pool.execute())
980 }
981 await Promise.all(promises)
982 for (const workerNode of pool.workerNodes) {
983 expect(workerNode.usage).toStrictEqual({
984 tasks: {
985 executed: expect.any(Number),
986 executing: 0,
987 queued: 0,
988 maxQueued: 0,
989 stolen: 0,
990 failed: 0
991 },
992 runTime: {
993 history: new CircularArray()
994 },
995 waitTime: {
996 history: new CircularArray()
997 },
998 elu: expect.objectContaining({
999 idle: expect.objectContaining({
1000 history: expect.any(CircularArray)
1001 }),
1002 active: expect.objectContaining({
1003 history: expect.any(CircularArray)
1004 })
1005 })
1006 })
1007 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1008 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1009 max * maxMultiplier
1010 )
1011 if (workerNode.usage.elu.active.aggregate == null) {
1012 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1013 } else {
1014 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1015 }
1016 if (workerNode.usage.elu.idle.aggregate == null) {
1017 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1018 } else {
1019 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1020 }
1021 if (workerNode.usage.elu.utilization == null) {
1022 expect(workerNode.usage.elu.utilization).toBeUndefined()
1023 } else {
1024 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1025 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1026 }
1027 }
1028 // We need to clean up the resources after our test
1029 await pool.destroy()
1030 })
1031
1032 it('Verify FAIR_SHARE strategy default policy', async () => {
1033 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1034 let pool = new FixedThreadPool(
1035 max,
1036 './tests/worker-files/thread/testWorker.js',
1037 { workerChoiceStrategy }
1038 )
1039 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1040 dynamicWorkerUsage: false,
1041 dynamicWorkerReady: true
1042 })
1043 await pool.destroy()
1044 pool = new DynamicThreadPool(
1045 min,
1046 max,
1047 './tests/worker-files/thread/testWorker.js',
1048 { workerChoiceStrategy }
1049 )
1050 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1051 dynamicWorkerUsage: false,
1052 dynamicWorkerReady: true
1053 })
1054 // We need to clean up the resources after our test
1055 await pool.destroy()
1056 })
1057
1058 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1059 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1060 let pool = new FixedThreadPool(
1061 max,
1062 './tests/worker-files/thread/testWorker.js',
1063 { workerChoiceStrategy }
1064 )
1065 expect(
1066 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1067 ).toStrictEqual({
1068 runTime: {
1069 aggregate: true,
1070 average: true,
1071 median: false
1072 },
1073 waitTime: {
1074 aggregate: false,
1075 average: false,
1076 median: false
1077 },
1078 elu: {
1079 aggregate: true,
1080 average: true,
1081 median: false
1082 }
1083 })
1084 await pool.destroy()
1085 pool = new DynamicThreadPool(
1086 min,
1087 max,
1088 './tests/worker-files/thread/testWorker.js',
1089 { workerChoiceStrategy }
1090 )
1091 expect(
1092 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1093 ).toStrictEqual({
1094 runTime: {
1095 aggregate: true,
1096 average: true,
1097 median: false
1098 },
1099 waitTime: {
1100 aggregate: false,
1101 average: false,
1102 median: false
1103 },
1104 elu: {
1105 aggregate: true,
1106 average: true,
1107 median: false
1108 }
1109 })
1110 // We need to clean up the resources after our test
1111 await pool.destroy()
1112 })
1113
1114 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1115 const pool = new FixedThreadPool(
1116 max,
1117 './tests/worker-files/thread/testWorker.js',
1118 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1119 )
1120 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1121 const promises = new Set()
1122 const maxMultiplier = 2
1123 for (let i = 0; i < max * maxMultiplier; i++) {
1124 promises.add(pool.execute())
1125 }
1126 await Promise.all(promises)
1127 for (const workerNode of pool.workerNodes) {
1128 expect(workerNode.usage).toStrictEqual({
1129 tasks: {
1130 executed: expect.any(Number),
1131 executing: 0,
1132 queued: 0,
1133 maxQueued: 0,
1134 stolen: 0,
1135 failed: 0
1136 },
1137 runTime: expect.objectContaining({
1138 history: expect.any(CircularArray)
1139 }),
1140 waitTime: {
1141 history: new CircularArray()
1142 },
1143 elu: expect.objectContaining({
1144 idle: expect.objectContaining({
1145 history: expect.any(CircularArray)
1146 }),
1147 active: expect.objectContaining({
1148 history: expect.any(CircularArray)
1149 })
1150 })
1151 })
1152 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1153 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1154 max * maxMultiplier
1155 )
1156 if (workerNode.usage.runTime.aggregate == null) {
1157 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1158 } else {
1159 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1160 }
1161 if (workerNode.usage.runTime.average == null) {
1162 expect(workerNode.usage.runTime.average).toBeUndefined()
1163 } else {
1164 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1165 }
1166 if (workerNode.usage.elu.active.aggregate == null) {
1167 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1168 } else {
1169 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1170 }
1171 if (workerNode.usage.elu.idle.aggregate == null) {
1172 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1173 } else {
1174 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1175 }
1176 if (workerNode.usage.elu.utilization == null) {
1177 expect(workerNode.usage.elu.utilization).toBeUndefined()
1178 } else {
1179 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1180 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1181 }
1182 }
1183 expect(
1184 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1185 pool.workerChoiceStrategyContext.workerChoiceStrategy
1186 ).workersVirtualTaskEndTimestamp.length
1187 ).toBe(pool.workerNodes.length)
1188 // We need to clean up the resources after our test
1189 await pool.destroy()
1190 })
1191
1192 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1193 const pool = new DynamicThreadPool(
1194 min,
1195 max,
1196 './tests/worker-files/thread/testWorker.js',
1197 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1198 )
1199 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1200 const promises = new Set()
1201 const maxMultiplier = 2
1202 for (let i = 0; i < max * maxMultiplier; i++) {
1203 promises.add(pool.execute())
1204 }
1205 await Promise.all(promises)
1206 for (const workerNode of pool.workerNodes) {
1207 expect(workerNode.usage).toStrictEqual({
1208 tasks: {
1209 executed: expect.any(Number),
1210 executing: 0,
1211 queued: 0,
1212 maxQueued: 0,
1213 stolen: 0,
1214 failed: 0
1215 },
1216 runTime: expect.objectContaining({
1217 history: expect.any(CircularArray)
1218 }),
1219 waitTime: {
1220 history: new CircularArray()
1221 },
1222 elu: expect.objectContaining({
1223 idle: expect.objectContaining({
1224 history: expect.any(CircularArray)
1225 }),
1226 active: expect.objectContaining({
1227 history: expect.any(CircularArray)
1228 })
1229 })
1230 })
1231 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1232 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1233 max * maxMultiplier
1234 )
1235 if (workerNode.usage.runTime.aggregate == null) {
1236 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1237 } else {
1238 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1239 }
1240 if (workerNode.usage.runTime.average == null) {
1241 expect(workerNode.usage.runTime.average).toBeUndefined()
1242 } else {
1243 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1244 }
1245 if (workerNode.usage.elu.active.aggregate == null) {
1246 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1247 } else {
1248 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1249 }
1250 if (workerNode.usage.elu.idle.aggregate == null) {
1251 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1252 } else {
1253 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1254 }
1255 if (workerNode.usage.elu.utilization == null) {
1256 expect(workerNode.usage.elu.utilization).toBeUndefined()
1257 } else {
1258 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1259 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1260 }
1261 }
1262 expect(
1263 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1264 pool.workerChoiceStrategyContext.workerChoiceStrategy
1265 ).workersVirtualTaskEndTimestamp.length
1266 ).toBe(pool.workerNodes.length)
1267 // We need to clean up the resources after our test
1268 await pool.destroy()
1269 })
1270
1271 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1272 const pool = new DynamicThreadPool(
1273 min,
1274 max,
1275 './tests/worker-files/thread/testWorker.js',
1276 {
1277 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1278 workerChoiceStrategyOptions: {
1279 runTime: { median: true }
1280 }
1281 }
1282 )
1283 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1284 const promises = new Set()
1285 const maxMultiplier = 2
1286 for (let i = 0; i < max * maxMultiplier; i++) {
1287 promises.add(pool.execute())
1288 }
1289 await Promise.all(promises)
1290 for (const workerNode of pool.workerNodes) {
1291 expect(workerNode.usage).toStrictEqual({
1292 tasks: {
1293 executed: expect.any(Number),
1294 executing: 0,
1295 queued: 0,
1296 maxQueued: 0,
1297 stolen: 0,
1298 failed: 0
1299 },
1300 runTime: expect.objectContaining({
1301 history: expect.any(CircularArray)
1302 }),
1303 waitTime: {
1304 history: new CircularArray()
1305 },
1306 elu: expect.objectContaining({
1307 idle: expect.objectContaining({
1308 history: expect.any(CircularArray)
1309 }),
1310 active: expect.objectContaining({
1311 history: expect.any(CircularArray)
1312 })
1313 })
1314 })
1315 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1316 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1317 max * maxMultiplier
1318 )
1319 if (workerNode.usage.runTime.aggregate == null) {
1320 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1321 } else {
1322 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1323 }
1324 if (workerNode.usage.runTime.median == null) {
1325 expect(workerNode.usage.runTime.median).toBeUndefined()
1326 } else {
1327 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1328 }
1329 if (workerNode.usage.elu.active.aggregate == null) {
1330 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1331 } else {
1332 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1333 }
1334 if (workerNode.usage.elu.idle.aggregate == null) {
1335 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1336 } else {
1337 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1338 }
1339 if (workerNode.usage.elu.utilization == null) {
1340 expect(workerNode.usage.elu.utilization).toBeUndefined()
1341 } else {
1342 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1343 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1344 }
1345 }
1346 expect(
1347 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1348 pool.workerChoiceStrategyContext.workerChoiceStrategy
1349 ).workersVirtualTaskEndTimestamp.length
1350 ).toBe(pool.workerNodes.length)
1351 // We need to clean up the resources after our test
1352 await pool.destroy()
1353 })
1354
1355 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1356 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1357 let pool = new FixedThreadPool(
1358 max,
1359 './tests/worker-files/thread/testWorker.js'
1360 )
1361 expect(
1362 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1363 workerChoiceStrategy
1364 ).workersVirtualTaskEndTimestamp
1365 ).toBeInstanceOf(Array)
1366 expect(
1367 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1368 workerChoiceStrategy
1369 ).workersVirtualTaskEndTimestamp.length
1370 ).toBe(0)
1371 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1372 workerChoiceStrategy
1373 ).workersVirtualTaskEndTimestamp[0] = performance.now()
1374 expect(
1375 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1376 workerChoiceStrategy
1377 ).workersVirtualTaskEndTimestamp.length
1378 ).toBe(1)
1379 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1380 expect(
1381 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1382 pool.workerChoiceStrategyContext.workerChoiceStrategy
1383 ).workersVirtualTaskEndTimestamp
1384 ).toBeInstanceOf(Array)
1385 expect(
1386 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1387 pool.workerChoiceStrategyContext.workerChoiceStrategy
1388 ).workersVirtualTaskEndTimestamp.length
1389 ).toBe(0)
1390 await pool.destroy()
1391 pool = new DynamicThreadPool(
1392 min,
1393 max,
1394 './tests/worker-files/thread/testWorker.js'
1395 )
1396 expect(
1397 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1398 workerChoiceStrategy
1399 ).workersVirtualTaskEndTimestamp
1400 ).toBeInstanceOf(Array)
1401 expect(
1402 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1403 workerChoiceStrategy
1404 ).workersVirtualTaskEndTimestamp.length
1405 ).toBe(0)
1406 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1407 workerChoiceStrategy
1408 ).workersVirtualTaskEndTimestamp[0] = performance.now()
1409 expect(
1410 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1411 workerChoiceStrategy
1412 ).workersVirtualTaskEndTimestamp.length
1413 ).toBe(1)
1414 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1415 expect(
1416 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1417 pool.workerChoiceStrategyContext.workerChoiceStrategy
1418 ).workersVirtualTaskEndTimestamp
1419 ).toBeInstanceOf(Array)
1420 expect(
1421 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1422 pool.workerChoiceStrategyContext.workerChoiceStrategy
1423 ).workersVirtualTaskEndTimestamp.length
1424 ).toBe(0)
1425 // We need to clean up the resources after our test
1426 await pool.destroy()
1427 })
1428
1429 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1430 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1431 let pool = new FixedThreadPool(
1432 max,
1433 './tests/worker-files/thread/testWorker.js',
1434 { workerChoiceStrategy }
1435 )
1436 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1437 dynamicWorkerUsage: false,
1438 dynamicWorkerReady: true
1439 })
1440 await pool.destroy()
1441 pool = new DynamicThreadPool(
1442 min,
1443 max,
1444 './tests/worker-files/thread/testWorker.js',
1445 { workerChoiceStrategy }
1446 )
1447 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1448 dynamicWorkerUsage: false,
1449 dynamicWorkerReady: true
1450 })
1451 // We need to clean up the resources after our test
1452 await pool.destroy()
1453 })
1454
1455 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1456 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1457 let pool = new FixedThreadPool(
1458 max,
1459 './tests/worker-files/thread/testWorker.js',
1460 { workerChoiceStrategy }
1461 )
1462 expect(
1463 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1464 ).toStrictEqual({
1465 runTime: {
1466 aggregate: true,
1467 average: true,
1468 median: false
1469 },
1470 waitTime: {
1471 aggregate: false,
1472 average: false,
1473 median: false
1474 },
1475 elu: {
1476 aggregate: false,
1477 average: false,
1478 median: false
1479 }
1480 })
1481 await pool.destroy()
1482 pool = new DynamicThreadPool(
1483 min,
1484 max,
1485 './tests/worker-files/thread/testWorker.js',
1486 { workerChoiceStrategy }
1487 )
1488 expect(
1489 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1490 ).toStrictEqual({
1491 runTime: {
1492 aggregate: true,
1493 average: true,
1494 median: false
1495 },
1496 waitTime: {
1497 aggregate: false,
1498 average: false,
1499 median: false
1500 },
1501 elu: {
1502 aggregate: false,
1503 average: false,
1504 median: false
1505 }
1506 })
1507 // We need to clean up the resources after our test
1508 await pool.destroy()
1509 })
1510
1511 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1512 const pool = new FixedThreadPool(
1513 max,
1514 './tests/worker-files/thread/testWorker.js',
1515 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1516 )
1517 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1518 const promises = new Set()
1519 const maxMultiplier = 2
1520 for (let i = 0; i < max * maxMultiplier; i++) {
1521 promises.add(pool.execute())
1522 }
1523 await Promise.all(promises)
1524 for (const workerNode of pool.workerNodes) {
1525 expect(workerNode.usage).toStrictEqual({
1526 tasks: {
1527 executed: expect.any(Number),
1528 executing: 0,
1529 queued: 0,
1530 maxQueued: 0,
1531 stolen: 0,
1532 failed: 0
1533 },
1534 runTime: expect.objectContaining({
1535 history: expect.any(CircularArray)
1536 }),
1537 waitTime: {
1538 history: new CircularArray()
1539 },
1540 elu: {
1541 idle: {
1542 history: new CircularArray()
1543 },
1544 active: {
1545 history: new CircularArray()
1546 }
1547 }
1548 })
1549 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1550 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1551 max * maxMultiplier
1552 )
1553 if (workerNode.usage.runTime.aggregate == null) {
1554 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1555 } else {
1556 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1557 }
1558 if (workerNode.usage.runTime.average == null) {
1559 expect(workerNode.usage.runTime.average).toBeUndefined()
1560 } else {
1561 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1562 }
1563 }
1564 expect(
1565 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1566 pool.workerChoiceStrategyContext.workerChoiceStrategy
1567 ).nextWorkerNodeKey
1568 ).toBe(0)
1569 expect(
1570 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1571 pool.workerChoiceStrategyContext.workerChoiceStrategy
1572 ).previousWorkerNodeKey
1573 ).toBe(0)
1574 expect(
1575 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1576 pool.workerChoiceStrategyContext.workerChoiceStrategy
1577 ).defaultWorkerWeight
1578 ).toBeGreaterThan(0)
1579 expect(
1580 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1581 pool.workerChoiceStrategyContext.workerChoiceStrategy
1582 ).workerVirtualTaskRunTime
1583 ).toBeGreaterThanOrEqual(0)
1584 // We need to clean up the resources after our test
1585 await pool.destroy()
1586 })
1587
1588 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1589 const pool = new DynamicThreadPool(
1590 min,
1591 max,
1592 './tests/worker-files/thread/testWorker.js',
1593 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1594 )
1595 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1596 const promises = new Set()
1597 const maxMultiplier = 2
1598 for (let i = 0; i < max * maxMultiplier; i++) {
1599 promises.add(pool.execute())
1600 }
1601 await Promise.all(promises)
1602 for (const workerNode of pool.workerNodes) {
1603 expect(workerNode.usage).toStrictEqual({
1604 tasks: {
1605 executed: expect.any(Number),
1606 executing: 0,
1607 queued: 0,
1608 maxQueued: 0,
1609 stolen: 0,
1610 failed: 0
1611 },
1612 runTime: expect.objectContaining({
1613 history: expect.any(CircularArray)
1614 }),
1615 waitTime: {
1616 history: new CircularArray()
1617 },
1618 elu: {
1619 idle: {
1620 history: new CircularArray()
1621 },
1622 active: {
1623 history: new CircularArray()
1624 }
1625 }
1626 })
1627 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1628 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1629 max * maxMultiplier
1630 )
1631 if (workerNode.usage.runTime.aggregate == null) {
1632 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1633 } else {
1634 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1635 }
1636 if (workerNode.usage.runTime.average == null) {
1637 expect(workerNode.usage.runTime.average).toBeUndefined()
1638 } else {
1639 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1640 }
1641 }
1642 expect(
1643 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1644 pool.workerChoiceStrategyContext.workerChoiceStrategy
1645 ).nextWorkerNodeKey
1646 ).toBe(0)
1647 expect(
1648 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1649 pool.workerChoiceStrategyContext.workerChoiceStrategy
1650 ).previousWorkerNodeKey
1651 ).toBe(0)
1652 expect(
1653 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1654 pool.workerChoiceStrategyContext.workerChoiceStrategy
1655 ).defaultWorkerWeight
1656 ).toBeGreaterThan(0)
1657 expect(
1658 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1659 pool.workerChoiceStrategyContext.workerChoiceStrategy
1660 ).workerVirtualTaskRunTime
1661 ).toBeGreaterThanOrEqual(0)
1662 // We need to clean up the resources after our test
1663 await pool.destroy()
1664 })
1665
1666 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1667 const pool = new DynamicThreadPool(
1668 min,
1669 max,
1670 './tests/worker-files/thread/testWorker.js',
1671 {
1672 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1673 workerChoiceStrategyOptions: {
1674 runTime: { median: true }
1675 }
1676 }
1677 )
1678 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1679 const promises = new Set()
1680 const maxMultiplier = 2
1681 for (let i = 0; i < max * maxMultiplier; i++) {
1682 promises.add(pool.execute())
1683 }
1684 await Promise.all(promises)
1685 for (const workerNode of pool.workerNodes) {
1686 expect(workerNode.usage).toStrictEqual({
1687 tasks: {
1688 executed: expect.any(Number),
1689 executing: 0,
1690 queued: 0,
1691 maxQueued: 0,
1692 stolen: 0,
1693 failed: 0
1694 },
1695 runTime: expect.objectContaining({
1696 history: expect.any(CircularArray)
1697 }),
1698 waitTime: {
1699 history: new CircularArray()
1700 },
1701 elu: {
1702 idle: {
1703 history: new CircularArray()
1704 },
1705 active: {
1706 history: new CircularArray()
1707 }
1708 }
1709 })
1710 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1711 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1712 max * maxMultiplier
1713 )
1714 if (workerNode.usage.runTime.aggregate == null) {
1715 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1716 } else {
1717 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1718 }
1719 if (workerNode.usage.runTime.median == null) {
1720 expect(workerNode.usage.runTime.median).toBeUndefined()
1721 } else {
1722 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1723 }
1724 }
1725 expect(
1726 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1727 pool.workerChoiceStrategyContext.workerChoiceStrategy
1728 ).nextWorkerNodeKey
1729 ).toBe(0)
1730 expect(
1731 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1732 pool.workerChoiceStrategyContext.workerChoiceStrategy
1733 ).previousWorkerNodeKey
1734 ).toBe(0)
1735 expect(
1736 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1737 pool.workerChoiceStrategyContext.workerChoiceStrategy
1738 ).defaultWorkerWeight
1739 ).toBeGreaterThan(0)
1740 expect(
1741 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1742 pool.workerChoiceStrategyContext.workerChoiceStrategy
1743 ).workerVirtualTaskRunTime
1744 ).toBeGreaterThanOrEqual(0)
1745 // We need to clean up the resources after our test
1746 await pool.destroy()
1747 })
1748
1749 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1750 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1751 let pool = new FixedThreadPool(
1752 max,
1753 './tests/worker-files/thread/testWorker.js'
1754 )
1755 expect(
1756 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1757 workerChoiceStrategy
1758 ).nextWorkerNodeKey
1759 ).toBeDefined()
1760 expect(
1761 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1762 workerChoiceStrategy
1763 ).previousWorkerNodeKey
1764 ).toBeDefined()
1765 expect(
1766 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1767 workerChoiceStrategy
1768 ).defaultWorkerWeight
1769 ).toBeDefined()
1770 expect(
1771 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1772 workerChoiceStrategy
1773 ).workerVirtualTaskRunTime
1774 ).toBeDefined()
1775 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1776 expect(
1777 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1778 pool.workerChoiceStrategyContext.workerChoiceStrategy
1779 ).nextWorkerNodeKey
1780 ).toBe(0)
1781 expect(
1782 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1783 pool.workerChoiceStrategyContext.workerChoiceStrategy
1784 ).previousWorkerNodeKey
1785 ).toBe(0)
1786 expect(
1787 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1788 pool.workerChoiceStrategyContext.workerChoiceStrategy
1789 ).defaultWorkerWeight
1790 ).toBeGreaterThan(0)
1791 expect(
1792 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1793 pool.workerChoiceStrategyContext.workerChoiceStrategy
1794 ).workerVirtualTaskRunTime
1795 ).toBe(0)
1796 await pool.destroy()
1797 pool = new DynamicThreadPool(
1798 min,
1799 max,
1800 './tests/worker-files/thread/testWorker.js'
1801 )
1802 expect(
1803 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1804 workerChoiceStrategy
1805 ).nextWorkerNodeKey
1806 ).toBeDefined()
1807 expect(
1808 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1809 workerChoiceStrategy
1810 ).previousWorkerNodeKey
1811 ).toBeDefined()
1812 expect(
1813 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1814 workerChoiceStrategy
1815 ).defaultWorkerWeight
1816 ).toBeDefined()
1817 expect(
1818 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1819 workerChoiceStrategy
1820 ).workerVirtualTaskRunTime
1821 ).toBeDefined()
1822 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1823 expect(
1824 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1825 pool.workerChoiceStrategyContext.workerChoiceStrategy
1826 ).nextWorkerNodeKey
1827 ).toBe(0)
1828 expect(
1829 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1830 pool.workerChoiceStrategyContext.workerChoiceStrategy
1831 ).previousWorkerNodeKey
1832 ).toBe(0)
1833 expect(
1834 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1835 pool.workerChoiceStrategyContext.workerChoiceStrategy
1836 ).defaultWorkerWeight
1837 ).toBeGreaterThan(0)
1838 expect(
1839 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1840 pool.workerChoiceStrategyContext.workerChoiceStrategy
1841 ).workerVirtualTaskRunTime
1842 ).toBe(0)
1843 // We need to clean up the resources after our test
1844 await pool.destroy()
1845 })
1846
1847 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1848 const workerChoiceStrategy =
1849 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1850 let pool = new FixedThreadPool(
1851 max,
1852 './tests/worker-files/thread/testWorker.js',
1853 { workerChoiceStrategy }
1854 )
1855 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1856 dynamicWorkerUsage: false,
1857 dynamicWorkerReady: true
1858 })
1859 await pool.destroy()
1860 pool = new DynamicThreadPool(
1861 min,
1862 max,
1863 './tests/worker-files/thread/testWorker.js',
1864 { workerChoiceStrategy }
1865 )
1866 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1867 dynamicWorkerUsage: false,
1868 dynamicWorkerReady: true
1869 })
1870 // We need to clean up the resources after our test
1871 await pool.destroy()
1872 })
1873
1874 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1875 const workerChoiceStrategy =
1876 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1877 let pool = new FixedThreadPool(
1878 max,
1879 './tests/worker-files/thread/testWorker.js',
1880 { workerChoiceStrategy }
1881 )
1882 expect(
1883 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1884 ).toStrictEqual({
1885 runTime: {
1886 aggregate: true,
1887 average: true,
1888 median: false
1889 },
1890 waitTime: {
1891 aggregate: false,
1892 average: false,
1893 median: false
1894 },
1895 elu: {
1896 aggregate: false,
1897 average: false,
1898 median: false
1899 }
1900 })
1901 await pool.destroy()
1902 pool = new DynamicThreadPool(
1903 min,
1904 max,
1905 './tests/worker-files/thread/testWorker.js',
1906 { workerChoiceStrategy }
1907 )
1908 expect(
1909 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1910 ).toStrictEqual({
1911 runTime: {
1912 aggregate: true,
1913 average: true,
1914 median: false
1915 },
1916 waitTime: {
1917 aggregate: false,
1918 average: false,
1919 median: false
1920 },
1921 elu: {
1922 aggregate: false,
1923 average: false,
1924 median: false
1925 }
1926 })
1927 // We need to clean up the resources after our test
1928 await pool.destroy()
1929 })
1930
1931 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1932 const pool = new FixedThreadPool(
1933 max,
1934 './tests/worker-files/thread/testWorker.js',
1935 {
1936 workerChoiceStrategy:
1937 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1938 }
1939 )
1940 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1941 const promises = new Set()
1942 const maxMultiplier = 2
1943 for (let i = 0; i < max * maxMultiplier; i++) {
1944 promises.add(pool.execute())
1945 }
1946 await Promise.all(promises)
1947 for (const workerNode of pool.workerNodes) {
1948 expect(workerNode.usage).toStrictEqual({
1949 tasks: {
1950 executed: expect.any(Number),
1951 executing: 0,
1952 queued: 0,
1953 maxQueued: 0,
1954 stolen: 0,
1955 failed: 0
1956 },
1957 runTime: expect.objectContaining({
1958 history: expect.any(CircularArray)
1959 }),
1960 waitTime: {
1961 history: new CircularArray()
1962 },
1963 elu: {
1964 idle: {
1965 history: new CircularArray()
1966 },
1967 active: {
1968 history: new CircularArray()
1969 }
1970 }
1971 })
1972 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1973 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1974 max * maxMultiplier
1975 )
1976 }
1977 expect(
1978 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1979 pool.workerChoiceStrategyContext.workerChoiceStrategy
1980 ).defaultWorkerWeight
1981 ).toBeGreaterThan(0)
1982 expect(
1983 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1984 pool.workerChoiceStrategyContext.workerChoiceStrategy
1985 ).roundId
1986 ).toBe(0)
1987 expect(
1988 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1989 pool.workerChoiceStrategyContext.workerChoiceStrategy
1990 ).workerNodeId
1991 ).toBe(0)
1992 expect(
1993 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1994 pool.workerChoiceStrategyContext.workerChoiceStrategy
1995 ).nextWorkerNodeKey
1996 ).toBe(0)
1997 expect(
1998 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1999 pool.workerChoiceStrategyContext.workerChoiceStrategy
2000 ).previousWorkerNodeKey
2001 ).toEqual(expect.any(Number))
2002 expect(
2003 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2004 pool.workerChoiceStrategyContext.workerChoiceStrategy
2005 ).roundWeights
2006 ).toStrictEqual([
2007 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2008 pool.workerChoiceStrategyContext.workerChoiceStrategy
2009 ).defaultWorkerWeight
2010 ])
2011 // We need to clean up the resources after our test
2012 await pool.destroy()
2013 })
2014
2015 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
2016 const pool = new DynamicThreadPool(
2017 min,
2018 max,
2019 './tests/worker-files/thread/testWorker.js',
2020 {
2021 workerChoiceStrategy:
2022 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2023 }
2024 )
2025 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
2026 const promises = new Set()
2027 const maxMultiplier = 2
2028 for (let i = 0; i < max * maxMultiplier; i++) {
2029 promises.add(pool.execute())
2030 }
2031 await Promise.all(promises)
2032 for (const workerNode of pool.workerNodes) {
2033 expect(workerNode.usage).toStrictEqual({
2034 tasks: {
2035 executed: expect.any(Number),
2036 executing: 0,
2037 queued: 0,
2038 maxQueued: 0,
2039 stolen: 0,
2040 failed: 0
2041 },
2042 runTime: expect.objectContaining({
2043 history: expect.any(CircularArray)
2044 }),
2045 waitTime: {
2046 history: new CircularArray()
2047 },
2048 elu: {
2049 idle: {
2050 history: new CircularArray()
2051 },
2052 active: {
2053 history: new CircularArray()
2054 }
2055 }
2056 })
2057 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2058 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2059 max * maxMultiplier
2060 )
2061 }
2062 expect(
2063 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2064 pool.workerChoiceStrategyContext.workerChoiceStrategy
2065 ).defaultWorkerWeight
2066 ).toBeGreaterThan(0)
2067 expect(
2068 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2069 pool.workerChoiceStrategyContext.workerChoiceStrategy
2070 ).roundId
2071 ).toBe(0)
2072 expect(
2073 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2074 pool.workerChoiceStrategyContext.workerChoiceStrategy
2075 ).workerNodeId
2076 ).toBe(0)
2077 expect(
2078 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2079 pool.workerChoiceStrategyContext.workerChoiceStrategy
2080 ).nextWorkerNodeKey
2081 ).toBe(0)
2082 expect(
2083 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2084 pool.workerChoiceStrategyContext.workerChoiceStrategy
2085 ).previousWorkerNodeKey
2086 ).toEqual(expect.any(Number))
2087 expect(
2088 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2089 pool.workerChoiceStrategyContext.workerChoiceStrategy
2090 ).roundWeights
2091 ).toStrictEqual([
2092 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2093 pool.workerChoiceStrategyContext.workerChoiceStrategy
2094 ).defaultWorkerWeight
2095 ])
2096 // We need to clean up the resources after our test
2097 await pool.destroy()
2098 })
2099
2100 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
2101 const workerChoiceStrategy =
2102 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2103 let pool = new FixedThreadPool(
2104 max,
2105 './tests/worker-files/thread/testWorker.js'
2106 )
2107 expect(
2108 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2109 workerChoiceStrategy
2110 ).roundId
2111 ).toBeDefined()
2112 expect(
2113 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2114 workerChoiceStrategy
2115 ).workerNodeId
2116 ).toBeDefined()
2117 expect(
2118 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2119 workerChoiceStrategy
2120 ).nextWorkerNodeKey
2121 ).toBeDefined()
2122 expect(
2123 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2124 workerChoiceStrategy
2125 ).previousWorkerNodeKey
2126 ).toBeDefined()
2127 expect(
2128 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2129 workerChoiceStrategy
2130 ).defaultWorkerWeight
2131 ).toBeDefined()
2132 expect(
2133 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2134 workerChoiceStrategy
2135 ).roundWeights
2136 ).toBeDefined()
2137 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2138 expect(
2139 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2140 pool.workerChoiceStrategyContext.workerChoiceStrategy
2141 ).roundId
2142 ).toBe(0)
2143 expect(
2144 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2145 pool.workerChoiceStrategyContext.workerChoiceStrategy
2146 ).workerNodeId
2147 ).toBe(0)
2148 expect(
2149 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2150 pool.workerChoiceStrategyContext.workerChoiceStrategy
2151 ).nextWorkerNodeKey
2152 ).toBe(0)
2153 expect(
2154 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2155 pool.workerChoiceStrategyContext.workerChoiceStrategy
2156 ).previousWorkerNodeKey
2157 ).toBe(0)
2158 expect(
2159 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2160 pool.workerChoiceStrategyContext.workerChoiceStrategy
2161 ).defaultWorkerWeight
2162 ).toBeGreaterThan(0)
2163 expect(
2164 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2165 pool.workerChoiceStrategyContext.workerChoiceStrategy
2166 ).roundWeights
2167 ).toStrictEqual([
2168 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2169 pool.workerChoiceStrategyContext.workerChoiceStrategy
2170 ).defaultWorkerWeight
2171 ])
2172 await pool.destroy()
2173 pool = new DynamicThreadPool(
2174 min,
2175 max,
2176 './tests/worker-files/thread/testWorker.js'
2177 )
2178 expect(
2179 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2180 workerChoiceStrategy
2181 ).roundId
2182 ).toBeDefined()
2183 expect(
2184 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2185 workerChoiceStrategy
2186 ).workerNodeId
2187 ).toBeDefined()
2188 expect(
2189 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2190 workerChoiceStrategy
2191 ).nextWorkerNodeKey
2192 ).toBeDefined()
2193 expect(
2194 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2195 workerChoiceStrategy
2196 ).previousWorkerNodeKey
2197 ).toBeDefined()
2198 expect(
2199 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2200 workerChoiceStrategy
2201 ).defaultWorkerWeight
2202 ).toBeDefined()
2203 expect(
2204 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2205 workerChoiceStrategy
2206 ).roundWeights
2207 ).toBeDefined()
2208 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2209 expect(
2210 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2211 pool.workerChoiceStrategyContext.workerChoiceStrategy
2212 ).roundId
2213 ).toBe(0)
2214 expect(
2215 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2216 pool.workerChoiceStrategyContext.workerChoiceStrategy
2217 ).workerNodeId
2218 ).toBe(0)
2219 expect(
2220 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2221 pool.workerChoiceStrategyContext.workerChoiceStrategy
2222 ).nextWorkerNodeKey
2223 ).toBe(0)
2224 expect(
2225 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2226 pool.workerChoiceStrategyContext.workerChoiceStrategy
2227 ).previousWorkerNodeKey
2228 ).toBe(0)
2229 expect(
2230 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2231 pool.workerChoiceStrategyContext.workerChoiceStrategy
2232 ).defaultWorkerWeight
2233 ).toBeGreaterThan(0)
2234 expect(
2235 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2236 pool.workerChoiceStrategyContext.workerChoiceStrategy
2237 ).roundWeights
2238 ).toStrictEqual([
2239 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2240 pool.workerChoiceStrategyContext.workerChoiceStrategy
2241 ).defaultWorkerWeight
2242 ])
2243 // We need to clean up the resources after our test
2244 await pool.destroy()
2245 })
2246
2247 it('Verify unknown strategy throw error', () => {
2248 expect(
2249 () =>
2250 new DynamicThreadPool(
2251 min,
2252 max,
2253 './tests/worker-files/thread/testWorker.js',
2254 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2255 )
2256 ).toThrowError("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
2257 })
2258 })