b4f37a50e60fbdf49f4f271d5485753e023d7cb4
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 WorkerChoiceStrategies
8 } = require('../../../lib')
9 const { CircularArray } = require('../../../lib/circular-array')
10
11 describe('Selection strategies test suite', () => {
12 const min = 0
13 const max = 3
14
it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
  // Each enumeration member is expected to hold its own name as a string.
  const strategyNames = [
    'ROUND_ROBIN',
    'LEAST_USED',
    'LEAST_BUSY',
    'LEAST_ELU',
    'FAIR_SHARE',
    'WEIGHTED_ROUND_ROBIN',
    'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
  ]
  for (const strategyName of strategyNames) {
    expect(WorkerChoiceStrategies[strategyName]).toBe(strategyName)
  }
})
28
it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
  // The pool is created without options, so the default strategy applies.
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    expect(pool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.ROUND_ROBIN
    )
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
41
it('Verify available strategies are taken at pool creation', async () => {
  // One pool per strategy: the option must be visible both on the pool
  // options and on the worker choice strategy context.
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new FixedThreadPool(
      max,
      './tests/worker-files/thread/testWorker.js',
      { workerChoiceStrategy }
    )
    try {
      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
        workerChoiceStrategy
      )
    } finally {
      // Destroy this iteration's pool even when an assertion above throws,
      // otherwise the failure would leave the pool behind
      await pool.destroy()
    }
  }
})
56
it('Verify available strategies can be set after pool creation', async () => {
  // Two scenarios are exercised for every strategy:
  // - a dynamic thread pool, strategy set without options (retries: 6 expected)
  // - a dynamic cluster pool, strategy set with `{ retries: 3 }`
  const scenarios = [
    {
      createPool: () =>
        new DynamicThreadPool(
          min,
          max,
          './tests/worker-files/thread/testWorker.js'
        ),
      setStrategy: (pool, workerChoiceStrategy) =>
        pool.setWorkerChoiceStrategy(workerChoiceStrategy),
      expectedRetries: 6
    },
    {
      createPool: () =>
        new DynamicClusterPool(
          min,
          max,
          './tests/worker-files/cluster/testWorker.js'
        ),
      setStrategy: (pool, workerChoiceStrategy) =>
        pool.setWorkerChoiceStrategy(workerChoiceStrategy, { retries: 3 }),
      expectedRetries: 3
    }
  ]
  for (const { createPool, setStrategy, expectedRetries } of scenarios) {
    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
      const pool = createPool()
      try {
        setStrategy(pool, workerChoiceStrategy)
        expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
        expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
          workerChoiceStrategy
        )
        const expectedOptions = {
          retries: expectedRetries,
          runTime: { median: false },
          waitTime: { median: false },
          elu: { median: false }
        }
        expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
          expectedOptions
        )
        expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
          expectedOptions
        )
      } finally {
        // Destroy this iteration's pool even when an assertion above throws
        await pool.destroy()
      }
    }
  }
})
109
it('Verify available strategies default internals at pool creation', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    const { workerChoiceStrategies } = pool.workerChoiceStrategyContext
    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
      const strategy = workerChoiceStrategies.get(workerChoiceStrategy)
      // Only the strategies below have internals checked by this test
      switch (workerChoiceStrategy) {
        case WorkerChoiceStrategies.ROUND_ROBIN:
          expect(strategy.nextWorkerNodeKey).toBe(0)
          break
        case WorkerChoiceStrategies.FAIR_SHARE:
          expect(strategy.workersVirtualTaskEndTimestamp).toBeInstanceOf(Array)
          expect(strategy.workersVirtualTaskEndTimestamp.length).toBe(0)
          break
        case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
          expect(strategy.nextWorkerNodeKey).toBe(0)
          expect(strategy.defaultWorkerWeight).toBeGreaterThan(0)
          expect(strategy.workerVirtualTaskRunTime).toBe(0)
          break
        default:
          break
      }
    }
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
155
it('Verify ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // The same policy is expected for a fixed and for a dynamic pool
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.js',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
181
it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // The same requirements are expected for a fixed and for a dynamic pool
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.js',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual({
        runTime: {
          aggregate: false,
          average: false,
          median: false
        },
        waitTime: {
          aggregate: false,
          average: false,
          median: false
        },
        elu: {
          aggregate: false,
          average: false,
          median: false
        }
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
237
it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    await Promise.all(
      Array.from({ length: max * maxMultiplier }, () => pool.execute())
    )
    const anyHistory = () => ({ history: expect.any(CircularArray) })
    for (const workerNode of pool.workerNodes) {
      // Every worker node must have executed exactly `maxMultiplier` tasks
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: maxMultiplier,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
    }
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        WorkerChoiceStrategies.ROUND_ROBIN
      ).nextWorkerNodeKey
    ).toBe(0)
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
285
it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
  )
  try {
    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    const anyHistory = () => ({ history: expect.any(CircularArray) })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
      // The per-node task count is only bounded, not fixed, in this test
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
    }
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        WorkerChoiceStrategies.ROUND_ROBIN
      ).nextWorkerNodeKey
    ).toBe(0)
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
338
it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // Cluster workers are identified by `id`, thread workers by `threadId`
  const scenarios = [
    {
      createPool: () =>
        new FixedClusterPool(
          max,
          './tests/worker-files/cluster/testWorker.js',
          { workerChoiceStrategy }
        ),
      workerIdKey: 'id'
    },
    {
      createPool: () =>
        new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js', {
          workerChoiceStrategy
        }),
      workerIdKey: 'threadId'
    }
  ]
  for (const { createPool, workerIdKey } of scenarios) {
    const pool = createPool()
    try {
      // `max` consecutive choices must yield `max` distinct workers
      const results = new Set()
      for (let i = 0; i < max; i++) {
        results.add(
          pool.workerNodes[pool.chooseWorkerNode()].worker[workerIdKey]
        )
      }
      expect(results.size).toBe(max)
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
364
it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  // Both pools start with another strategy, then switch to ROUND_ROBIN
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js', {
        workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.js',
        { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          workerChoiceStrategy
        ).nextWorkerNodeKey
      ).toBeDefined()
      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
      expect(
        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
          pool.workerChoiceStrategyContext.workerChoiceStrategy
        ).nextWorkerNodeKey
      ).toBe(0)
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
404
it('Verify LEAST_USED strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  // The same policy is expected for a fixed and for a dynamic pool
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.js',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
430
it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
  // The same requirements are expected for a fixed and for a dynamic pool
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.js',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual({
        runTime: {
          aggregate: false,
          average: false,
          median: false
        },
        waitTime: {
          aggregate: false,
          average: false,
          median: false
        },
        elu: {
          aggregate: false,
          average: false,
          median: false
        }
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
486
it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    const anyHistory = () => ({ history: expect.any(CircularArray) })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
      // The per-node task count is only bounded, not fixed, in this test
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
    }
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
533
it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
  )
  try {
    // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    const anyHistory = () => ({ history: expect.any(CircularArray) })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
      // The per-node task count is only bounded, not fixed, in this test
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
    }
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
581
it('Verify LEAST_BUSY strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  // The same policy is expected for a fixed and for a dynamic pool
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.js',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
607
it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
  // The same requirements are expected for a fixed and for a dynamic pool
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.js',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      // Only the aggregate run time and wait time are expected to be required
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual({
        runTime: {
          aggregate: true,
          average: false,
          median: false
        },
        waitTime: {
          aggregate: true,
          average: false,
          median: false
        },
        elu: {
          aggregate: false,
          average: false,
          median: false
        }
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
663
it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  // A statistic is either left undefined or holds a strictly positive value
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    const anyHistory = () => ({ history: expect.any(CircularArray) })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
      expectUndefinedOrPositive(workerNode.usage.runTime.aggregate)
      expectUndefinedOrPositive(workerNode.usage.waitTime.aggregate)
    }
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
720
it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
  )
  // A statistic is either left undefined or holds a strictly positive value
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    const anyHistory = () => ({ history: expect.any(CircularArray) })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
      expectUndefinedOrPositive(workerNode.usage.runTime.aggregate)
      expectUndefinedOrPositive(workerNode.usage.waitTime.aggregate)
    }
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
778
it('Verify LEAST_ELU strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  // The same policy is expected for a fixed and for a dynamic pool
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.js',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
804
it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
  // The same requirements are expected for a fixed and for a dynamic pool
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.js',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      // Only the aggregate ELU is expected to be required
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual({
        runTime: {
          aggregate: false,
          average: false,
          median: false
        },
        waitTime: {
          aggregate: false,
          average: false,
          median: false
        },
        elu: {
          aggregate: true,
          average: false,
          median: false
        }
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
860
it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    const anyHistory = () => ({ history: expect.any(CircularArray) })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
      // ELU utilization is either left undefined or lies within [0, 1]
      const { utilization } = workerNode.usage.elu
      if (utilization == null) {
        expect(utilization).toBeUndefined()
      } else {
        expect(utilization).toBeGreaterThanOrEqual(0)
        expect(utilization).toBeLessThanOrEqual(1)
      }
    }
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
913
it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
  )
  try {
    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    const anyHistory = () => ({ history: expect.any(CircularArray) })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
      // ELU utilization is either left undefined or lies within [0, 1]
      const { utilization } = workerNode.usage.elu
      if (utilization == null) {
        expect(utilization).toBeUndefined()
      } else {
        expect(utilization).toBeGreaterThanOrEqual(0)
        expect(utilization).toBeLessThanOrEqual(1)
      }
    }
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
967
it('Verify FAIR_SHARE strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  // The same policy is expected for a fixed and for a dynamic pool
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.js',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      expect(
        pool.workerChoiceStrategyContext.getStrategyPolicy()
      ).toStrictEqual({
        dynamicWorkerUsage: false,
        dynamicWorkerReady: true
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
993
it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  // The same requirements are expected for a fixed and for a dynamic pool
  const poolFactories = [
    () =>
      new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js', {
        workerChoiceStrategy
      }),
    () =>
      new DynamicThreadPool(
        min,
        max,
        './tests/worker-files/thread/testWorker.js',
        { workerChoiceStrategy }
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    try {
      // Aggregate and average are expected for run time and ELU, not wait time
      expect(
        pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      ).toStrictEqual({
        runTime: {
          aggregate: true,
          average: true,
          median: false
        },
        waitTime: {
          aggregate: false,
          average: false,
          median: false
        },
        elu: {
          aggregate: true,
          average: true,
          median: false
        }
      })
    } finally {
      // We need to clean up the resources after our test, even when it fails
      await pool.destroy()
    }
  }
})
1049
it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // A statistic is either left undefined or holds a strictly positive value
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    const anyHistory = () => ({ history: expect.any(CircularArray) })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
      expectUndefinedOrPositive(workerNode.usage.runTime.aggregate)
      expectUndefinedOrPositive(workerNode.usage.runTime.average)
      // ELU utilization is either left undefined or lies within [0, 1]
      const { utilization } = workerNode.usage.elu
      if (utilization == null) {
        expect(utilization).toBeUndefined()
      } else {
        expect(utilization).toBeGreaterThanOrEqual(0)
        expect(utilization).toBeLessThanOrEqual(1)
      }
    }
    // One virtual task end timestamp entry is expected per worker node
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp.length
    ).toBe(pool.workerNodes.length)
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
1117
it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // A statistic is either left undefined or holds a strictly positive value
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  try {
    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
    const maxMultiplier = 2
    const tasksCount = max * maxMultiplier
    await Promise.all(Array.from({ length: tasksCount }, () => pool.execute()))
    const anyHistory = () => ({ history: expect.any(CircularArray) })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toMatchObject({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
      expectUndefinedOrPositive(workerNode.usage.runTime.aggregate)
      expectUndefinedOrPositive(workerNode.usage.runTime.average)
      // ELU utilization is either left undefined or lies within [0, 1]
      const { utilization } = workerNode.usage.elu
      if (utilization == null) {
        expect(utilization).toBeUndefined()
      } else {
        expect(utilization).toBeGreaterThanOrEqual(0)
        expect(utilization).toBeLessThanOrEqual(1)
      }
    }
    // One virtual task end timestamp entry is expected per worker node
    expect(
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        pool.workerChoiceStrategyContext.workerChoiceStrategy
      ).workersVirtualTaskEndTimestamp.length
    ).toBe(pool.workerNodes.length)
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
1186
// Same smoke test as the plain FAIR_SHARE dynamic pool run, but the pool is
// created with `runTime: { median: true }`, so the median runtime statistic
// is checked instead of the average.
it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
      workerChoiceStrategyOptions: { runTime: { median: true } }
    }
  )
  // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  const pendingTasks = Array.from({ length: tasksCount }, () => pool.execute())
  await Promise.all(pendingTasks)
  // A runtime statistic must be either absent (undefined) or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const withHistory = { history: expect.any(CircularArray) }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toMatchObject({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: withHistory,
      waitTime: withHistory,
      elu: { idle: withHistory, active: withHistory }
    })
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
    expectUndefinedOrPositive(usage.runTime.aggregate)
    expectUndefinedOrPositive(usage.runTime.median)
    // ELU utilization, when present, is a ratio within [0, 1]
    const { utilization } = usage.elu
    if (utilization != null) {
      expect(utilization).toBeGreaterThanOrEqual(0)
      expect(utilization).toBeLessThanOrEqual(1)
    } else {
      expect(utilization).toBeUndefined()
    }
  }
  const { workerChoiceStrategyContext } = pool
  expect(
    workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategyContext.workerChoiceStrategy
    ).workersVirtualTaskEndTimestamp.length
  ).toBe(pool.workerNodes.length)
  // Release the pool workers once the assertions are done
  await pool.destroy()
})
1260
// Checks that `setWorkerChoiceStrategy(FAIR_SHARE)` empties the strategy's
// `workersVirtualTaskEndTimestamp` array, first on a fixed thread pool and
// then on a dynamic one.
it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Looked up again on every call so each assertion sees the current state
  const endTimestampsOf = pool =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      workerChoiceStrategy
    ).workersVirtualTaskEndTimestamp
  const verifyReset = async pool => {
    expect(endTimestampsOf(pool)).toBeInstanceOf(Array)
    expect(endTimestampsOf(pool).length).toBe(0)
    // Dirty the internal state so the reset has something to clear
    endTimestampsOf(pool)[0] = performance.now()
    expect(endTimestampsOf(pool).length).toBe(1)
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(endTimestampsOf(pool)).toBeInstanceOf(Array)
    expect(endTimestampsOf(pool).length).toBe(0)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
  await verifyReset(new FixedThreadPool(max, workerFile))
  await verifyReset(new DynamicThreadPool(min, max, workerFile))
})
1334
// Checks the strategy policy reported by the context when a fixed, then a
// dynamic, thread pool is created with WEIGHTED_ROUND_ROBIN.
it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Factories keep pool creation lazy: the second pool is only built once the
  // first one has been destroyed
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1360
// Checks the task statistics requirements reported by the context for
// WEIGHTED_ROUND_ROBIN: only the runtime aggregate and average are expected to
// be required, for both a fixed and a dynamic thread pool.
it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const notRequired = { aggregate: false, average: false, median: false }
  const expectedRequirements = {
    runTime: { aggregate: true, average: true, median: false },
    waitTime: notRequired,
    elu: notRequired
  }
  // Factories keep pool creation lazy: the second pool is only built once the
  // first one has been destroyed
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(expectedRequirements)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1416
// Smoke test for WEIGHTED_ROUND_ROBIN on a fixed thread pool: submits
// `max * maxMultiplier` tasks, waits for all of them, then checks each worker
// node's usage and the strategy's weight / virtual runtime internals.
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  const pendingTasks = Array.from({ length: tasksCount }, () => pool.execute())
  await Promise.all(pendingTasks)
  // A runtime statistic must be either absent (undefined) or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const withHistory = { history: expect.any(CircularArray) }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      // runTime may carry extra statistics besides its history
      runTime: expect.objectContaining(withHistory),
      waitTime: withHistory,
      elu: { idle: withHistory, active: withHistory }
    })
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
    expectUndefinedOrPositive(usage.runTime.aggregate)
    expectUndefinedOrPositive(usage.runTime.average)
  }
  // Re-reads the strategy currently selected by the pool on every call
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy().workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // Release the pool workers once the assertions are done
  await pool.destroy()
})
1483
// Smoke test for WEIGHTED_ROUND_ROBIN on a dynamic thread pool: submits
// `max * maxMultiplier` tasks, waits for all of them, then checks each worker
// node's usage and the strategy's weight / virtual runtime internals.
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  const pendingTasks = Array.from({ length: tasksCount }, () => pool.execute())
  await Promise.all(pendingTasks)
  // A runtime statistic must be either absent (undefined) or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const withHistory = { history: expect.any(CircularArray) }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      // runTime may carry extra statistics besides its history
      runTime: expect.objectContaining(withHistory),
      waitTime: withHistory,
      elu: { idle: withHistory, active: withHistory }
    })
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
    expectUndefinedOrPositive(usage.runTime.aggregate)
    expectUndefinedOrPositive(usage.runTime.average)
  }
  // Re-reads the strategy currently selected by the pool on every call
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy().workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // Release the pool workers once the assertions are done
  await pool.destroy()
})
1551
// Same smoke test as the plain WEIGHTED_ROUND_ROBIN dynamic pool run, but the
// pool is created with `runTime: { median: true }`, so the median runtime
// statistic is checked instead of the average.
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
      workerChoiceStrategyOptions: { runTime: { median: true } }
    }
  )
  // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  const pendingTasks = Array.from({ length: tasksCount }, () => pool.execute())
  await Promise.all(pendingTasks)
  // A runtime statistic must be either absent (undefined) or strictly positive
  const expectUndefinedOrPositive = value => {
    if (value == null) {
      expect(value).toBeUndefined()
    } else {
      expect(value).toBeGreaterThan(0)
    }
  }
  const withHistory = { history: expect.any(CircularArray) }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      // runTime may carry extra statistics besides its history
      runTime: expect.objectContaining(withHistory),
      waitTime: withHistory,
      elu: { idle: withHistory, active: withHistory }
    })
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
    expectUndefinedOrPositive(usage.runTime.aggregate)
    expectUndefinedOrPositive(usage.runTime.median)
  }
  // Re-reads the strategy currently selected by the pool on every call
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy().workerVirtualTaskRunTime).toBeGreaterThanOrEqual(0)
  // Release the pool workers once the assertions are done
  await pool.destroy()
})
1624
// Checks that `setWorkerChoiceStrategy(WEIGHTED_ROUND_ROBIN)` leaves the
// strategy with `nextWorkerNodeKey` 0, a positive `defaultWorkerWeight` and a
// `workerVirtualTaskRunTime` of 0, on a fixed and then on a dynamic pool.
it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const verifyReset = async pool => {
    // Both lookups are re-evaluated on every call so each assertion sees the
    // current state of the pool
    const strategyBy = key =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(key)
    const activeStrategy = () =>
      strategyBy(pool.workerChoiceStrategyContext.workerChoiceStrategy)
    // Before selecting the strategy its internals only need to exist
    expect(strategyBy(workerChoiceStrategy).nextWorkerNodeKey).toBeDefined()
    expect(strategyBy(workerChoiceStrategy).defaultWorkerWeight).toBeDefined()
    expect(
      strategyBy(workerChoiceStrategy).workerVirtualTaskRunTime
    ).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(activeStrategy().nextWorkerNodeKey).toBe(0)
    expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategyBy(workerChoiceStrategy).workerVirtualTaskRunTime).toBe(0)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
  await verifyReset(new FixedThreadPool(max, workerFile))
  await verifyReset(new DynamicThreadPool(min, max, workerFile))
})
1702
// Checks the strategy policy reported by the context when a fixed, then a
// dynamic, thread pool is created with INTERLEAVED_WEIGHTED_ROUND_ROBIN.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Factories keep pool creation lazy: the second pool is only built once the
  // first one has been destroyed
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
      dynamicWorkerUsage: false,
      dynamicWorkerReady: true
    })
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1729
// Checks the task statistics requirements reported by the context for
// INTERLEAVED_WEIGHTED_ROUND_ROBIN: no runtime, wait time or ELU statistic is
// expected to be required, for both a fixed and a dynamic thread pool.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const notRequired = { aggregate: false, average: false, median: false }
  const expectedRequirements = {
    runTime: notRequired,
    waitTime: notRequired,
    elu: notRequired
  }
  // Factories keep pool creation lazy: the second pool is only built once the
  // first one has been destroyed
  const poolFactories = [
    () => new FixedThreadPool(max, workerFile, { workerChoiceStrategy }),
    () => new DynamicThreadPool(min, max, workerFile, { workerChoiceStrategy })
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(expectedRequirements)
    // We need to clean up the resources after our test
    await pool.destroy()
  }
})
1786
// Smoke test for INTERLEAVED_WEIGHTED_ROUND_ROBIN on a fixed thread pool:
// submits `max * maxMultiplier` tasks and expects every worker node to have
// executed exactly `maxMultiplier` of them, then checks the strategy's
// round / weight internals.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
  const pool = new FixedThreadPool(
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  const pendingTasks = Array.from({ length: tasksCount }, () => pool.execute())
  await Promise.all(pendingTasks)
  const withHistory = { history: expect.any(CircularArray) }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual({
      tasks: {
        executed: maxMultiplier,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: withHistory,
      waitTime: withHistory,
      elu: { idle: withHistory, active: withHistory }
    })
  }
  // Re-reads the strategy currently selected by the pool on every call
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy().roundId).toBe(0)
  expect(activeStrategy().nextWorkerNodeKey).toBe(0)
  expect(activeStrategy().roundWeights).toStrictEqual([
    activeStrategy().defaultWorkerWeight
  ])
  // Release the pool workers once the assertions are done
  await pool.destroy()
})
1856
// Smoke test for INTERLEAVED_WEIGHTED_ROUND_ROBIN on a dynamic thread pool:
// submits `max * maxMultiplier` tasks, waits for all of them, then checks each
// worker node's usage and the strategy's round / weight internals.
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
  const pool = new DynamicThreadPool(
    min,
    max,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy:
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
    }
  )
  // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
  const maxMultiplier = 2
  const tasksCount = max * maxMultiplier
  const pendingTasks = Array.from({ length: tasksCount }, () => pool.execute())
  await Promise.all(pendingTasks)
  const withHistory = { history: expect.any(CircularArray) }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: withHistory,
      waitTime: withHistory,
      elu: { idle: withHistory, active: withHistory }
    })
    expect(usage.tasks.executed).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
  }
  // Re-reads the strategy currently selected by the pool on every call
  const activeStrategy = () =>
    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
      pool.workerChoiceStrategyContext.workerChoiceStrategy
    )
  expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
  expect(activeStrategy().roundId).toBe(0)
  expect(activeStrategy().nextWorkerNodeKey).toBe(0)
  expect(activeStrategy().roundWeights).toStrictEqual([
    activeStrategy().defaultWorkerWeight
  ])
  // Release the pool workers once the assertions are done
  await pool.destroy()
})
1931
// Checks that `setWorkerChoiceStrategy(INTERLEAVED_WEIGHTED_ROUND_ROBIN)`
// leaves the strategy with `roundId` 0, `nextWorkerNodeKey` 0, a positive
// `defaultWorkerWeight` and `roundWeights` equal to `[defaultWorkerWeight]`.
// The same assertion sequence runs against a fixed and then a dynamic thread
// pool, so the two pool types cannot drift apart (the dynamic pool variant
// previously skipped the `roundId` reset check).
it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
  const workerChoiceStrategy =
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const verifyInternalsReset = async pool => {
    // Both lookups are re-evaluated on every call so each assertion sees the
    // current state of the pool
    const strategyBy = key =>
      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(key)
    const activeStrategy = () =>
      strategyBy(pool.workerChoiceStrategyContext.workerChoiceStrategy)
    // The pool is created without an explicit strategy: before selecting
    // INTERLEAVED_WEIGHTED_ROUND_ROBIN its internals only need to exist
    expect(strategyBy(workerChoiceStrategy).roundId).toBeDefined()
    expect(strategyBy(workerChoiceStrategy).nextWorkerNodeKey).toBeDefined()
    expect(strategyBy(workerChoiceStrategy).defaultWorkerWeight).toBeDefined()
    expect(strategyBy(workerChoiceStrategy).roundWeights).toBeDefined()
    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
    expect(strategyBy(workerChoiceStrategy).roundId).toBe(0)
    expect(activeStrategy().nextWorkerNodeKey).toBe(0)
    expect(activeStrategy().defaultWorkerWeight).toBeGreaterThan(0)
    expect(strategyBy(workerChoiceStrategy).roundWeights).toStrictEqual([
      activeStrategy().defaultWorkerWeight
    ])
    // We need to clean up the resources after our test
    await pool.destroy()
  }
  await verifyInternalsReset(new FixedThreadPool(max, workerFile))
  await verifyInternalsReset(new DynamicThreadPool(min, max, workerFile))
})
2033
// Constructing a pool with a worker choice strategy name that is not one of
// the WorkerChoiceStrategies values must throw from the constructor with an
// explicit error message.
it('Verify unknown strategy throw error', () => {
  const createPoolWithUnknownStrategy = () =>
    new DynamicThreadPool(
      min,
      max,
      './tests/worker-files/thread/testWorker.js',
      { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
    )
  // `toThrow` is the canonical matcher; `toThrowError` is only a legacy alias
  // of it (dropped in expect v30), so the check itself is unchanged
  expect(createPoolWithUnknownStrategy).toThrow(
    "Invalid worker choice strategy 'UNKNOWN_STRATEGY'"
  )
})
2045 })