test: switch thread worker to ESM
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.mjs
1 import { expect } from 'expect'
2 import {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 WorkerChoiceStrategies
8 } from '../../../lib/index.js'
9 import { CircularArray } from '../../../lib/circular-array.js'
10
11 describe('Selection strategies test suite', () => {
12 const min = 0
13 const max = 3
14
15 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
16 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
17 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
18 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
19 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
20 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
21 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
22 'WEIGHTED_ROUND_ROBIN'
23 )
24 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
25 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
26 )
27 })
28
29 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
30 const pool = new DynamicThreadPool(
31 min,
32 max,
33 './tests/worker-files/thread/testWorker.mjs'
34 )
35 expect(pool.opts.workerChoiceStrategy).toBe(
36 WorkerChoiceStrategies.ROUND_ROBIN
37 )
38 // We need to clean up the resources after our test
39 await pool.destroy()
40 })
41
42 it('Verify available strategies are taken at pool creation', async () => {
43 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
44 const pool = new FixedThreadPool(
45 max,
46 './tests/worker-files/thread/testWorker.mjs',
47 { workerChoiceStrategy }
48 )
49 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
50 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
51 workerChoiceStrategy
52 )
53 await pool.destroy()
54 }
55 })
56
57 it('Verify available strategies can be set after pool creation', async () => {
58 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
59 const pool = new DynamicThreadPool(
60 min,
61 max,
62 './tests/worker-files/thread/testWorker.mjs'
63 )
64 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
65 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
66 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
67 workerChoiceStrategy
68 )
69 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
70 retries: 6,
71 runTime: { median: false },
72 waitTime: { median: false },
73 elu: { median: false }
74 })
75 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
76 retries: 6,
77 runTime: { median: false },
78 waitTime: { median: false },
79 elu: { median: false }
80 })
81 await pool.destroy()
82 }
83 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
84 const pool = new DynamicClusterPool(
85 min,
86 max,
87 './tests/worker-files/cluster/testWorker.js'
88 )
89 pool.setWorkerChoiceStrategy(workerChoiceStrategy, { retries: 3 })
90 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
91 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
92 workerChoiceStrategy
93 )
94 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
95 retries: 3,
96 runTime: { median: false },
97 waitTime: { median: false },
98 elu: { median: false }
99 })
100 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
101 retries: 3,
102 runTime: { median: false },
103 waitTime: { median: false },
104 elu: { median: false }
105 })
106 await pool.destroy()
107 }
108 })
109
110 it('Verify available strategies default internals at pool creation', async () => {
111 const pool = new FixedThreadPool(
112 max,
113 './tests/worker-files/thread/testWorker.mjs'
114 )
115 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
116 expect(
117 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
118 workerChoiceStrategy
119 ).nextWorkerNodeKey
120 ).toBe(0)
121 expect(
122 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
123 workerChoiceStrategy
124 ).previousWorkerNodeKey
125 ).toBe(0)
126 if (
127 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
128 ) {
129 expect(
130 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
131 workerChoiceStrategy
132 ).defaultWorkerWeight
133 ).toBeGreaterThan(0)
134 expect(
135 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
136 workerChoiceStrategy
137 ).workerNodeVirtualTaskRunTime
138 ).toBe(0)
139 } else if (
140 workerChoiceStrategy ===
141 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
142 ) {
143 expect(
144 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
145 workerChoiceStrategy
146 ).defaultWorkerWeight
147 ).toBeGreaterThan(0)
148 expect(
149 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
150 workerChoiceStrategy
151 ).workerNodeVirtualTaskRunTime
152 ).toBe(0)
153 expect(
154 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
155 workerChoiceStrategy
156 ).roundId
157 ).toBe(0)
158 expect(
159 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
160 workerChoiceStrategy
161 ).workerNodeId
162 ).toBe(0)
163 expect(
164 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
165 workerChoiceStrategy
166 ).roundWeights
167 ).toStrictEqual([
168 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
169 workerChoiceStrategy
170 ).defaultWorkerWeight
171 ])
172 }
173 }
174 await pool.destroy()
175 })
176
177 it('Verify ROUND_ROBIN strategy default policy', async () => {
178 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
179 let pool = new FixedThreadPool(
180 max,
181 './tests/worker-files/thread/testWorker.mjs',
182 { workerChoiceStrategy }
183 )
184 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
185 dynamicWorkerUsage: false,
186 dynamicWorkerReady: true
187 })
188 await pool.destroy()
189 pool = new DynamicThreadPool(
190 min,
191 max,
192 './tests/worker-files/thread/testWorker.mjs',
193 { workerChoiceStrategy }
194 )
195 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
196 dynamicWorkerUsage: false,
197 dynamicWorkerReady: true
198 })
199 // We need to clean up the resources after our test
200 await pool.destroy()
201 })
202
203 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
204 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
205 let pool = new FixedThreadPool(
206 max,
207 './tests/worker-files/thread/testWorker.mjs',
208 { workerChoiceStrategy }
209 )
210 expect(
211 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
212 ).toStrictEqual({
213 runTime: {
214 aggregate: false,
215 average: false,
216 median: false
217 },
218 waitTime: {
219 aggregate: false,
220 average: false,
221 median: false
222 },
223 elu: {
224 aggregate: false,
225 average: false,
226 median: false
227 }
228 })
229 await pool.destroy()
230 pool = new DynamicThreadPool(
231 min,
232 max,
233 './tests/worker-files/thread/testWorker.mjs',
234 { workerChoiceStrategy }
235 )
236 expect(
237 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
238 ).toStrictEqual({
239 runTime: {
240 aggregate: false,
241 average: false,
242 median: false
243 },
244 waitTime: {
245 aggregate: false,
246 average: false,
247 median: false
248 },
249 elu: {
250 aggregate: false,
251 average: false,
252 median: false
253 }
254 })
255 // We need to clean up the resources after our test
256 await pool.destroy()
257 })
258
259 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
260 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
261 const pool = new FixedThreadPool(
262 max,
263 './tests/worker-files/thread/testWorker.mjs',
264 { workerChoiceStrategy }
265 )
266 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
267 const promises = new Set()
268 const maxMultiplier = 2
269 for (let i = 0; i < max * maxMultiplier; i++) {
270 promises.add(pool.execute())
271 }
272 await Promise.all(promises)
273 for (const workerNode of pool.workerNodes) {
274 expect(workerNode.usage).toStrictEqual({
275 tasks: {
276 executed: maxMultiplier,
277 executing: 0,
278 queued: 0,
279 maxQueued: 0,
280 stolen: 0,
281 failed: 0
282 },
283 runTime: {
284 history: new CircularArray()
285 },
286 waitTime: {
287 history: new CircularArray()
288 },
289 elu: {
290 idle: {
291 history: new CircularArray()
292 },
293 active: {
294 history: new CircularArray()
295 }
296 }
297 })
298 }
299 expect(
300 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
301 pool.workerChoiceStrategyContext.workerChoiceStrategy
302 ).nextWorkerNodeKey
303 ).toBe(0)
304 expect(
305 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
306 pool.workerChoiceStrategyContext.workerChoiceStrategy
307 ).previousWorkerNodeKey
308 ).toBe(pool.workerNodes.length - 1)
309 // We need to clean up the resources after our test
310 await pool.destroy()
311 })
312
313 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
314 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
315 const pool = new DynamicThreadPool(
316 min,
317 max,
318 './tests/worker-files/thread/testWorker.mjs',
319 { workerChoiceStrategy }
320 )
321 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
322 const promises = new Set()
323 const maxMultiplier = 2
324 for (let i = 0; i < max * maxMultiplier; i++) {
325 promises.add(pool.execute())
326 }
327 await Promise.all(promises)
328 for (const workerNode of pool.workerNodes) {
329 expect(workerNode.usage).toStrictEqual({
330 tasks: {
331 executed: expect.any(Number),
332 executing: 0,
333 queued: 0,
334 maxQueued: 0,
335 stolen: 0,
336 failed: 0
337 },
338 runTime: {
339 history: new CircularArray()
340 },
341 waitTime: {
342 history: new CircularArray()
343 },
344 elu: {
345 idle: {
346 history: new CircularArray()
347 },
348 active: {
349 history: new CircularArray()
350 }
351 }
352 })
353 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
354 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
355 max * maxMultiplier
356 )
357 }
358 expect(
359 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
360 pool.workerChoiceStrategyContext.workerChoiceStrategy
361 ).nextWorkerNodeKey
362 ).toBe(0)
363 expect(
364 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
365 pool.workerChoiceStrategyContext.workerChoiceStrategy
366 ).previousWorkerNodeKey
367 ).toBe(pool.workerNodes.length - 1)
368 // We need to clean up the resources after our test
369 await pool.destroy()
370 })
371
372 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
373 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
374 let pool = new FixedClusterPool(
375 max,
376 './tests/worker-files/cluster/testWorker.js',
377 { workerChoiceStrategy }
378 )
379 let results = new Set()
380 for (let i = 0; i < max; i++) {
381 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.id)
382 }
383 expect(results.size).toBe(max)
384 await pool.destroy()
385 pool = new FixedThreadPool(
386 max,
387 './tests/worker-files/thread/testWorker.mjs',
388 { workerChoiceStrategy }
389 )
390 results = new Set()
391 for (let i = 0; i < max; i++) {
392 results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.threadId)
393 }
394 expect(results.size).toBe(max)
395 await pool.destroy()
396 })
397
398 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
399 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
400 let pool = new FixedThreadPool(
401 max,
402 './tests/worker-files/thread/testWorker.mjs',
403 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
404 )
405 expect(
406 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
407 pool.workerChoiceStrategyContext.workerChoiceStrategy
408 ).nextWorkerNodeKey
409 ).toBeDefined()
410 expect(
411 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
412 pool.workerChoiceStrategyContext.workerChoiceStrategy
413 ).previousWorkerNodeKey
414 ).toBeDefined()
415 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
416 expect(
417 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
418 pool.workerChoiceStrategyContext.workerChoiceStrategy
419 ).nextWorkerNodeKey
420 ).toBe(0)
421 expect(
422 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
423 pool.workerChoiceStrategyContext.workerChoiceStrategy
424 ).previousWorkerNodeKey
425 ).toBe(0)
426 await pool.destroy()
427 pool = new DynamicThreadPool(
428 min,
429 max,
430 './tests/worker-files/thread/testWorker.mjs',
431 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
432 )
433 expect(
434 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
435 pool.workerChoiceStrategyContext.workerChoiceStrategy
436 ).nextWorkerNodeKey
437 ).toBeDefined()
438 expect(
439 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
440 pool.workerChoiceStrategyContext.workerChoiceStrategy
441 ).previousWorkerNodeKey
442 ).toBeDefined()
443 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
444 expect(
445 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
446 pool.workerChoiceStrategyContext.workerChoiceStrategy
447 ).nextWorkerNodeKey
448 ).toBe(0)
449 expect(
450 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
451 pool.workerChoiceStrategyContext.workerChoiceStrategy
452 ).previousWorkerNodeKey
453 ).toBe(0)
454 // We need to clean up the resources after our test
455 await pool.destroy()
456 })
457
458 it('Verify LEAST_USED strategy default policy', async () => {
459 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
460 let pool = new FixedThreadPool(
461 max,
462 './tests/worker-files/thread/testWorker.mjs',
463 { workerChoiceStrategy }
464 )
465 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
466 dynamicWorkerUsage: false,
467 dynamicWorkerReady: true
468 })
469 await pool.destroy()
470 pool = new DynamicThreadPool(
471 min,
472 max,
473 './tests/worker-files/thread/testWorker.mjs',
474 { workerChoiceStrategy }
475 )
476 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
477 dynamicWorkerUsage: false,
478 dynamicWorkerReady: true
479 })
480 // We need to clean up the resources after our test
481 await pool.destroy()
482 })
483
484 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
485 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
486 let pool = new FixedThreadPool(
487 max,
488 './tests/worker-files/thread/testWorker.mjs',
489 { workerChoiceStrategy }
490 )
491 expect(
492 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
493 ).toStrictEqual({
494 runTime: {
495 aggregate: false,
496 average: false,
497 median: false
498 },
499 waitTime: {
500 aggregate: false,
501 average: false,
502 median: false
503 },
504 elu: {
505 aggregate: false,
506 average: false,
507 median: false
508 }
509 })
510 await pool.destroy()
511 pool = new DynamicThreadPool(
512 min,
513 max,
514 './tests/worker-files/thread/testWorker.mjs',
515 { workerChoiceStrategy }
516 )
517 expect(
518 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
519 ).toStrictEqual({
520 runTime: {
521 aggregate: false,
522 average: false,
523 median: false
524 },
525 waitTime: {
526 aggregate: false,
527 average: false,
528 median: false
529 },
530 elu: {
531 aggregate: false,
532 average: false,
533 median: false
534 }
535 })
536 // We need to clean up the resources after our test
537 await pool.destroy()
538 })
539
540 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
541 const pool = new FixedThreadPool(
542 max,
543 './tests/worker-files/thread/testWorker.mjs',
544 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
545 )
546 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
547 const promises = new Set()
548 const maxMultiplier = 2
549 for (let i = 0; i < max * maxMultiplier; i++) {
550 promises.add(pool.execute())
551 }
552 await Promise.all(promises)
553 for (const workerNode of pool.workerNodes) {
554 expect(workerNode.usage).toStrictEqual({
555 tasks: {
556 executed: expect.any(Number),
557 executing: 0,
558 queued: 0,
559 maxQueued: 0,
560 stolen: 0,
561 failed: 0
562 },
563 runTime: {
564 history: new CircularArray()
565 },
566 waitTime: {
567 history: new CircularArray()
568 },
569 elu: {
570 idle: {
571 history: new CircularArray()
572 },
573 active: {
574 history: new CircularArray()
575 }
576 }
577 })
578 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
579 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
580 max * maxMultiplier
581 )
582 }
583 expect(
584 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
585 pool.workerChoiceStrategyContext.workerChoiceStrategy
586 ).nextWorkerNodeKey
587 ).toEqual(expect.any(Number))
588 expect(
589 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
590 pool.workerChoiceStrategyContext.workerChoiceStrategy
591 ).previousWorkerNodeKey
592 ).toEqual(expect.any(Number))
593 // We need to clean up the resources after our test
594 await pool.destroy()
595 })
596
597 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
598 const pool = new DynamicThreadPool(
599 min,
600 max,
601 './tests/worker-files/thread/testWorker.mjs',
602 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
603 )
604 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
605 const promises = new Set()
606 const maxMultiplier = 2
607 for (let i = 0; i < max * maxMultiplier; i++) {
608 promises.add(pool.execute())
609 }
610 await Promise.all(promises)
611 for (const workerNode of pool.workerNodes) {
612 expect(workerNode.usage).toStrictEqual({
613 tasks: {
614 executed: expect.any(Number),
615 executing: 0,
616 queued: 0,
617 maxQueued: 0,
618 stolen: 0,
619 failed: 0
620 },
621 runTime: {
622 history: new CircularArray()
623 },
624 waitTime: {
625 history: new CircularArray()
626 },
627 elu: {
628 idle: {
629 history: new CircularArray()
630 },
631 active: {
632 history: new CircularArray()
633 }
634 }
635 })
636 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
637 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
638 max * maxMultiplier
639 )
640 }
641 expect(
642 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
643 pool.workerChoiceStrategyContext.workerChoiceStrategy
644 ).nextWorkerNodeKey
645 ).toEqual(expect.any(Number))
646 expect(
647 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
648 pool.workerChoiceStrategyContext.workerChoiceStrategy
649 ).previousWorkerNodeKey
650 ).toEqual(expect.any(Number))
651 // We need to clean up the resources after our test
652 await pool.destroy()
653 })
654
655 it('Verify LEAST_BUSY strategy default policy', async () => {
656 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
657 let pool = new FixedThreadPool(
658 max,
659 './tests/worker-files/thread/testWorker.mjs',
660 { workerChoiceStrategy }
661 )
662 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
663 dynamicWorkerUsage: false,
664 dynamicWorkerReady: true
665 })
666 await pool.destroy()
667 pool = new DynamicThreadPool(
668 min,
669 max,
670 './tests/worker-files/thread/testWorker.mjs',
671 { workerChoiceStrategy }
672 )
673 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
674 dynamicWorkerUsage: false,
675 dynamicWorkerReady: true
676 })
677 // We need to clean up the resources after our test
678 await pool.destroy()
679 })
680
681 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
682 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
683 let pool = new FixedThreadPool(
684 max,
685 './tests/worker-files/thread/testWorker.mjs',
686 { workerChoiceStrategy }
687 )
688 expect(
689 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
690 ).toStrictEqual({
691 runTime: {
692 aggregate: true,
693 average: false,
694 median: false
695 },
696 waitTime: {
697 aggregate: true,
698 average: false,
699 median: false
700 },
701 elu: {
702 aggregate: false,
703 average: false,
704 median: false
705 }
706 })
707 await pool.destroy()
708 pool = new DynamicThreadPool(
709 min,
710 max,
711 './tests/worker-files/thread/testWorker.mjs',
712 { workerChoiceStrategy }
713 )
714 expect(
715 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
716 ).toStrictEqual({
717 runTime: {
718 aggregate: true,
719 average: false,
720 median: false
721 },
722 waitTime: {
723 aggregate: true,
724 average: false,
725 median: false
726 },
727 elu: {
728 aggregate: false,
729 average: false,
730 median: false
731 }
732 })
733 // We need to clean up the resources after our test
734 await pool.destroy()
735 })
736
737 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
738 const pool = new FixedThreadPool(
739 max,
740 './tests/worker-files/thread/testWorker.mjs',
741 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
742 )
743 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
744 const promises = new Set()
745 const maxMultiplier = 2
746 for (let i = 0; i < max * maxMultiplier; i++) {
747 promises.add(pool.execute())
748 }
749 await Promise.all(promises)
750 for (const workerNode of pool.workerNodes) {
751 expect(workerNode.usage).toStrictEqual({
752 tasks: {
753 executed: expect.any(Number),
754 executing: 0,
755 queued: 0,
756 maxQueued: 0,
757 stolen: 0,
758 failed: 0
759 },
760 runTime: expect.objectContaining({
761 history: expect.any(CircularArray)
762 }),
763 waitTime: expect.objectContaining({
764 history: expect.any(CircularArray)
765 }),
766 elu: {
767 idle: {
768 history: new CircularArray()
769 },
770 active: {
771 history: new CircularArray()
772 }
773 }
774 })
775 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
776 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
777 max * maxMultiplier
778 )
779 if (workerNode.usage.runTime.aggregate == null) {
780 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
781 } else {
782 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
783 }
784 if (workerNode.usage.waitTime.aggregate == null) {
785 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
786 } else {
787 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
788 }
789 }
790 expect(
791 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
792 pool.workerChoiceStrategyContext.workerChoiceStrategy
793 ).nextWorkerNodeKey
794 ).toEqual(expect.any(Number))
795 expect(
796 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
797 pool.workerChoiceStrategyContext.workerChoiceStrategy
798 ).previousWorkerNodeKey
799 ).toEqual(expect.any(Number))
800 // We need to clean up the resources after our test
801 await pool.destroy()
802 })
803
804 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
805 const pool = new DynamicThreadPool(
806 min,
807 max,
808 './tests/worker-files/thread/testWorker.mjs',
809 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
810 )
811 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
812 const promises = new Set()
813 const maxMultiplier = 2
814 for (let i = 0; i < max * maxMultiplier; i++) {
815 promises.add(pool.execute())
816 }
817 await Promise.all(promises)
818 for (const workerNode of pool.workerNodes) {
819 expect(workerNode.usage).toStrictEqual({
820 tasks: {
821 executed: expect.any(Number),
822 executing: 0,
823 queued: 0,
824 maxQueued: 0,
825 stolen: 0,
826 failed: 0
827 },
828 runTime: expect.objectContaining({
829 history: expect.any(CircularArray)
830 }),
831 waitTime: expect.objectContaining({
832 history: expect.any(CircularArray)
833 }),
834 elu: {
835 idle: {
836 history: new CircularArray()
837 },
838 active: {
839 history: new CircularArray()
840 }
841 }
842 })
843 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
844 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
845 max * maxMultiplier
846 )
847 if (workerNode.usage.runTime.aggregate == null) {
848 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
849 } else {
850 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
851 }
852 if (workerNode.usage.waitTime.aggregate == null) {
853 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
854 } else {
855 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
856 }
857 }
858 expect(
859 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
860 pool.workerChoiceStrategyContext.workerChoiceStrategy
861 ).nextWorkerNodeKey
862 ).toEqual(expect.any(Number))
863 expect(
864 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
865 pool.workerChoiceStrategyContext.workerChoiceStrategy
866 ).previousWorkerNodeKey
867 ).toEqual(expect.any(Number))
868 // We need to clean up the resources after our test
869 await pool.destroy()
870 })
871
872 it('Verify LEAST_ELU strategy default policy', async () => {
873 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
874 let pool = new FixedThreadPool(
875 max,
876 './tests/worker-files/thread/testWorker.mjs',
877 { workerChoiceStrategy }
878 )
879 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
880 dynamicWorkerUsage: false,
881 dynamicWorkerReady: true
882 })
883 await pool.destroy()
884 pool = new DynamicThreadPool(
885 min,
886 max,
887 './tests/worker-files/thread/testWorker.mjs',
888 { workerChoiceStrategy }
889 )
890 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
891 dynamicWorkerUsage: false,
892 dynamicWorkerReady: true
893 })
894 // We need to clean up the resources after our test
895 await pool.destroy()
896 })
897
898 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
899 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
900 let pool = new FixedThreadPool(
901 max,
902 './tests/worker-files/thread/testWorker.mjs',
903 { workerChoiceStrategy }
904 )
905 expect(
906 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
907 ).toStrictEqual({
908 runTime: {
909 aggregate: false,
910 average: false,
911 median: false
912 },
913 waitTime: {
914 aggregate: false,
915 average: false,
916 median: false
917 },
918 elu: {
919 aggregate: true,
920 average: false,
921 median: false
922 }
923 })
924 await pool.destroy()
925 pool = new DynamicThreadPool(
926 min,
927 max,
928 './tests/worker-files/thread/testWorker.mjs',
929 { workerChoiceStrategy }
930 )
931 expect(
932 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
933 ).toStrictEqual({
934 runTime: {
935 aggregate: false,
936 average: false,
937 median: false
938 },
939 waitTime: {
940 aggregate: false,
941 average: false,
942 median: false
943 },
944 elu: {
945 aggregate: true,
946 average: false,
947 median: false
948 }
949 })
950 // We need to clean up the resources after our test
951 await pool.destroy()
952 })
953
954 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
955 const pool = new FixedThreadPool(
956 max,
957 './tests/worker-files/thread/testWorker.mjs',
958 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
959 )
960 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
961 const promises = new Set()
962 const maxMultiplier = 2
963 for (let i = 0; i < max * maxMultiplier; i++) {
964 promises.add(pool.execute())
965 }
966 await Promise.all(promises)
967 for (const workerNode of pool.workerNodes) {
968 expect(workerNode.usage).toStrictEqual({
969 tasks: {
970 executed: expect.any(Number),
971 executing: 0,
972 queued: 0,
973 maxQueued: 0,
974 stolen: 0,
975 failed: 0
976 },
977 runTime: {
978 history: new CircularArray()
979 },
980 waitTime: {
981 history: new CircularArray()
982 },
983 elu: expect.objectContaining({
984 idle: expect.objectContaining({
985 history: expect.any(CircularArray)
986 }),
987 active: expect.objectContaining({
988 history: expect.any(CircularArray)
989 })
990 })
991 })
992 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
993 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
994 max * maxMultiplier
995 )
996 if (workerNode.usage.elu.active.aggregate == null) {
997 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
998 } else {
999 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1000 }
1001 if (workerNode.usage.elu.idle.aggregate == null) {
1002 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1003 } else {
1004 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1005 }
1006 if (workerNode.usage.elu.utilization == null) {
1007 expect(workerNode.usage.elu.utilization).toBeUndefined()
1008 } else {
1009 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1010 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1011 }
1012 }
1013 expect(
1014 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1015 pool.workerChoiceStrategyContext.workerChoiceStrategy
1016 ).nextWorkerNodeKey
1017 ).toEqual(expect.any(Number))
1018 expect(
1019 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1020 pool.workerChoiceStrategyContext.workerChoiceStrategy
1021 ).previousWorkerNodeKey
1022 ).toEqual(expect.any(Number))
1023 // We need to clean up the resources after our test
1024 await pool.destroy()
1025 })
1026
1027 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
1028 const pool = new DynamicThreadPool(
1029 min,
1030 max,
1031 './tests/worker-files/thread/testWorker.mjs',
1032 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
1033 )
1034 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
1035 const promises = new Set()
1036 const maxMultiplier = 2
1037 for (let i = 0; i < max * maxMultiplier; i++) {
1038 promises.add(pool.execute())
1039 }
1040 await Promise.all(promises)
1041 for (const workerNode of pool.workerNodes) {
1042 expect(workerNode.usage).toStrictEqual({
1043 tasks: {
1044 executed: expect.any(Number),
1045 executing: 0,
1046 queued: 0,
1047 maxQueued: 0,
1048 stolen: 0,
1049 failed: 0
1050 },
1051 runTime: {
1052 history: new CircularArray()
1053 },
1054 waitTime: {
1055 history: new CircularArray()
1056 },
1057 elu: expect.objectContaining({
1058 idle: expect.objectContaining({
1059 history: expect.any(CircularArray)
1060 }),
1061 active: expect.objectContaining({
1062 history: expect.any(CircularArray)
1063 })
1064 })
1065 })
1066 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1067 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1068 max * maxMultiplier
1069 )
1070 if (workerNode.usage.elu.active.aggregate == null) {
1071 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1072 } else {
1073 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1074 }
1075 if (workerNode.usage.elu.idle.aggregate == null) {
1076 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1077 } else {
1078 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1079 }
1080 if (workerNode.usage.elu.utilization == null) {
1081 expect(workerNode.usage.elu.utilization).toBeUndefined()
1082 } else {
1083 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1084 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1085 }
1086 }
1087 expect(
1088 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1089 pool.workerChoiceStrategyContext.workerChoiceStrategy
1090 ).nextWorkerNodeKey
1091 ).toEqual(expect.any(Number))
1092 expect(
1093 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1094 pool.workerChoiceStrategyContext.workerChoiceStrategy
1095 ).previousWorkerNodeKey
1096 ).toEqual(expect.any(Number))
1097 // We need to clean up the resources after our test
1098 await pool.destroy()
1099 })
1100
1101 it('Verify FAIR_SHARE strategy default policy', async () => {
1102 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1103 let pool = new FixedThreadPool(
1104 max,
1105 './tests/worker-files/thread/testWorker.mjs',
1106 { workerChoiceStrategy }
1107 )
1108 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1109 dynamicWorkerUsage: false,
1110 dynamicWorkerReady: true
1111 })
1112 await pool.destroy()
1113 pool = new DynamicThreadPool(
1114 min,
1115 max,
1116 './tests/worker-files/thread/testWorker.mjs',
1117 { workerChoiceStrategy }
1118 )
1119 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1120 dynamicWorkerUsage: false,
1121 dynamicWorkerReady: true
1122 })
1123 // We need to clean up the resources after our test
1124 await pool.destroy()
1125 })
1126
1127 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1128 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1129 let pool = new FixedThreadPool(
1130 max,
1131 './tests/worker-files/thread/testWorker.mjs',
1132 { workerChoiceStrategy }
1133 )
1134 expect(
1135 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1136 ).toStrictEqual({
1137 runTime: {
1138 aggregate: true,
1139 average: true,
1140 median: false
1141 },
1142 waitTime: {
1143 aggregate: false,
1144 average: false,
1145 median: false
1146 },
1147 elu: {
1148 aggregate: true,
1149 average: true,
1150 median: false
1151 }
1152 })
1153 await pool.destroy()
1154 pool = new DynamicThreadPool(
1155 min,
1156 max,
1157 './tests/worker-files/thread/testWorker.mjs',
1158 { workerChoiceStrategy }
1159 )
1160 expect(
1161 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1162 ).toStrictEqual({
1163 runTime: {
1164 aggregate: true,
1165 average: true,
1166 median: false
1167 },
1168 waitTime: {
1169 aggregate: false,
1170 average: false,
1171 median: false
1172 },
1173 elu: {
1174 aggregate: true,
1175 average: true,
1176 median: false
1177 }
1178 })
1179 // We need to clean up the resources after our test
1180 await pool.destroy()
1181 })
1182
1183 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1184 const pool = new FixedThreadPool(
1185 max,
1186 './tests/worker-files/thread/testWorker.mjs',
1187 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1188 )
1189 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1190 const promises = new Set()
1191 const maxMultiplier = 2
1192 for (let i = 0; i < max * maxMultiplier; i++) {
1193 promises.add(pool.execute())
1194 }
1195 await Promise.all(promises)
1196 for (const workerNode of pool.workerNodes) {
1197 expect(workerNode.usage).toStrictEqual({
1198 tasks: {
1199 executed: expect.any(Number),
1200 executing: 0,
1201 queued: 0,
1202 maxQueued: 0,
1203 stolen: 0,
1204 failed: 0
1205 },
1206 runTime: expect.objectContaining({
1207 history: expect.any(CircularArray)
1208 }),
1209 waitTime: {
1210 history: new CircularArray()
1211 },
1212 elu: expect.objectContaining({
1213 idle: expect.objectContaining({
1214 history: expect.any(CircularArray)
1215 }),
1216 active: expect.objectContaining({
1217 history: expect.any(CircularArray)
1218 })
1219 })
1220 })
1221 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1222 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1223 max * maxMultiplier
1224 )
1225 if (workerNode.usage.runTime.aggregate == null) {
1226 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1227 } else {
1228 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1229 }
1230 if (workerNode.usage.runTime.average == null) {
1231 expect(workerNode.usage.runTime.average).toBeUndefined()
1232 } else {
1233 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1234 }
1235 if (workerNode.usage.elu.active.aggregate == null) {
1236 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1237 } else {
1238 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1239 }
1240 if (workerNode.usage.elu.idle.aggregate == null) {
1241 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1242 } else {
1243 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1244 }
1245 if (workerNode.usage.elu.utilization == null) {
1246 expect(workerNode.usage.elu.utilization).toBeUndefined()
1247 } else {
1248 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1249 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1250 }
1251 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1252 }
1253 expect(
1254 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1255 pool.workerChoiceStrategyContext.workerChoiceStrategy
1256 ).nextWorkerNodeKey
1257 ).toEqual(expect.any(Number))
1258 expect(
1259 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1260 pool.workerChoiceStrategyContext.workerChoiceStrategy
1261 ).previousWorkerNodeKey
1262 ).toEqual(expect.any(Number))
1263 // We need to clean up the resources after our test
1264 await pool.destroy()
1265 })
1266
1267 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1268 const pool = new DynamicThreadPool(
1269 min,
1270 max,
1271 './tests/worker-files/thread/testWorker.mjs',
1272 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1273 )
1274 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1275 const promises = new Set()
1276 const maxMultiplier = 2
1277 for (let i = 0; i < max * maxMultiplier; i++) {
1278 promises.add(pool.execute())
1279 }
1280 await Promise.all(promises)
1281 for (const workerNode of pool.workerNodes) {
1282 expect(workerNode.usage).toStrictEqual({
1283 tasks: {
1284 executed: expect.any(Number),
1285 executing: 0,
1286 queued: 0,
1287 maxQueued: 0,
1288 stolen: 0,
1289 failed: 0
1290 },
1291 runTime: expect.objectContaining({
1292 history: expect.any(CircularArray)
1293 }),
1294 waitTime: {
1295 history: new CircularArray()
1296 },
1297 elu: expect.objectContaining({
1298 idle: expect.objectContaining({
1299 history: expect.any(CircularArray)
1300 }),
1301 active: expect.objectContaining({
1302 history: expect.any(CircularArray)
1303 })
1304 })
1305 })
1306 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1307 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1308 max * maxMultiplier
1309 )
1310 if (workerNode.usage.runTime.aggregate == null) {
1311 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1312 } else {
1313 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1314 }
1315 if (workerNode.usage.runTime.average == null) {
1316 expect(workerNode.usage.runTime.average).toBeUndefined()
1317 } else {
1318 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1319 }
1320 if (workerNode.usage.elu.active.aggregate == null) {
1321 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1322 } else {
1323 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1324 }
1325 if (workerNode.usage.elu.idle.aggregate == null) {
1326 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1327 } else {
1328 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1329 }
1330 if (workerNode.usage.elu.utilization == null) {
1331 expect(workerNode.usage.elu.utilization).toBeUndefined()
1332 } else {
1333 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1334 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1335 }
1336 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1337 }
1338 expect(
1339 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1340 pool.workerChoiceStrategyContext.workerChoiceStrategy
1341 ).nextWorkerNodeKey
1342 ).toEqual(expect.any(Number))
1343 expect(
1344 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1345 pool.workerChoiceStrategyContext.workerChoiceStrategy
1346 ).previousWorkerNodeKey
1347 ).toEqual(expect.any(Number))
1348 // We need to clean up the resources after our test
1349 await pool.destroy()
1350 })
1351
1352 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1353 const pool = new DynamicThreadPool(
1354 min,
1355 max,
1356 './tests/worker-files/thread/testWorker.mjs',
1357 {
1358 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1359 workerChoiceStrategyOptions: {
1360 runTime: { median: true }
1361 }
1362 }
1363 )
1364 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1365 const promises = new Set()
1366 const maxMultiplier = 2
1367 for (let i = 0; i < max * maxMultiplier; i++) {
1368 promises.add(pool.execute())
1369 }
1370 await Promise.all(promises)
1371 for (const workerNode of pool.workerNodes) {
1372 expect(workerNode.usage).toStrictEqual({
1373 tasks: {
1374 executed: expect.any(Number),
1375 executing: 0,
1376 queued: 0,
1377 maxQueued: 0,
1378 stolen: 0,
1379 failed: 0
1380 },
1381 runTime: expect.objectContaining({
1382 history: expect.any(CircularArray)
1383 }),
1384 waitTime: {
1385 history: new CircularArray()
1386 },
1387 elu: expect.objectContaining({
1388 idle: expect.objectContaining({
1389 history: expect.any(CircularArray)
1390 }),
1391 active: expect.objectContaining({
1392 history: expect.any(CircularArray)
1393 })
1394 })
1395 })
1396 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1397 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1398 max * maxMultiplier
1399 )
1400 if (workerNode.usage.runTime.aggregate == null) {
1401 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1402 } else {
1403 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1404 }
1405 if (workerNode.usage.runTime.median == null) {
1406 expect(workerNode.usage.runTime.median).toBeUndefined()
1407 } else {
1408 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1409 }
1410 if (workerNode.usage.elu.active.aggregate == null) {
1411 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1412 } else {
1413 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1414 }
1415 if (workerNode.usage.elu.idle.aggregate == null) {
1416 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1417 } else {
1418 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1419 }
1420 if (workerNode.usage.elu.utilization == null) {
1421 expect(workerNode.usage.elu.utilization).toBeUndefined()
1422 } else {
1423 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1424 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1425 }
1426 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1427 }
1428 expect(
1429 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1430 pool.workerChoiceStrategyContext.workerChoiceStrategy
1431 ).nextWorkerNodeKey
1432 ).toEqual(expect.any(Number))
1433 expect(
1434 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1435 pool.workerChoiceStrategyContext.workerChoiceStrategy
1436 ).previousWorkerNodeKey
1437 ).toEqual(expect.any(Number))
1438 // We need to clean up the resources after our test
1439 await pool.destroy()
1440 })
1441
1442 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1443 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1444 let pool = new FixedThreadPool(
1445 max,
1446 './tests/worker-files/thread/testWorker.mjs'
1447 )
1448 for (const workerNode of pool.workerNodes) {
1449 workerNode.strategyData = {
1450 virtualTaskEndTimestamp: performance.now()
1451 }
1452 }
1453 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1454 for (const workerNode of pool.workerNodes) {
1455 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1456 }
1457 await pool.destroy()
1458 pool = new DynamicThreadPool(
1459 min,
1460 max,
1461 './tests/worker-files/thread/testWorker.mjs'
1462 )
1463 for (const workerNode of pool.workerNodes) {
1464 workerNode.strategyData = {
1465 virtualTaskEndTimestamp: performance.now()
1466 }
1467 }
1468 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1469 for (const workerNode of pool.workerNodes) {
1470 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1471 }
1472 // We need to clean up the resources after our test
1473 await pool.destroy()
1474 })
1475
1476 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1477 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1478 let pool = new FixedThreadPool(
1479 max,
1480 './tests/worker-files/thread/testWorker.mjs',
1481 { workerChoiceStrategy }
1482 )
1483 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1484 dynamicWorkerUsage: false,
1485 dynamicWorkerReady: true
1486 })
1487 await pool.destroy()
1488 pool = new DynamicThreadPool(
1489 min,
1490 max,
1491 './tests/worker-files/thread/testWorker.mjs',
1492 { workerChoiceStrategy }
1493 )
1494 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1495 dynamicWorkerUsage: false,
1496 dynamicWorkerReady: true
1497 })
1498 // We need to clean up the resources after our test
1499 await pool.destroy()
1500 })
1501
1502 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1503 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1504 let pool = new FixedThreadPool(
1505 max,
1506 './tests/worker-files/thread/testWorker.mjs',
1507 { workerChoiceStrategy }
1508 )
1509 expect(
1510 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1511 ).toStrictEqual({
1512 runTime: {
1513 aggregate: true,
1514 average: true,
1515 median: false
1516 },
1517 waitTime: {
1518 aggregate: false,
1519 average: false,
1520 median: false
1521 },
1522 elu: {
1523 aggregate: false,
1524 average: false,
1525 median: false
1526 }
1527 })
1528 await pool.destroy()
1529 pool = new DynamicThreadPool(
1530 min,
1531 max,
1532 './tests/worker-files/thread/testWorker.mjs',
1533 { workerChoiceStrategy }
1534 )
1535 expect(
1536 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1537 ).toStrictEqual({
1538 runTime: {
1539 aggregate: true,
1540 average: true,
1541 median: false
1542 },
1543 waitTime: {
1544 aggregate: false,
1545 average: false,
1546 median: false
1547 },
1548 elu: {
1549 aggregate: false,
1550 average: false,
1551 median: false
1552 }
1553 })
1554 // We need to clean up the resources after our test
1555 await pool.destroy()
1556 })
1557
1558 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1559 const pool = new FixedThreadPool(
1560 max,
1561 './tests/worker-files/thread/testWorker.mjs',
1562 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1563 )
1564 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1565 const promises = new Set()
1566 const maxMultiplier = 2
1567 for (let i = 0; i < max * maxMultiplier; i++) {
1568 promises.add(pool.execute())
1569 }
1570 await Promise.all(promises)
1571 for (const workerNode of pool.workerNodes) {
1572 expect(workerNode.usage).toStrictEqual({
1573 tasks: {
1574 executed: expect.any(Number),
1575 executing: 0,
1576 queued: 0,
1577 maxQueued: 0,
1578 stolen: 0,
1579 failed: 0
1580 },
1581 runTime: expect.objectContaining({
1582 history: expect.any(CircularArray)
1583 }),
1584 waitTime: {
1585 history: new CircularArray()
1586 },
1587 elu: {
1588 idle: {
1589 history: new CircularArray()
1590 },
1591 active: {
1592 history: new CircularArray()
1593 }
1594 }
1595 })
1596 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1597 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1598 max * maxMultiplier
1599 )
1600 if (workerNode.usage.runTime.aggregate == null) {
1601 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1602 } else {
1603 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1604 }
1605 if (workerNode.usage.runTime.average == null) {
1606 expect(workerNode.usage.runTime.average).toBeUndefined()
1607 } else {
1608 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1609 }
1610 }
1611 expect(
1612 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1613 pool.workerChoiceStrategyContext.workerChoiceStrategy
1614 ).nextWorkerNodeKey
1615 ).toBe(0)
1616 expect(
1617 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1618 pool.workerChoiceStrategyContext.workerChoiceStrategy
1619 ).previousWorkerNodeKey
1620 ).toBe(0)
1621 expect(
1622 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1623 pool.workerChoiceStrategyContext.workerChoiceStrategy
1624 ).defaultWorkerWeight
1625 ).toBeGreaterThan(0)
1626 expect(
1627 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1628 pool.workerChoiceStrategyContext.workerChoiceStrategy
1629 ).workerNodeVirtualTaskRunTime
1630 ).toBeGreaterThanOrEqual(0)
1631 // We need to clean up the resources after our test
1632 await pool.destroy()
1633 })
1634
1635 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1636 const pool = new DynamicThreadPool(
1637 min,
1638 max,
1639 './tests/worker-files/thread/testWorker.mjs',
1640 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1641 )
1642 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1643 const promises = new Set()
1644 const maxMultiplier = 2
1645 for (let i = 0; i < max * maxMultiplier; i++) {
1646 promises.add(pool.execute())
1647 }
1648 await Promise.all(promises)
1649 for (const workerNode of pool.workerNodes) {
1650 expect(workerNode.usage).toStrictEqual({
1651 tasks: {
1652 executed: expect.any(Number),
1653 executing: 0,
1654 queued: 0,
1655 maxQueued: 0,
1656 stolen: 0,
1657 failed: 0
1658 },
1659 runTime: expect.objectContaining({
1660 history: expect.any(CircularArray)
1661 }),
1662 waitTime: {
1663 history: new CircularArray()
1664 },
1665 elu: {
1666 idle: {
1667 history: new CircularArray()
1668 },
1669 active: {
1670 history: new CircularArray()
1671 }
1672 }
1673 })
1674 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1675 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1676 max * maxMultiplier
1677 )
1678 if (workerNode.usage.runTime.aggregate == null) {
1679 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1680 } else {
1681 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1682 }
1683 if (workerNode.usage.runTime.average == null) {
1684 expect(workerNode.usage.runTime.average).toBeUndefined()
1685 } else {
1686 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1687 }
1688 }
1689 expect(
1690 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1691 pool.workerChoiceStrategyContext.workerChoiceStrategy
1692 ).nextWorkerNodeKey
1693 ).toBe(0)
1694 expect(
1695 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1696 pool.workerChoiceStrategyContext.workerChoiceStrategy
1697 ).previousWorkerNodeKey
1698 ).toBe(0)
1699 expect(
1700 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1701 pool.workerChoiceStrategyContext.workerChoiceStrategy
1702 ).defaultWorkerWeight
1703 ).toBeGreaterThan(0)
1704 expect(
1705 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1706 pool.workerChoiceStrategyContext.workerChoiceStrategy
1707 ).workerNodeVirtualTaskRunTime
1708 ).toBeGreaterThanOrEqual(0)
1709 // We need to clean up the resources after our test
1710 await pool.destroy()
1711 })
1712
1713 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1714 const pool = new DynamicThreadPool(
1715 min,
1716 max,
1717 './tests/worker-files/thread/testWorker.mjs',
1718 {
1719 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1720 workerChoiceStrategyOptions: {
1721 runTime: { median: true }
1722 }
1723 }
1724 )
1725 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1726 const promises = new Set()
1727 const maxMultiplier = 2
1728 for (let i = 0; i < max * maxMultiplier; i++) {
1729 promises.add(pool.execute())
1730 }
1731 await Promise.all(promises)
1732 for (const workerNode of pool.workerNodes) {
1733 expect(workerNode.usage).toStrictEqual({
1734 tasks: {
1735 executed: expect.any(Number),
1736 executing: 0,
1737 queued: 0,
1738 maxQueued: 0,
1739 stolen: 0,
1740 failed: 0
1741 },
1742 runTime: expect.objectContaining({
1743 history: expect.any(CircularArray)
1744 }),
1745 waitTime: {
1746 history: new CircularArray()
1747 },
1748 elu: {
1749 idle: {
1750 history: new CircularArray()
1751 },
1752 active: {
1753 history: new CircularArray()
1754 }
1755 }
1756 })
1757 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1758 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1759 max * maxMultiplier
1760 )
1761 if (workerNode.usage.runTime.aggregate == null) {
1762 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1763 } else {
1764 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1765 }
1766 if (workerNode.usage.runTime.median == null) {
1767 expect(workerNode.usage.runTime.median).toBeUndefined()
1768 } else {
1769 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1770 }
1771 }
1772 expect(
1773 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1774 pool.workerChoiceStrategyContext.workerChoiceStrategy
1775 ).nextWorkerNodeKey
1776 ).toBe(0)
1777 expect(
1778 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1779 pool.workerChoiceStrategyContext.workerChoiceStrategy
1780 ).previousWorkerNodeKey
1781 ).toBe(0)
1782 expect(
1783 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1784 pool.workerChoiceStrategyContext.workerChoiceStrategy
1785 ).defaultWorkerWeight
1786 ).toBeGreaterThan(0)
1787 expect(
1788 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1789 pool.workerChoiceStrategyContext.workerChoiceStrategy
1790 ).workerNodeVirtualTaskRunTime
1791 ).toBeGreaterThanOrEqual(0)
1792 // We need to clean up the resources after our test
1793 await pool.destroy()
1794 })
1795
1796 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1797 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1798 let pool = new FixedThreadPool(
1799 max,
1800 './tests/worker-files/thread/testWorker.mjs'
1801 )
1802 expect(
1803 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1804 workerChoiceStrategy
1805 ).nextWorkerNodeKey
1806 ).toBeDefined()
1807 expect(
1808 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1809 workerChoiceStrategy
1810 ).previousWorkerNodeKey
1811 ).toBeDefined()
1812 expect(
1813 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1814 workerChoiceStrategy
1815 ).defaultWorkerWeight
1816 ).toBeDefined()
1817 expect(
1818 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1819 workerChoiceStrategy
1820 ).workerNodeVirtualTaskRunTime
1821 ).toBeDefined()
1822 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1823 expect(
1824 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1825 pool.workerChoiceStrategyContext.workerChoiceStrategy
1826 ).nextWorkerNodeKey
1827 ).toBe(0)
1828 expect(
1829 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1830 pool.workerChoiceStrategyContext.workerChoiceStrategy
1831 ).previousWorkerNodeKey
1832 ).toBe(0)
1833 expect(
1834 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1835 pool.workerChoiceStrategyContext.workerChoiceStrategy
1836 ).defaultWorkerWeight
1837 ).toBeGreaterThan(0)
1838 expect(
1839 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1840 pool.workerChoiceStrategyContext.workerChoiceStrategy
1841 ).workerNodeVirtualTaskRunTime
1842 ).toBe(0)
1843 await pool.destroy()
1844 pool = new DynamicThreadPool(
1845 min,
1846 max,
1847 './tests/worker-files/thread/testWorker.mjs'
1848 )
1849 expect(
1850 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1851 workerChoiceStrategy
1852 ).nextWorkerNodeKey
1853 ).toBeDefined()
1854 expect(
1855 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1856 workerChoiceStrategy
1857 ).previousWorkerNodeKey
1858 ).toBeDefined()
1859 expect(
1860 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1861 workerChoiceStrategy
1862 ).defaultWorkerWeight
1863 ).toBeDefined()
1864 expect(
1865 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1866 workerChoiceStrategy
1867 ).workerNodeVirtualTaskRunTime
1868 ).toBeDefined()
1869 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1870 expect(
1871 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1872 pool.workerChoiceStrategyContext.workerChoiceStrategy
1873 ).nextWorkerNodeKey
1874 ).toBe(0)
1875 expect(
1876 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1877 pool.workerChoiceStrategyContext.workerChoiceStrategy
1878 ).previousWorkerNodeKey
1879 ).toBe(0)
1880 expect(
1881 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1882 pool.workerChoiceStrategyContext.workerChoiceStrategy
1883 ).defaultWorkerWeight
1884 ).toBeGreaterThan(0)
1885 expect(
1886 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1887 pool.workerChoiceStrategyContext.workerChoiceStrategy
1888 ).workerNodeVirtualTaskRunTime
1889 ).toBe(0)
1890 // We need to clean up the resources after our test
1891 await pool.destroy()
1892 })
1893
1894 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1895 const workerChoiceStrategy =
1896 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1897 let pool = new FixedThreadPool(
1898 max,
1899 './tests/worker-files/thread/testWorker.mjs',
1900 { workerChoiceStrategy }
1901 )
1902 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1903 dynamicWorkerUsage: false,
1904 dynamicWorkerReady: true
1905 })
1906 await pool.destroy()
1907 pool = new DynamicThreadPool(
1908 min,
1909 max,
1910 './tests/worker-files/thread/testWorker.mjs',
1911 { workerChoiceStrategy }
1912 )
1913 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1914 dynamicWorkerUsage: false,
1915 dynamicWorkerReady: true
1916 })
1917 // We need to clean up the resources after our test
1918 await pool.destroy()
1919 })
1920
1921 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1922 const workerChoiceStrategy =
1923 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1924 let pool = new FixedThreadPool(
1925 max,
1926 './tests/worker-files/thread/testWorker.mjs',
1927 { workerChoiceStrategy }
1928 )
1929 expect(
1930 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1931 ).toStrictEqual({
1932 runTime: {
1933 aggregate: true,
1934 average: true,
1935 median: false
1936 },
1937 waitTime: {
1938 aggregate: false,
1939 average: false,
1940 median: false
1941 },
1942 elu: {
1943 aggregate: false,
1944 average: false,
1945 median: false
1946 }
1947 })
1948 await pool.destroy()
1949 pool = new DynamicThreadPool(
1950 min,
1951 max,
1952 './tests/worker-files/thread/testWorker.mjs',
1953 { workerChoiceStrategy }
1954 )
1955 expect(
1956 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1957 ).toStrictEqual({
1958 runTime: {
1959 aggregate: true,
1960 average: true,
1961 median: false
1962 },
1963 waitTime: {
1964 aggregate: false,
1965 average: false,
1966 median: false
1967 },
1968 elu: {
1969 aggregate: false,
1970 average: false,
1971 median: false
1972 }
1973 })
1974 // We need to clean up the resources after our test
1975 await pool.destroy()
1976 })
1977
1978 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1979 const pool = new FixedThreadPool(
1980 max,
1981 './tests/worker-files/thread/testWorker.mjs',
1982 {
1983 workerChoiceStrategy:
1984 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1985 }
1986 )
1987 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
1988 const promises = new Set()
1989 const maxMultiplier = 2
1990 for (let i = 0; i < max * maxMultiplier; i++) {
1991 promises.add(pool.execute())
1992 }
1993 await Promise.all(promises)
1994 for (const workerNode of pool.workerNodes) {
1995 expect(workerNode.usage).toStrictEqual({
1996 tasks: {
1997 executed: expect.any(Number),
1998 executing: 0,
1999 queued: 0,
2000 maxQueued: 0,
2001 stolen: 0,
2002 failed: 0
2003 },
2004 runTime: expect.objectContaining({
2005 history: expect.any(CircularArray)
2006 }),
2007 waitTime: {
2008 history: new CircularArray()
2009 },
2010 elu: {
2011 idle: {
2012 history: new CircularArray()
2013 },
2014 active: {
2015 history: new CircularArray()
2016 }
2017 }
2018 })
2019 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2020 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2021 max * maxMultiplier
2022 )
2023 }
2024 expect(
2025 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2026 pool.workerChoiceStrategyContext.workerChoiceStrategy
2027 ).defaultWorkerWeight
2028 ).toBeGreaterThan(0)
2029 expect(
2030 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2031 pool.workerChoiceStrategyContext.workerChoiceStrategy
2032 ).roundId
2033 ).toBe(0)
2034 expect(
2035 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2036 pool.workerChoiceStrategyContext.workerChoiceStrategy
2037 ).workerNodeId
2038 ).toBe(0)
2039 expect(
2040 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2041 pool.workerChoiceStrategyContext.workerChoiceStrategy
2042 ).nextWorkerNodeKey
2043 ).toBe(0)
2044 expect(
2045 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2046 pool.workerChoiceStrategyContext.workerChoiceStrategy
2047 ).previousWorkerNodeKey
2048 ).toEqual(expect.any(Number))
2049 expect(
2050 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2051 pool.workerChoiceStrategyContext.workerChoiceStrategy
2052 ).roundWeights
2053 ).toStrictEqual([
2054 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2055 pool.workerChoiceStrategyContext.workerChoiceStrategy
2056 ).defaultWorkerWeight
2057 ])
2058 // We need to clean up the resources after our test
2059 await pool.destroy()
2060 })
2061
2062 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
2063 const pool = new DynamicThreadPool(
2064 min,
2065 max,
2066 './tests/worker-files/thread/testWorker.mjs',
2067 {
2068 workerChoiceStrategy:
2069 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2070 }
2071 )
2072 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
2073 const promises = new Set()
2074 const maxMultiplier = 2
2075 for (let i = 0; i < max * maxMultiplier; i++) {
2076 promises.add(pool.execute())
2077 }
2078 await Promise.all(promises)
2079 for (const workerNode of pool.workerNodes) {
2080 expect(workerNode.usage).toStrictEqual({
2081 tasks: {
2082 executed: expect.any(Number),
2083 executing: 0,
2084 queued: 0,
2085 maxQueued: 0,
2086 stolen: 0,
2087 failed: 0
2088 },
2089 runTime: expect.objectContaining({
2090 history: expect.any(CircularArray)
2091 }),
2092 waitTime: {
2093 history: new CircularArray()
2094 },
2095 elu: {
2096 idle: {
2097 history: new CircularArray()
2098 },
2099 active: {
2100 history: new CircularArray()
2101 }
2102 }
2103 })
2104 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2105 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2106 max * maxMultiplier
2107 )
2108 }
2109 expect(
2110 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2111 pool.workerChoiceStrategyContext.workerChoiceStrategy
2112 ).defaultWorkerWeight
2113 ).toBeGreaterThan(0)
2114 expect(
2115 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2116 pool.workerChoiceStrategyContext.workerChoiceStrategy
2117 ).roundId
2118 ).toBe(0)
2119 expect(
2120 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2121 pool.workerChoiceStrategyContext.workerChoiceStrategy
2122 ).workerNodeId
2123 ).toBe(0)
2124 expect(
2125 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2126 pool.workerChoiceStrategyContext.workerChoiceStrategy
2127 ).nextWorkerNodeKey
2128 ).toBe(0)
2129 expect(
2130 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2131 pool.workerChoiceStrategyContext.workerChoiceStrategy
2132 ).previousWorkerNodeKey
2133 ).toEqual(expect.any(Number))
2134 expect(
2135 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2136 pool.workerChoiceStrategyContext.workerChoiceStrategy
2137 ).roundWeights
2138 ).toStrictEqual([
2139 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2140 pool.workerChoiceStrategyContext.workerChoiceStrategy
2141 ).defaultWorkerWeight
2142 ])
2143 // We need to clean up the resources after our test
2144 await pool.destroy()
2145 })
2146
2147 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
2148 const workerChoiceStrategy =
2149 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2150 let pool = new FixedThreadPool(
2151 max,
2152 './tests/worker-files/thread/testWorker.mjs'
2153 )
2154 expect(
2155 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2156 workerChoiceStrategy
2157 ).roundId
2158 ).toBeDefined()
2159 expect(
2160 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2161 workerChoiceStrategy
2162 ).workerNodeId
2163 ).toBeDefined()
2164 expect(
2165 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2166 workerChoiceStrategy
2167 ).nextWorkerNodeKey
2168 ).toBeDefined()
2169 expect(
2170 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2171 workerChoiceStrategy
2172 ).previousWorkerNodeKey
2173 ).toBeDefined()
2174 expect(
2175 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2176 workerChoiceStrategy
2177 ).defaultWorkerWeight
2178 ).toBeDefined()
2179 expect(
2180 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2181 workerChoiceStrategy
2182 ).roundWeights
2183 ).toBeDefined()
2184 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2185 expect(
2186 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2187 pool.workerChoiceStrategyContext.workerChoiceStrategy
2188 ).roundId
2189 ).toBe(0)
2190 expect(
2191 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2192 pool.workerChoiceStrategyContext.workerChoiceStrategy
2193 ).workerNodeId
2194 ).toBe(0)
2195 expect(
2196 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2197 pool.workerChoiceStrategyContext.workerChoiceStrategy
2198 ).nextWorkerNodeKey
2199 ).toBe(0)
2200 expect(
2201 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2202 pool.workerChoiceStrategyContext.workerChoiceStrategy
2203 ).previousWorkerNodeKey
2204 ).toBe(0)
2205 expect(
2206 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2207 pool.workerChoiceStrategyContext.workerChoiceStrategy
2208 ).defaultWorkerWeight
2209 ).toBeGreaterThan(0)
2210 expect(
2211 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2212 pool.workerChoiceStrategyContext.workerChoiceStrategy
2213 ).roundWeights
2214 ).toStrictEqual([
2215 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2216 pool.workerChoiceStrategyContext.workerChoiceStrategy
2217 ).defaultWorkerWeight
2218 ])
2219 await pool.destroy()
2220 pool = new DynamicThreadPool(
2221 min,
2222 max,
2223 './tests/worker-files/thread/testWorker.mjs'
2224 )
2225 expect(
2226 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2227 workerChoiceStrategy
2228 ).roundId
2229 ).toBeDefined()
2230 expect(
2231 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2232 workerChoiceStrategy
2233 ).workerNodeId
2234 ).toBeDefined()
2235 expect(
2236 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2237 workerChoiceStrategy
2238 ).nextWorkerNodeKey
2239 ).toBeDefined()
2240 expect(
2241 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2242 workerChoiceStrategy
2243 ).previousWorkerNodeKey
2244 ).toBeDefined()
2245 expect(
2246 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2247 workerChoiceStrategy
2248 ).defaultWorkerWeight
2249 ).toBeDefined()
2250 expect(
2251 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2252 workerChoiceStrategy
2253 ).roundWeights
2254 ).toBeDefined()
2255 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2256 expect(
2257 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2258 pool.workerChoiceStrategyContext.workerChoiceStrategy
2259 ).roundId
2260 ).toBe(0)
2261 expect(
2262 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2263 pool.workerChoiceStrategyContext.workerChoiceStrategy
2264 ).workerNodeId
2265 ).toBe(0)
2266 expect(
2267 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2268 pool.workerChoiceStrategyContext.workerChoiceStrategy
2269 ).nextWorkerNodeKey
2270 ).toBe(0)
2271 expect(
2272 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2273 pool.workerChoiceStrategyContext.workerChoiceStrategy
2274 ).previousWorkerNodeKey
2275 ).toBe(0)
2276 expect(
2277 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2278 pool.workerChoiceStrategyContext.workerChoiceStrategy
2279 ).defaultWorkerWeight
2280 ).toBeGreaterThan(0)
2281 expect(
2282 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2283 pool.workerChoiceStrategyContext.workerChoiceStrategy
2284 ).roundWeights
2285 ).toStrictEqual([
2286 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2287 pool.workerChoiceStrategyContext.workerChoiceStrategy
2288 ).defaultWorkerWeight
2289 ])
2290 // We need to clean up the resources after our test
2291 await pool.destroy()
2292 })
2293
2294 it('Verify unknown strategy throw error', () => {
2295 expect(
2296 () =>
2297 new DynamicThreadPool(
2298 min,
2299 max,
2300 './tests/worker-files/thread/testWorker.mjs',
2301 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2302 )
2303 ).toThrowError("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
2304 })
2305 })