feat: add per task function strategy support
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.mjs
1 import { randomInt } from 'node:crypto'
2
3 import { expect } from 'expect'
4
5 import { CircularArray } from '../../../lib/circular-array.cjs'
6 import {
7 DynamicClusterPool,
8 DynamicThreadPool,
9 FixedClusterPool,
10 FixedThreadPool,
11 WorkerChoiceStrategies
12 } from '../../../lib/index.cjs'
13
14 describe('Selection strategies test suite', () => {
15 const min = 0
16 const max = 3
17
18 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
19 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
20 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
21 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
22 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
23 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
24 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
25 'WEIGHTED_ROUND_ROBIN'
26 )
27 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
28 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
29 )
30 })
31
32 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
33 const pool = new DynamicThreadPool(
34 min,
35 max,
36 './tests/worker-files/thread/testWorker.mjs'
37 )
38 expect(pool.opts.workerChoiceStrategy).toBe(
39 WorkerChoiceStrategies.ROUND_ROBIN
40 )
41 expect(pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy).toBe(
42 WorkerChoiceStrategies.ROUND_ROBIN
43 )
44 // We need to clean up the resources after our test
45 await pool.destroy()
46 })
47
48 it('Verify available strategies are taken at pool creation', async () => {
49 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
50 const pool = new FixedThreadPool(
51 max,
52 './tests/worker-files/thread/testWorker.mjs',
53 { workerChoiceStrategy }
54 )
55 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
56 expect(
57 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
58 ).toBe(workerChoiceStrategy)
59 await pool.destroy()
60 }
61 })
62
63 it('Verify available strategies can be set after pool creation', async () => {
64 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
65 const pool = new DynamicThreadPool(
66 min,
67 max,
68 './tests/worker-files/thread/testWorker.mjs'
69 )
70 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
71 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
72 expect(
73 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
74 ).toBe(workerChoiceStrategy)
75 await pool.destroy()
76 }
77 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
78 const pool = new DynamicClusterPool(
79 min,
80 max,
81 './tests/worker-files/cluster/testWorker.cjs'
82 )
83 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
84 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
85 expect(
86 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
87 ).toBe(workerChoiceStrategy)
88 await pool.destroy()
89 }
90 })
91
92 it('Verify available strategies default internals at pool creation', async () => {
93 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
94 const pool = new FixedThreadPool(
95 max,
96 './tests/worker-files/thread/testWorker.mjs',
97 { workerChoiceStrategy }
98 )
99 expect(
100 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
101 workerChoiceStrategy
102 ).nextWorkerNodeKey
103 ).toBe(0)
104 expect(
105 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
106 workerChoiceStrategy
107 ).previousWorkerNodeKey
108 ).toBe(0)
109 if (
110 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
111 ) {
112 expect(
113 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
114 workerChoiceStrategy
115 ).workerNodeVirtualTaskRunTime
116 ).toBe(0)
117 } else if (
118 workerChoiceStrategy ===
119 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
120 ) {
121 expect(
122 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
123 workerChoiceStrategy
124 ).workerNodeVirtualTaskRunTime
125 ).toBe(0)
126 expect(
127 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
128 workerChoiceStrategy
129 ).roundId
130 ).toBe(0)
131 expect(
132 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
133 workerChoiceStrategy
134 ).workerNodeId
135 ).toBe(0)
136 expect(
137 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
138 workerChoiceStrategy
139 ).roundWeights.length
140 ).toBe(1)
141 expect(
142 Number.isSafeInteger(
143 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
144 workerChoiceStrategy
145 ).roundWeights[0]
146 )
147 ).toBe(true)
148 }
149 await pool.destroy()
150 }
151 })
152
153 it('Verify ROUND_ROBIN strategy default policy', async () => {
154 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
155 let pool = new FixedThreadPool(
156 max,
157 './tests/worker-files/thread/testWorker.mjs',
158 { workerChoiceStrategy }
159 )
160 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
161 dynamicWorkerUsage: false,
162 dynamicWorkerReady: true
163 })
164 await pool.destroy()
165 pool = new DynamicThreadPool(
166 min,
167 max,
168 './tests/worker-files/thread/testWorker.mjs',
169 { workerChoiceStrategy }
170 )
171 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
172 dynamicWorkerUsage: false,
173 dynamicWorkerReady: true
174 })
175 // We need to clean up the resources after our test
176 await pool.destroy()
177 })
178
179 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
180 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
181 let pool = new FixedThreadPool(
182 max,
183 './tests/worker-files/thread/testWorker.mjs',
184 { workerChoiceStrategy }
185 )
186 expect(
187 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
188 ).toStrictEqual({
189 runTime: {
190 aggregate: false,
191 average: false,
192 median: false
193 },
194 waitTime: {
195 aggregate: false,
196 average: false,
197 median: false
198 },
199 elu: {
200 aggregate: false,
201 average: false,
202 median: false
203 }
204 })
205 await pool.destroy()
206 pool = new DynamicThreadPool(
207 min,
208 max,
209 './tests/worker-files/thread/testWorker.mjs',
210 { workerChoiceStrategy }
211 )
212 expect(
213 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
214 ).toStrictEqual({
215 runTime: {
216 aggregate: false,
217 average: false,
218 median: false
219 },
220 waitTime: {
221 aggregate: false,
222 average: false,
223 median: false
224 },
225 elu: {
226 aggregate: false,
227 average: false,
228 median: false
229 }
230 })
231 // We need to clean up the resources after our test
232 await pool.destroy()
233 })
234
235 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
236 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
237 const pool = new FixedThreadPool(
238 max,
239 './tests/worker-files/thread/testWorker.mjs',
240 { workerChoiceStrategy }
241 )
242 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
243 const promises = new Set()
244 const maxMultiplier = 2
245 for (let i = 0; i < max * maxMultiplier; i++) {
246 promises.add(pool.execute())
247 }
248 await Promise.all(promises)
249 for (const workerNode of pool.workerNodes) {
250 expect(workerNode.usage).toStrictEqual({
251 tasks: {
252 executed: maxMultiplier,
253 executing: 0,
254 queued: 0,
255 maxQueued: 0,
256 sequentiallyStolen: 0,
257 stolen: 0,
258 failed: 0
259 },
260 runTime: {
261 history: new CircularArray()
262 },
263 waitTime: {
264 history: new CircularArray()
265 },
266 elu: {
267 idle: {
268 history: new CircularArray()
269 },
270 active: {
271 history: new CircularArray()
272 }
273 }
274 })
275 }
276 expect(
277 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
278 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
279 ).nextWorkerNodeKey
280 ).toBe(0)
281 expect(
282 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
283 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
284 ).previousWorkerNodeKey
285 ).toBe(pool.workerNodes.length - 1)
286 // We need to clean up the resources after our test
287 await pool.destroy()
288 })
289
290 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
291 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
292 const pool = new DynamicThreadPool(
293 min,
294 max,
295 './tests/worker-files/thread/testWorker.mjs',
296 { workerChoiceStrategy }
297 )
298 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
299 const promises = new Set()
300 const maxMultiplier = 2
301 for (let i = 0; i < max * maxMultiplier; i++) {
302 promises.add(pool.execute())
303 }
304 await Promise.all(promises)
305 for (const workerNode of pool.workerNodes) {
306 expect(workerNode.usage).toStrictEqual({
307 tasks: {
308 executed: expect.any(Number),
309 executing: 0,
310 queued: 0,
311 maxQueued: 0,
312 sequentiallyStolen: 0,
313 stolen: 0,
314 failed: 0
315 },
316 runTime: {
317 history: new CircularArray()
318 },
319 waitTime: {
320 history: new CircularArray()
321 },
322 elu: {
323 idle: {
324 history: new CircularArray()
325 },
326 active: {
327 history: new CircularArray()
328 }
329 }
330 })
331 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
332 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
333 max * maxMultiplier
334 )
335 }
336 expect(
337 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
338 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
339 ).nextWorkerNodeKey
340 ).toBe(0)
341 expect(
342 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
343 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
344 ).previousWorkerNodeKey
345 ).toBe(pool.workerNodes.length - 1)
346 // We need to clean up the resources after our test
347 await pool.destroy()
348 })
349
350 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
351 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
352 let pool = new FixedClusterPool(
353 max,
354 './tests/worker-files/cluster/testWorker.cjs',
355 { workerChoiceStrategy }
356 )
357 let results = new Set()
358 for (let i = 0; i < max; i++) {
359 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
360 }
361 expect(results.size).toBe(max)
362 await pool.destroy()
363 pool = new FixedThreadPool(
364 max,
365 './tests/worker-files/thread/testWorker.mjs',
366 { workerChoiceStrategy }
367 )
368 results = new Set()
369 for (let i = 0; i < max; i++) {
370 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
371 }
372 expect(results.size).toBe(max)
373 await pool.destroy()
374 })
375
376 it("Verify ROUND_ROBIN strategy internals aren't reset after setting it", async () => {
377 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
378 let pool = new FixedThreadPool(
379 max,
380 './tests/worker-files/thread/testWorker.mjs',
381 { workerChoiceStrategy }
382 )
383 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
384 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
385 ).nextWorkerNodeKey = randomInt(1, max - 1)
386 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
387 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
388 ).previousWorkerNodeKey = randomInt(1, max - 1)
389 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
390 expect(
391 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
392 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
393 ).nextWorkerNodeKey
394 ).toBeGreaterThan(0)
395 expect(
396 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
397 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
398 ).previousWorkerNodeKey
399 ).toBeGreaterThan(0)
400 await pool.destroy()
401 pool = new DynamicThreadPool(
402 min,
403 max,
404 './tests/worker-files/thread/testWorker.mjs',
405 { workerChoiceStrategy }
406 )
407 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
408 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
409 ).nextWorkerNodeKey = randomInt(1, max - 1)
410 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
411 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
412 ).previousWorkerNodeKey = randomInt(1, max - 1)
413 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
414 expect(
415 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
416 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
417 ).nextWorkerNodeKey
418 ).toBeGreaterThan(0)
419 expect(
420 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
421 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
422 ).previousWorkerNodeKey
423 ).toBeGreaterThan(0)
424 // We need to clean up the resources after our test
425 await pool.destroy()
426 })
427
428 it('Verify LEAST_USED strategy default policy', async () => {
429 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
430 let pool = new FixedThreadPool(
431 max,
432 './tests/worker-files/thread/testWorker.mjs',
433 { workerChoiceStrategy }
434 )
435 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
436 dynamicWorkerUsage: false,
437 dynamicWorkerReady: true
438 })
439 await pool.destroy()
440 pool = new DynamicThreadPool(
441 min,
442 max,
443 './tests/worker-files/thread/testWorker.mjs',
444 { workerChoiceStrategy }
445 )
446 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
447 dynamicWorkerUsage: false,
448 dynamicWorkerReady: true
449 })
450 // We need to clean up the resources after our test
451 await pool.destroy()
452 })
453
454 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
455 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
456 let pool = new FixedThreadPool(
457 max,
458 './tests/worker-files/thread/testWorker.mjs',
459 { workerChoiceStrategy }
460 )
461 expect(
462 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
463 ).toStrictEqual({
464 runTime: {
465 aggregate: false,
466 average: false,
467 median: false
468 },
469 waitTime: {
470 aggregate: false,
471 average: false,
472 median: false
473 },
474 elu: {
475 aggregate: false,
476 average: false,
477 median: false
478 }
479 })
480 await pool.destroy()
481 pool = new DynamicThreadPool(
482 min,
483 max,
484 './tests/worker-files/thread/testWorker.mjs',
485 { workerChoiceStrategy }
486 )
487 expect(
488 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
489 ).toStrictEqual({
490 runTime: {
491 aggregate: false,
492 average: false,
493 median: false
494 },
495 waitTime: {
496 aggregate: false,
497 average: false,
498 median: false
499 },
500 elu: {
501 aggregate: false,
502 average: false,
503 median: false
504 }
505 })
506 // We need to clean up the resources after our test
507 await pool.destroy()
508 })
509
510 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
511 const pool = new FixedThreadPool(
512 max,
513 './tests/worker-files/thread/testWorker.mjs',
514 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
515 )
516 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
517 const promises = new Set()
518 const maxMultiplier = 2
519 for (let i = 0; i < max * maxMultiplier; i++) {
520 promises.add(pool.execute())
521 }
522 await Promise.all(promises)
523 for (const workerNode of pool.workerNodes) {
524 expect(workerNode.usage).toStrictEqual({
525 tasks: {
526 executed: expect.any(Number),
527 executing: 0,
528 queued: 0,
529 maxQueued: 0,
530 sequentiallyStolen: 0,
531 stolen: 0,
532 failed: 0
533 },
534 runTime: {
535 history: new CircularArray()
536 },
537 waitTime: {
538 history: new CircularArray()
539 },
540 elu: {
541 idle: {
542 history: new CircularArray()
543 },
544 active: {
545 history: new CircularArray()
546 }
547 }
548 })
549 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
550 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
551 max * maxMultiplier
552 )
553 }
554 expect(
555 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
556 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
557 ).nextWorkerNodeKey
558 ).toEqual(expect.any(Number))
559 expect(
560 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
561 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
562 ).previousWorkerNodeKey
563 ).toEqual(expect.any(Number))
564 // We need to clean up the resources after our test
565 await pool.destroy()
566 })
567
568 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
569 const pool = new DynamicThreadPool(
570 min,
571 max,
572 './tests/worker-files/thread/testWorker.mjs',
573 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
574 )
575 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
576 const promises = new Set()
577 const maxMultiplier = 2
578 for (let i = 0; i < max * maxMultiplier; i++) {
579 promises.add(pool.execute())
580 }
581 await Promise.all(promises)
582 for (const workerNode of pool.workerNodes) {
583 expect(workerNode.usage).toStrictEqual({
584 tasks: {
585 executed: expect.any(Number),
586 executing: 0,
587 queued: 0,
588 maxQueued: 0,
589 sequentiallyStolen: 0,
590 stolen: 0,
591 failed: 0
592 },
593 runTime: {
594 history: new CircularArray()
595 },
596 waitTime: {
597 history: new CircularArray()
598 },
599 elu: {
600 idle: {
601 history: new CircularArray()
602 },
603 active: {
604 history: new CircularArray()
605 }
606 }
607 })
608 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
609 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
610 max * maxMultiplier
611 )
612 }
613 expect(
614 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
615 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
616 ).nextWorkerNodeKey
617 ).toEqual(expect.any(Number))
618 expect(
619 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
620 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
621 ).previousWorkerNodeKey
622 ).toEqual(expect.any(Number))
623 // We need to clean up the resources after our test
624 await pool.destroy()
625 })
626
627 it('Verify LEAST_BUSY strategy default policy', async () => {
628 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
629 let pool = new FixedThreadPool(
630 max,
631 './tests/worker-files/thread/testWorker.mjs',
632 { workerChoiceStrategy }
633 )
634 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
635 dynamicWorkerUsage: false,
636 dynamicWorkerReady: true
637 })
638 await pool.destroy()
639 pool = new DynamicThreadPool(
640 min,
641 max,
642 './tests/worker-files/thread/testWorker.mjs',
643 { workerChoiceStrategy }
644 )
645 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
646 dynamicWorkerUsage: false,
647 dynamicWorkerReady: true
648 })
649 // We need to clean up the resources after our test
650 await pool.destroy()
651 })
652
653 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
654 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
655 let pool = new FixedThreadPool(
656 max,
657 './tests/worker-files/thread/testWorker.mjs',
658 { workerChoiceStrategy }
659 )
660 expect(
661 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
662 ).toStrictEqual({
663 runTime: {
664 aggregate: true,
665 average: false,
666 median: false
667 },
668 waitTime: {
669 aggregate: true,
670 average: false,
671 median: false
672 },
673 elu: {
674 aggregate: false,
675 average: false,
676 median: false
677 }
678 })
679 await pool.destroy()
680 pool = new DynamicThreadPool(
681 min,
682 max,
683 './tests/worker-files/thread/testWorker.mjs',
684 { workerChoiceStrategy }
685 )
686 expect(
687 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
688 ).toStrictEqual({
689 runTime: {
690 aggregate: true,
691 average: false,
692 median: false
693 },
694 waitTime: {
695 aggregate: true,
696 average: false,
697 median: false
698 },
699 elu: {
700 aggregate: false,
701 average: false,
702 median: false
703 }
704 })
705 // We need to clean up the resources after our test
706 await pool.destroy()
707 })
708
709 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
710 const pool = new FixedThreadPool(
711 max,
712 './tests/worker-files/thread/testWorker.mjs',
713 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
714 )
715 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
716 const promises = new Set()
717 const maxMultiplier = 2
718 for (let i = 0; i < max * maxMultiplier; i++) {
719 promises.add(pool.execute())
720 }
721 await Promise.all(promises)
722 for (const workerNode of pool.workerNodes) {
723 expect(workerNode.usage).toStrictEqual({
724 tasks: {
725 executed: expect.any(Number),
726 executing: 0,
727 queued: 0,
728 maxQueued: 0,
729 sequentiallyStolen: 0,
730 stolen: 0,
731 failed: 0
732 },
733 runTime: expect.objectContaining({
734 history: expect.any(CircularArray)
735 }),
736 waitTime: expect.objectContaining({
737 history: expect.any(CircularArray)
738 }),
739 elu: {
740 idle: {
741 history: new CircularArray()
742 },
743 active: {
744 history: new CircularArray()
745 }
746 }
747 })
748 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
749 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
750 max * maxMultiplier
751 )
752 if (workerNode.usage.runTime.aggregate == null) {
753 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
754 } else {
755 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
756 }
757 if (workerNode.usage.waitTime.aggregate == null) {
758 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
759 } else {
760 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
761 }
762 }
763 expect(
764 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
765 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
766 ).nextWorkerNodeKey
767 ).toEqual(expect.any(Number))
768 expect(
769 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
770 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
771 ).previousWorkerNodeKey
772 ).toEqual(expect.any(Number))
773 // We need to clean up the resources after our test
774 await pool.destroy()
775 })
776
777 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
778 const pool = new DynamicThreadPool(
779 min,
780 max,
781 './tests/worker-files/thread/testWorker.mjs',
782 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
783 )
784 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
785 const promises = new Set()
786 const maxMultiplier = 2
787 for (let i = 0; i < max * maxMultiplier; i++) {
788 promises.add(pool.execute())
789 }
790 await Promise.all(promises)
791 for (const workerNode of pool.workerNodes) {
792 expect(workerNode.usage).toStrictEqual({
793 tasks: {
794 executed: expect.any(Number),
795 executing: 0,
796 queued: 0,
797 maxQueued: 0,
798 sequentiallyStolen: 0,
799 stolen: 0,
800 failed: 0
801 },
802 runTime: expect.objectContaining({
803 history: expect.any(CircularArray)
804 }),
805 waitTime: expect.objectContaining({
806 history: expect.any(CircularArray)
807 }),
808 elu: {
809 idle: {
810 history: new CircularArray()
811 },
812 active: {
813 history: new CircularArray()
814 }
815 }
816 })
817 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
818 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
819 max * maxMultiplier
820 )
821 if (workerNode.usage.runTime.aggregate == null) {
822 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
823 } else {
824 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
825 }
826 if (workerNode.usage.waitTime.aggregate == null) {
827 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
828 } else {
829 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
830 }
831 }
832 expect(
833 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
834 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
835 ).nextWorkerNodeKey
836 ).toEqual(expect.any(Number))
837 expect(
838 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
839 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
840 ).previousWorkerNodeKey
841 ).toEqual(expect.any(Number))
842 // We need to clean up the resources after our test
843 await pool.destroy()
844 })
845
846 it('Verify LEAST_ELU strategy default policy', async () => {
847 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
848 let pool = new FixedThreadPool(
849 max,
850 './tests/worker-files/thread/testWorker.mjs',
851 { workerChoiceStrategy }
852 )
853 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
854 dynamicWorkerUsage: false,
855 dynamicWorkerReady: true
856 })
857 await pool.destroy()
858 pool = new DynamicThreadPool(
859 min,
860 max,
861 './tests/worker-files/thread/testWorker.mjs',
862 { workerChoiceStrategy }
863 )
864 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
865 dynamicWorkerUsage: false,
866 dynamicWorkerReady: true
867 })
868 // We need to clean up the resources after our test
869 await pool.destroy()
870 })
871
872 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
873 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
874 let pool = new FixedThreadPool(
875 max,
876 './tests/worker-files/thread/testWorker.mjs',
877 { workerChoiceStrategy }
878 )
879 expect(
880 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
881 ).toStrictEqual({
882 runTime: {
883 aggregate: false,
884 average: false,
885 median: false
886 },
887 waitTime: {
888 aggregate: false,
889 average: false,
890 median: false
891 },
892 elu: {
893 aggregate: true,
894 average: false,
895 median: false
896 }
897 })
898 await pool.destroy()
899 pool = new DynamicThreadPool(
900 min,
901 max,
902 './tests/worker-files/thread/testWorker.mjs',
903 { workerChoiceStrategy }
904 )
905 expect(
906 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
907 ).toStrictEqual({
908 runTime: {
909 aggregate: false,
910 average: false,
911 median: false
912 },
913 waitTime: {
914 aggregate: false,
915 average: false,
916 median: false
917 },
918 elu: {
919 aggregate: true,
920 average: false,
921 median: false
922 }
923 })
924 // We need to clean up the resources after our test
925 await pool.destroy()
926 })
927
928 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
929 const pool = new FixedThreadPool(
930 max,
931 './tests/worker-files/thread/testWorker.mjs',
932 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
933 )
934 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
935 const promises = new Set()
936 const maxMultiplier = 2
937 for (let i = 0; i < max * maxMultiplier; i++) {
938 promises.add(pool.execute())
939 }
940 await Promise.all(promises)
941 for (const workerNode of pool.workerNodes) {
942 expect(workerNode.usage).toStrictEqual({
943 tasks: {
944 executed: expect.any(Number),
945 executing: 0,
946 queued: 0,
947 maxQueued: 0,
948 sequentiallyStolen: 0,
949 stolen: 0,
950 failed: 0
951 },
952 runTime: {
953 history: new CircularArray()
954 },
955 waitTime: {
956 history: new CircularArray()
957 },
958 elu: expect.objectContaining({
959 idle: expect.objectContaining({
960 history: expect.any(CircularArray)
961 }),
962 active: expect.objectContaining({
963 history: expect.any(CircularArray)
964 })
965 })
966 })
967 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
968 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
969 max * maxMultiplier
970 )
971 if (workerNode.usage.elu.active.aggregate == null) {
972 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
973 } else {
974 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
975 }
976 if (workerNode.usage.elu.idle.aggregate == null) {
977 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
978 } else {
979 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
980 }
981 if (workerNode.usage.elu.utilization == null) {
982 expect(workerNode.usage.elu.utilization).toBeUndefined()
983 } else {
984 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
985 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
986 }
987 }
988 expect(
989 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
990 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
991 ).nextWorkerNodeKey
992 ).toEqual(expect.any(Number))
993 expect(
994 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
995 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
996 ).previousWorkerNodeKey
997 ).toEqual(expect.any(Number))
998 // We need to clean up the resources after our test
999 await pool.destroy()
1000 })
1001
1002 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
1003 const pool = new DynamicThreadPool(
1004 min,
1005 max,
1006 './tests/worker-files/thread/testWorker.mjs',
1007 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
1008 )
1009 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
1010 const promises = new Set()
1011 const maxMultiplier = 2
1012 for (let i = 0; i < max * maxMultiplier; i++) {
1013 promises.add(pool.execute())
1014 }
1015 await Promise.all(promises)
1016 for (const workerNode of pool.workerNodes) {
1017 expect(workerNode.usage).toStrictEqual({
1018 tasks: {
1019 executed: expect.any(Number),
1020 executing: 0,
1021 queued: 0,
1022 maxQueued: 0,
1023 sequentiallyStolen: 0,
1024 stolen: 0,
1025 failed: 0
1026 },
1027 runTime: {
1028 history: new CircularArray()
1029 },
1030 waitTime: {
1031 history: new CircularArray()
1032 },
1033 elu: expect.objectContaining({
1034 idle: expect.objectContaining({
1035 history: expect.any(CircularArray)
1036 }),
1037 active: expect.objectContaining({
1038 history: expect.any(CircularArray)
1039 })
1040 })
1041 })
1042 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1043 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1044 max * maxMultiplier
1045 )
1046 if (workerNode.usage.elu.active.aggregate == null) {
1047 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1048 } else {
1049 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1050 }
1051 if (workerNode.usage.elu.idle.aggregate == null) {
1052 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1053 } else {
1054 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1055 }
1056 if (workerNode.usage.elu.utilization == null) {
1057 expect(workerNode.usage.elu.utilization).toBeUndefined()
1058 } else {
1059 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1060 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1061 }
1062 }
1063 expect(
1064 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1065 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1066 ).nextWorkerNodeKey
1067 ).toEqual(expect.any(Number))
1068 expect(
1069 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1070 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1071 ).previousWorkerNodeKey
1072 ).toEqual(expect.any(Number))
1073 // We need to clean up the resources after our test
1074 await pool.destroy()
1075 })
1076
1077 it('Verify FAIR_SHARE strategy default policy', async () => {
1078 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1079 let pool = new FixedThreadPool(
1080 max,
1081 './tests/worker-files/thread/testWorker.mjs',
1082 { workerChoiceStrategy }
1083 )
1084 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
1085 dynamicWorkerUsage: false,
1086 dynamicWorkerReady: true
1087 })
1088 await pool.destroy()
1089 pool = new DynamicThreadPool(
1090 min,
1091 max,
1092 './tests/worker-files/thread/testWorker.mjs',
1093 { workerChoiceStrategy }
1094 )
1095 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
1096 dynamicWorkerUsage: false,
1097 dynamicWorkerReady: true
1098 })
1099 // We need to clean up the resources after our test
1100 await pool.destroy()
1101 })
1102
1103 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1104 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1105 let pool = new FixedThreadPool(
1106 max,
1107 './tests/worker-files/thread/testWorker.mjs',
1108 { workerChoiceStrategy }
1109 )
1110 expect(
1111 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
1112 ).toStrictEqual({
1113 runTime: {
1114 aggregate: true,
1115 average: true,
1116 median: false
1117 },
1118 waitTime: {
1119 aggregate: false,
1120 average: false,
1121 median: false
1122 },
1123 elu: {
1124 aggregate: true,
1125 average: true,
1126 median: false
1127 }
1128 })
1129 await pool.destroy()
1130 pool = new DynamicThreadPool(
1131 min,
1132 max,
1133 './tests/worker-files/thread/testWorker.mjs',
1134 { workerChoiceStrategy }
1135 )
1136 expect(
1137 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
1138 ).toStrictEqual({
1139 runTime: {
1140 aggregate: true,
1141 average: true,
1142 median: false
1143 },
1144 waitTime: {
1145 aggregate: false,
1146 average: false,
1147 median: false
1148 },
1149 elu: {
1150 aggregate: true,
1151 average: true,
1152 median: false
1153 }
1154 })
1155 // We need to clean up the resources after our test
1156 await pool.destroy()
1157 })
1158
1159 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1160 const pool = new FixedThreadPool(
1161 max,
1162 './tests/worker-files/thread/testWorker.mjs',
1163 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1164 )
1165 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1166 const promises = new Set()
1167 const maxMultiplier = 2
1168 for (let i = 0; i < max * maxMultiplier; i++) {
1169 promises.add(pool.execute())
1170 }
1171 await Promise.all(promises)
1172 for (const workerNode of pool.workerNodes) {
1173 expect(workerNode.usage).toStrictEqual({
1174 tasks: {
1175 executed: expect.any(Number),
1176 executing: 0,
1177 queued: 0,
1178 maxQueued: 0,
1179 sequentiallyStolen: 0,
1180 stolen: 0,
1181 failed: 0
1182 },
1183 runTime: expect.objectContaining({
1184 history: expect.any(CircularArray)
1185 }),
1186 waitTime: {
1187 history: new CircularArray()
1188 },
1189 elu: expect.objectContaining({
1190 idle: expect.objectContaining({
1191 history: expect.any(CircularArray)
1192 }),
1193 active: expect.objectContaining({
1194 history: expect.any(CircularArray)
1195 })
1196 })
1197 })
1198 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1199 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1200 max * maxMultiplier
1201 )
1202 if (workerNode.usage.runTime.aggregate == null) {
1203 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1204 } else {
1205 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1206 }
1207 if (workerNode.usage.runTime.average == null) {
1208 expect(workerNode.usage.runTime.average).toBeUndefined()
1209 } else {
1210 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1211 }
1212 if (workerNode.usage.elu.active.aggregate == null) {
1213 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1214 } else {
1215 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1216 }
1217 if (workerNode.usage.elu.idle.aggregate == null) {
1218 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1219 } else {
1220 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1221 }
1222 if (workerNode.usage.elu.utilization == null) {
1223 expect(workerNode.usage.elu.utilization).toBeUndefined()
1224 } else {
1225 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1226 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1227 }
1228 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1229 }
1230 expect(
1231 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1232 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1233 ).nextWorkerNodeKey
1234 ).toEqual(expect.any(Number))
1235 expect(
1236 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1237 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1238 ).previousWorkerNodeKey
1239 ).toEqual(expect.any(Number))
1240 // We need to clean up the resources after our test
1241 await pool.destroy()
1242 })
1243
1244 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1245 const pool = new DynamicThreadPool(
1246 min,
1247 max,
1248 './tests/worker-files/thread/testWorker.mjs',
1249 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1250 )
1251 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1252 const promises = new Set()
1253 const maxMultiplier = 2
1254 for (let i = 0; i < max * maxMultiplier; i++) {
1255 promises.add(pool.execute())
1256 }
1257 await Promise.all(promises)
1258 for (const workerNode of pool.workerNodes) {
1259 expect(workerNode.usage).toStrictEqual({
1260 tasks: {
1261 executed: expect.any(Number),
1262 executing: 0,
1263 queued: 0,
1264 maxQueued: 0,
1265 sequentiallyStolen: 0,
1266 stolen: 0,
1267 failed: 0
1268 },
1269 runTime: expect.objectContaining({
1270 history: expect.any(CircularArray)
1271 }),
1272 waitTime: {
1273 history: new CircularArray()
1274 },
1275 elu: expect.objectContaining({
1276 idle: expect.objectContaining({
1277 history: expect.any(CircularArray)
1278 }),
1279 active: expect.objectContaining({
1280 history: expect.any(CircularArray)
1281 })
1282 })
1283 })
1284 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1285 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1286 max * maxMultiplier
1287 )
1288 if (workerNode.usage.runTime.aggregate == null) {
1289 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1290 } else {
1291 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1292 }
1293 if (workerNode.usage.runTime.average == null) {
1294 expect(workerNode.usage.runTime.average).toBeUndefined()
1295 } else {
1296 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1297 }
1298 if (workerNode.usage.elu.active.aggregate == null) {
1299 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1300 } else {
1301 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1302 }
1303 if (workerNode.usage.elu.idle.aggregate == null) {
1304 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1305 } else {
1306 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1307 }
1308 if (workerNode.usage.elu.utilization == null) {
1309 expect(workerNode.usage.elu.utilization).toBeUndefined()
1310 } else {
1311 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1312 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1313 }
1314 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1315 }
1316 expect(
1317 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1318 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1319 ).nextWorkerNodeKey
1320 ).toEqual(expect.any(Number))
1321 expect(
1322 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1323 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1324 ).previousWorkerNodeKey
1325 ).toEqual(expect.any(Number))
1326 // We need to clean up the resources after our test
1327 await pool.destroy()
1328 })
1329
1330 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1331 const pool = new DynamicThreadPool(
1332 min,
1333 max,
1334 './tests/worker-files/thread/testWorker.mjs',
1335 {
1336 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1337 workerChoiceStrategyOptions: {
1338 runTime: { median: true }
1339 }
1340 }
1341 )
1342 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1343 const promises = new Set()
1344 const maxMultiplier = 2
1345 for (let i = 0; i < max * maxMultiplier; i++) {
1346 promises.add(pool.execute())
1347 }
1348 await Promise.all(promises)
1349 for (const workerNode of pool.workerNodes) {
1350 expect(workerNode.usage).toStrictEqual({
1351 tasks: {
1352 executed: expect.any(Number),
1353 executing: 0,
1354 queued: 0,
1355 maxQueued: 0,
1356 sequentiallyStolen: 0,
1357 stolen: 0,
1358 failed: 0
1359 },
1360 runTime: expect.objectContaining({
1361 history: expect.any(CircularArray)
1362 }),
1363 waitTime: {
1364 history: new CircularArray()
1365 },
1366 elu: expect.objectContaining({
1367 idle: expect.objectContaining({
1368 history: expect.any(CircularArray)
1369 }),
1370 active: expect.objectContaining({
1371 history: expect.any(CircularArray)
1372 })
1373 })
1374 })
1375 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1376 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1377 max * maxMultiplier
1378 )
1379 if (workerNode.usage.runTime.aggregate == null) {
1380 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1381 } else {
1382 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1383 }
1384 if (workerNode.usage.runTime.median == null) {
1385 expect(workerNode.usage.runTime.median).toBeUndefined()
1386 } else {
1387 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1388 }
1389 if (workerNode.usage.elu.active.aggregate == null) {
1390 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1391 } else {
1392 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1393 }
1394 if (workerNode.usage.elu.idle.aggregate == null) {
1395 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1396 } else {
1397 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1398 }
1399 if (workerNode.usage.elu.utilization == null) {
1400 expect(workerNode.usage.elu.utilization).toBeUndefined()
1401 } else {
1402 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1403 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1404 }
1405 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1406 }
1407 expect(
1408 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1409 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1410 ).nextWorkerNodeKey
1411 ).toEqual(expect.any(Number))
1412 expect(
1413 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1414 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1415 ).previousWorkerNodeKey
1416 ).toEqual(expect.any(Number))
1417 // We need to clean up the resources after our test
1418 await pool.destroy()
1419 })
1420
1421 it("Verify FAIR_SHARE strategy internals aren't reset after setting it", async () => {
1422 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1423 let pool = new FixedThreadPool(
1424 max,
1425 './tests/worker-files/thread/testWorker.mjs'
1426 )
1427 for (const workerNode of pool.workerNodes) {
1428 workerNode.strategyData = {
1429 virtualTaskEndTimestamp: performance.now()
1430 }
1431 }
1432 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1433 for (const workerNode of pool.workerNodes) {
1434 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1435 }
1436 await pool.destroy()
1437 pool = new DynamicThreadPool(
1438 min,
1439 max,
1440 './tests/worker-files/thread/testWorker.mjs'
1441 )
1442 for (const workerNode of pool.workerNodes) {
1443 workerNode.strategyData = {
1444 virtualTaskEndTimestamp: performance.now()
1445 }
1446 }
1447 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1448 for (const workerNode of pool.workerNodes) {
1449 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1450 }
1451 // We need to clean up the resources after our test
1452 await pool.destroy()
1453 })
1454
1455 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1456 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1457 let pool = new FixedThreadPool(
1458 max,
1459 './tests/worker-files/thread/testWorker.mjs',
1460 { workerChoiceStrategy }
1461 )
1462 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
1463 dynamicWorkerUsage: false,
1464 dynamicWorkerReady: true
1465 })
1466 await pool.destroy()
1467 pool = new DynamicThreadPool(
1468 min,
1469 max,
1470 './tests/worker-files/thread/testWorker.mjs',
1471 { workerChoiceStrategy }
1472 )
1473 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
1474 dynamicWorkerUsage: false,
1475 dynamicWorkerReady: true
1476 })
1477 // We need to clean up the resources after our test
1478 await pool.destroy()
1479 })
1480
1481 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1482 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1483 let pool = new FixedThreadPool(
1484 max,
1485 './tests/worker-files/thread/testWorker.mjs',
1486 { workerChoiceStrategy }
1487 )
1488 expect(
1489 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
1490 ).toStrictEqual({
1491 runTime: {
1492 aggregate: true,
1493 average: true,
1494 median: false
1495 },
1496 waitTime: {
1497 aggregate: false,
1498 average: false,
1499 median: false
1500 },
1501 elu: {
1502 aggregate: false,
1503 average: false,
1504 median: false
1505 }
1506 })
1507 await pool.destroy()
1508 pool = new DynamicThreadPool(
1509 min,
1510 max,
1511 './tests/worker-files/thread/testWorker.mjs',
1512 { workerChoiceStrategy }
1513 )
1514 expect(
1515 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
1516 ).toStrictEqual({
1517 runTime: {
1518 aggregate: true,
1519 average: true,
1520 median: false
1521 },
1522 waitTime: {
1523 aggregate: false,
1524 average: false,
1525 median: false
1526 },
1527 elu: {
1528 aggregate: false,
1529 average: false,
1530 median: false
1531 }
1532 })
1533 // We need to clean up the resources after our test
1534 await pool.destroy()
1535 })
1536
1537 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1538 const pool = new FixedThreadPool(
1539 max,
1540 './tests/worker-files/thread/testWorker.mjs',
1541 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1542 )
1543 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1544 const promises = new Set()
1545 const maxMultiplier = 2
1546 for (let i = 0; i < max * maxMultiplier; i++) {
1547 promises.add(pool.execute())
1548 }
1549 await Promise.all(promises)
1550 for (const workerNode of pool.workerNodes) {
1551 expect(workerNode.usage).toStrictEqual({
1552 tasks: {
1553 executed: expect.any(Number),
1554 executing: 0,
1555 queued: 0,
1556 maxQueued: 0,
1557 sequentiallyStolen: 0,
1558 stolen: 0,
1559 failed: 0
1560 },
1561 runTime: expect.objectContaining({
1562 history: expect.any(CircularArray)
1563 }),
1564 waitTime: {
1565 history: new CircularArray()
1566 },
1567 elu: {
1568 idle: {
1569 history: new CircularArray()
1570 },
1571 active: {
1572 history: new CircularArray()
1573 }
1574 }
1575 })
1576 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1577 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1578 max * maxMultiplier
1579 )
1580 if (workerNode.usage.runTime.aggregate == null) {
1581 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1582 } else {
1583 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1584 }
1585 if (workerNode.usage.runTime.average == null) {
1586 expect(workerNode.usage.runTime.average).toBeUndefined()
1587 } else {
1588 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1589 }
1590 }
1591 expect(
1592 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1593 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1594 ).nextWorkerNodeKey
1595 ).toBe(0)
1596 expect(
1597 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1598 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1599 ).previousWorkerNodeKey
1600 ).toEqual(0)
1601 expect(
1602 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1603 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1604 ).workerNodeVirtualTaskRunTime
1605 ).toBeGreaterThanOrEqual(0)
1606 // We need to clean up the resources after our test
1607 await pool.destroy()
1608 })
1609
1610 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1611 const pool = new DynamicThreadPool(
1612 min,
1613 max,
1614 './tests/worker-files/thread/testWorker.mjs',
1615 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1616 )
1617 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1618 const promises = new Set()
1619 const maxMultiplier = 2
1620 for (let i = 0; i < max * maxMultiplier; i++) {
1621 promises.add(pool.execute())
1622 }
1623 await Promise.all(promises)
1624 for (const workerNode of pool.workerNodes) {
1625 expect(workerNode.usage).toStrictEqual({
1626 tasks: {
1627 executed: expect.any(Number),
1628 executing: 0,
1629 queued: 0,
1630 maxQueued: 0,
1631 sequentiallyStolen: 0,
1632 stolen: 0,
1633 failed: 0
1634 },
1635 runTime: expect.objectContaining({
1636 history: expect.any(CircularArray)
1637 }),
1638 waitTime: {
1639 history: new CircularArray()
1640 },
1641 elu: {
1642 idle: {
1643 history: new CircularArray()
1644 },
1645 active: {
1646 history: new CircularArray()
1647 }
1648 }
1649 })
1650 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1651 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1652 max * maxMultiplier
1653 )
1654 if (workerNode.usage.runTime.aggregate == null) {
1655 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1656 } else {
1657 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1658 }
1659 if (workerNode.usage.runTime.average == null) {
1660 expect(workerNode.usage.runTime.average).toBeUndefined()
1661 } else {
1662 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1663 }
1664 }
1665 expect(
1666 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1667 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1668 ).nextWorkerNodeKey
1669 ).toEqual(0)
1670 expect(
1671 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1672 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1673 ).previousWorkerNodeKey
1674 ).toEqual(0)
1675 expect(
1676 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1677 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1678 ).workerNodeVirtualTaskRunTime
1679 ).toBeGreaterThanOrEqual(0)
1680 // We need to clean up the resources after our test
1681 await pool.destroy()
1682 })
1683
1684 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1685 const pool = new DynamicThreadPool(
1686 min,
1687 max,
1688 './tests/worker-files/thread/testWorker.mjs',
1689 {
1690 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1691 workerChoiceStrategyOptions: {
1692 runTime: { median: true }
1693 }
1694 }
1695 )
1696 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1697 const promises = new Set()
1698 const maxMultiplier = 2
1699 for (let i = 0; i < max * maxMultiplier; i++) {
1700 promises.add(pool.execute())
1701 }
1702 await Promise.all(promises)
1703 for (const workerNode of pool.workerNodes) {
1704 expect(workerNode.usage).toStrictEqual({
1705 tasks: {
1706 executed: expect.any(Number),
1707 executing: 0,
1708 queued: 0,
1709 maxQueued: 0,
1710 sequentiallyStolen: 0,
1711 stolen: 0,
1712 failed: 0
1713 },
1714 runTime: expect.objectContaining({
1715 history: expect.any(CircularArray)
1716 }),
1717 waitTime: {
1718 history: new CircularArray()
1719 },
1720 elu: {
1721 idle: {
1722 history: new CircularArray()
1723 },
1724 active: {
1725 history: new CircularArray()
1726 }
1727 }
1728 })
1729 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1730 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1731 max * maxMultiplier
1732 )
1733 if (workerNode.usage.runTime.aggregate == null) {
1734 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1735 } else {
1736 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1737 }
1738 if (workerNode.usage.runTime.median == null) {
1739 expect(workerNode.usage.runTime.median).toBeUndefined()
1740 } else {
1741 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1742 }
1743 }
1744 expect(
1745 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1746 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1747 ).nextWorkerNodeKey
1748 ).toEqual(0)
1749 expect(
1750 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1751 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1752 ).previousWorkerNodeKey
1753 ).toEqual(0)
1754 expect(
1755 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1756 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1757 ).workerNodeVirtualTaskRunTime
1758 ).toBeGreaterThanOrEqual(0)
1759 // We need to clean up the resources after our test
1760 await pool.destroy()
1761 })
1762
1763 it("Verify WEIGHTED_ROUND_ROBIN strategy internals aren't reset after setting it", async () => {
1764 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1765 let pool = new FixedThreadPool(
1766 max,
1767 './tests/worker-files/thread/testWorker.mjs',
1768 { workerChoiceStrategy }
1769 )
1770 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1771 workerChoiceStrategy
1772 ).nextWorkerNodeKey = randomInt(1, max - 1)
1773 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1774 workerChoiceStrategy
1775 ).previousWorkerNodeKey = randomInt(1, max - 1)
1776 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1777 workerChoiceStrategy
1778 ).workerNodeVirtualTaskRunTime = randomInt(100, 1000)
1779 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1780 expect(
1781 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1782 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1783 ).nextWorkerNodeKey
1784 ).toBeGreaterThan(0)
1785 expect(
1786 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1787 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1788 ).previousWorkerNodeKey
1789 ).toBeGreaterThan(0)
1790 expect(
1791 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1792 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1793 ).workerNodeVirtualTaskRunTime
1794 ).toBeGreaterThan(99)
1795 await pool.destroy()
1796 pool = new DynamicThreadPool(
1797 min,
1798 max,
1799 './tests/worker-files/thread/testWorker.mjs',
1800 { workerChoiceStrategy }
1801 )
1802 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1803 workerChoiceStrategy
1804 ).nextWorkerNodeKey = randomInt(1, max - 1)
1805 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1806 workerChoiceStrategy
1807 ).previousWorkerNodeKey = randomInt(1, max - 1)
1808 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1809 workerChoiceStrategy
1810 ).workerNodeVirtualTaskRunTime = randomInt(100, 1000)
1811 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1812 expect(
1813 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1814 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1815 ).nextWorkerNodeKey
1816 ).toBeGreaterThan(0)
1817 expect(
1818 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1819 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1820 ).previousWorkerNodeKey
1821 ).toBeGreaterThan(0)
1822 expect(
1823 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1824 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1825 ).workerNodeVirtualTaskRunTime
1826 ).toBeGreaterThan(99)
1827 // We need to clean up the resources after our test
1828 await pool.destroy()
1829 })
1830
1831 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1832 const workerChoiceStrategy =
1833 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1834 let pool = new FixedThreadPool(
1835 max,
1836 './tests/worker-files/thread/testWorker.mjs',
1837 { workerChoiceStrategy }
1838 )
1839 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
1840 dynamicWorkerUsage: false,
1841 dynamicWorkerReady: true
1842 })
1843 await pool.destroy()
1844 pool = new DynamicThreadPool(
1845 min,
1846 max,
1847 './tests/worker-files/thread/testWorker.mjs',
1848 { workerChoiceStrategy }
1849 )
1850 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
1851 dynamicWorkerUsage: false,
1852 dynamicWorkerReady: true
1853 })
1854 // We need to clean up the resources after our test
1855 await pool.destroy()
1856 })
1857
1858 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1859 const workerChoiceStrategy =
1860 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1861 let pool = new FixedThreadPool(
1862 max,
1863 './tests/worker-files/thread/testWorker.mjs',
1864 { workerChoiceStrategy }
1865 )
1866 expect(
1867 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
1868 ).toStrictEqual({
1869 runTime: {
1870 aggregate: true,
1871 average: true,
1872 median: false
1873 },
1874 waitTime: {
1875 aggregate: false,
1876 average: false,
1877 median: false
1878 },
1879 elu: {
1880 aggregate: false,
1881 average: false,
1882 median: false
1883 }
1884 })
1885 await pool.destroy()
1886 pool = new DynamicThreadPool(
1887 min,
1888 max,
1889 './tests/worker-files/thread/testWorker.mjs',
1890 { workerChoiceStrategy }
1891 )
1892 expect(
1893 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
1894 ).toStrictEqual({
1895 runTime: {
1896 aggregate: true,
1897 average: true,
1898 median: false
1899 },
1900 waitTime: {
1901 aggregate: false,
1902 average: false,
1903 median: false
1904 },
1905 elu: {
1906 aggregate: false,
1907 average: false,
1908 median: false
1909 }
1910 })
1911 // We need to clean up the resources after our test
1912 await pool.destroy()
1913 })
1914
1915 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1916 const pool = new FixedThreadPool(
1917 max,
1918 './tests/worker-files/thread/testWorker.mjs',
1919 {
1920 workerChoiceStrategy:
1921 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1922 }
1923 )
1924 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1925 const promises = new Set()
1926 const maxMultiplier = 2
1927 for (let i = 0; i < max * maxMultiplier; i++) {
1928 promises.add(pool.execute())
1929 }
1930 await Promise.all(promises)
1931 for (const workerNode of pool.workerNodes) {
1932 expect(workerNode.usage).toStrictEqual({
1933 tasks: {
1934 executed: expect.any(Number),
1935 executing: 0,
1936 queued: 0,
1937 maxQueued: 0,
1938 sequentiallyStolen: 0,
1939 stolen: 0,
1940 failed: 0
1941 },
1942 runTime: expect.objectContaining({
1943 history: expect.any(CircularArray)
1944 }),
1945 waitTime: {
1946 history: new CircularArray()
1947 },
1948 elu: {
1949 idle: {
1950 history: new CircularArray()
1951 },
1952 active: {
1953 history: new CircularArray()
1954 }
1955 }
1956 })
1957 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1958 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1959 max * maxMultiplier
1960 )
1961 }
1962 expect(
1963 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1964 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1965 ).roundId
1966 ).toBe(0)
1967 expect(
1968 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1969 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1970 ).workerNodeId
1971 ).toBe(0)
1972 expect(
1973 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1974 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1975 ).nextWorkerNodeKey
1976 ).toBe(0)
1977 expect(
1978 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1979 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1980 ).previousWorkerNodeKey
1981 ).toEqual(0)
1982 expect(
1983 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1984 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1985 ).roundWeights.length
1986 ).toBe(1)
1987 expect(
1988 Number.isSafeInteger(
1989 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1990 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1991 ).roundWeights[0]
1992 )
1993 ).toBe(true)
1994 // We need to clean up the resources after our test
1995 await pool.destroy()
1996 })
1997
1998 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1999 const pool = new DynamicThreadPool(
2000 min,
2001 max,
2002 './tests/worker-files/thread/testWorker.mjs',
2003 {
2004 workerChoiceStrategy:
2005 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2006 }
2007 )
2008 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
2009 const promises = new Set()
2010 const maxMultiplier = 2
2011 for (let i = 0; i < max * maxMultiplier; i++) {
2012 promises.add(pool.execute())
2013 }
2014 await Promise.all(promises)
2015 for (const workerNode of pool.workerNodes) {
2016 expect(workerNode.usage).toStrictEqual({
2017 tasks: {
2018 executed: expect.any(Number),
2019 executing: 0,
2020 queued: 0,
2021 maxQueued: 0,
2022 sequentiallyStolen: 0,
2023 stolen: 0,
2024 failed: 0
2025 },
2026 runTime: expect.objectContaining({
2027 history: expect.any(CircularArray)
2028 }),
2029 waitTime: {
2030 history: new CircularArray()
2031 },
2032 elu: {
2033 idle: {
2034 history: new CircularArray()
2035 },
2036 active: {
2037 history: new CircularArray()
2038 }
2039 }
2040 })
2041 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2042 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2043 max * maxMultiplier
2044 )
2045 }
2046 expect(
2047 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2048 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2049 ).roundId
2050 ).toBe(0)
2051 expect(
2052 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2053 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2054 ).workerNodeId
2055 ).toBe(0)
2056 expect(
2057 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2058 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2059 ).nextWorkerNodeKey
2060 ).toBe(0)
2061 expect(
2062 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2063 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2064 ).previousWorkerNodeKey
2065 ).toEqual(0)
2066 expect(
2067 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2068 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2069 ).roundWeights.length
2070 ).toBe(1)
2071 expect(
2072 Number.isSafeInteger(
2073 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2074 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2075 ).roundWeights[0]
2076 )
2077 ).toBe(true)
2078 // We need to clean up the resources after our test
2079 await pool.destroy()
2080 })
2081
2082 it("Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals aren't resets after setting it", async () => {
2083 const workerChoiceStrategy =
2084 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2085 let pool = new FixedThreadPool(
2086 max,
2087 './tests/worker-files/thread/testWorker.mjs',
2088 { workerChoiceStrategy }
2089 )
2090 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2091 workerChoiceStrategy
2092 ).roundId = randomInt(1, max - 1)
2093 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2094 workerChoiceStrategy
2095 ).workerNodeId = randomInt(1, max - 1)
2096 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2097 workerChoiceStrategy
2098 ).nextWorkerNodeKey = randomInt(1, max - 1)
2099 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2100 workerChoiceStrategy
2101 ).previousWorkerNodeKey = randomInt(1, max - 1)
2102 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2103 workerChoiceStrategy
2104 ).roundWeights = [randomInt(1, max - 1), randomInt(1, max - 1)]
2105 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2106 expect(
2107 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2108 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2109 ).roundId
2110 ).toBeGreaterThan(0)
2111 expect(
2112 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2113 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2114 ).workerNodeId
2115 ).toBeGreaterThan(0)
2116 expect(
2117 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2118 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2119 ).nextWorkerNodeKey
2120 ).toBeGreaterThan(0)
2121 expect(
2122 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2123 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2124 ).previousWorkerNodeKey
2125 ).toBeGreaterThan(0)
2126 expect(
2127 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2128 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2129 ).roundWeights.length
2130 ).toBeGreaterThan(1)
2131 expect(
2132 Number.isSafeInteger(
2133 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2134 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2135 ).roundWeights[0]
2136 )
2137 ).toBe(true)
2138 await pool.destroy()
2139 pool = new DynamicThreadPool(
2140 min,
2141 max,
2142 './tests/worker-files/thread/testWorker.mjs',
2143 { workerChoiceStrategy }
2144 )
2145 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2146 workerChoiceStrategy
2147 ).roundId = randomInt(1, max - 1)
2148 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2149 workerChoiceStrategy
2150 ).workerNodeId = randomInt(1, max - 1)
2151 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2152 workerChoiceStrategy
2153 ).nextWorkerNodeKey = randomInt(1, max - 1)
2154 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2155 workerChoiceStrategy
2156 ).previousWorkerNodeKey = randomInt(1, max - 1)
2157 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2158 workerChoiceStrategy
2159 ).roundWeights = [randomInt(1, max - 1), randomInt(1, max - 1)]
2160 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2161 expect(
2162 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2163 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2164 ).roundId
2165 ).toBeGreaterThan(0)
2166 expect(
2167 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2168 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2169 ).workerNodeId
2170 ).toBeGreaterThan(0)
2171 expect(
2172 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2173 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2174 ).nextWorkerNodeKey
2175 ).toBeGreaterThan(0)
2176 expect(
2177 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2178 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2179 ).previousWorkerNodeKey
2180 ).toBeGreaterThan(0)
2181 expect(
2182 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2183 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2184 ).roundWeights.length
2185 ).toBeGreaterThan(1)
2186 expect(
2187 Number.isSafeInteger(
2188 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2189 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2190 ).roundWeights[0]
2191 )
2192 ).toBe(true)
2193 // We need to clean up the resources after our test
2194 await pool.destroy()
2195 })
2196
2197 it('Verify unknown strategy throw error', () => {
2198 expect(
2199 () =>
2200 new DynamicThreadPool(
2201 min,
2202 max,
2203 './tests/worker-files/thread/testWorker.mjs',
2204 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2205 )
2206 ).toThrow("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
2207 })
2208 })