1 import { expect } from 'expect'
8 } from '../../../lib/index.js'
9 import { CircularArray } from '../../../lib/circular-array.js'
11 describe('Selection strategies test suite', () => {
// Checks that every WorkerChoiceStrategies member visible here evaluates to
// its own name as a string.
15 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
16 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
17 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
18 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
19 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
20 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
21 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
22 'WEIGHTED_ROUND_ROBIN'
24 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
25 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
// Builds a DynamicThreadPool on the thread test worker and expects
// pool.opts.workerChoiceStrategy to be ROUND_ROBIN.
// NOTE(review): the remaining constructor arguments are not visible in this
// view; presumably no strategy option is passed -- confirm.
29 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
30 const pool = new DynamicThreadPool(
33 './tests/worker-files/thread/testWorker.mjs'
35 expect(pool.opts.workerChoiceStrategy).toBe(
36 WorkerChoiceStrategies.ROUND_ROBIN
38 // We need to clean up the resources after our test
// For every value of WorkerChoiceStrategies, creates a FixedThreadPool with
// that strategy in its options and checks that both pool.opts and the
// worker choice strategy context report it.
42 it('Verify available strategies are taken at pool creation', async () => {
43 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
44 const pool = new FixedThreadPool(
46 './tests/worker-files/thread/testWorker.mjs',
47 { workerChoiceStrategy }
49 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
50 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
// Switches the strategy on an existing pool via setWorkerChoiceStrategy() and
// checks that pool.opts, the strategy context and the strategy options all
// reflect the change. Runs once over DynamicThreadPool (no options argument)
// and once over DynamicClusterPool (passing `{ retries: 3 }`).
// NOTE(review): only the runTime/waitTime/elu entries of the expected option
// objects are visible in this view.
57 it('Verify available strategies can be set after pool creation', async () => {
58 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
59 const pool = new DynamicThreadPool(
62 './tests/worker-files/thread/testWorker.mjs'
64 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
65 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
66 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
69 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
71 runTime: { median: false },
72 waitTime: { median: false },
73 elu: { median: false }
75 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
77 runTime: { median: false },
78 waitTime: { median: false },
79 elu: { median: false }
// Same checks against a cluster pool, this time passing strategy options.
83 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
84 const pool = new DynamicClusterPool(
87 './tests/worker-files/cluster/testWorker.js'
89 pool.setWorkerChoiceStrategy(workerChoiceStrategy, { retries: 3 })
90 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
91 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
94 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
96 runTime: { median: false },
97 waitTime: { median: false },
98 elu: { median: false }
100 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
102 runTime: { median: false },
103 waitTime: { median: false },
104 elu: { median: false }
// Creates one FixedThreadPool and, for every strategy value, looks up the
// strategy instance in workerChoiceStrategyContext.workerChoiceStrategies and
// inspects its internal fields (previousWorkerNodeKey, defaultWorkerWeight,
// workerNodeVirtualTaskRunTime are the ones visible here).
// WEIGHTED_ROUND_ROBIN and INTERLEAVED_WEIGHTED_ROUND_ROBIN get their own
// branches.
// NOTE(review): the expected values and the remaining inspected fields are on
// lines not visible in this view.
110 it('Verify available strategies default internals at pool creation', async () => {
111 const pool = new FixedThreadPool(
113 './tests/worker-files/thread/testWorker.mjs'
115 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
117 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
122 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
124 ).previousWorkerNodeKey
127 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
130 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
132 ).defaultWorkerWeight
135 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
137 ).workerNodeVirtualTaskRunTime
140 workerChoiceStrategy ===
141 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
144 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
146 ).defaultWorkerWeight
149 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
151 ).workerNodeVirtualTaskRunTime
154 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
159 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
164 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
168 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
170 ).defaultWorkerWeight
// Starts from a DynamicThreadPool that is no longer starting and holds `min`
// worker nodes, submits max * 10000 tasks at once, waits for all of them and
// expects the pool to have grown to `max` worker nodes.
// NOTE(review): `min` and `max` are defined outside this view.
177 it('Verify strategies wait for worker node readiness in dynamic pool', async () => {
178 const pool = new DynamicThreadPool(
181 './tests/worker-files/thread/testWorker.mjs'
183 expect(pool.starting).toBe(false)
184 expect(pool.workerNodes.length).toBe(min)
185 const maxMultiplier = 10000
186 const promises = new Set()
187 for (let i = 0; i < max * maxMultiplier; i++) {
188 promises.add(pool.execute())
190 await Promise.all(promises)
191 expect(pool.workerNodes.length).toBe(max)
192 // We need to clean up the resources after our test
// Checks getStrategyPolicy() for ROUND_ROBIN on a FixedThreadPool and then on
// a DynamicThreadPool; both are expected to report dynamicWorkerUsage: false
// and dynamicWorkerReady: true.
196 it('Verify ROUND_ROBIN strategy default policy', async () => {
197 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
198 let pool = new FixedThreadPool(
200 './tests/worker-files/thread/testWorker.mjs',
201 { workerChoiceStrategy }
203 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
204 dynamicWorkerUsage: false,
205 dynamicWorkerReady: true
208 pool = new DynamicThreadPool(
211 './tests/worker-files/thread/testWorker.mjs',
212 { workerChoiceStrategy }
214 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
215 dynamicWorkerUsage: false,
216 dynamicWorkerReady: true
218 // We need to clean up the resources after our test
// Reads getTaskStatisticsRequirements() from the strategy context for
// ROUND_ROBIN, first on a FixedThreadPool and then on a DynamicThreadPool.
// NOTE(review): the expected requirement objects are on lines not visible in
// this view.
222 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
223 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
224 let pool = new FixedThreadPool(
226 './tests/worker-files/thread/testWorker.mjs',
227 { workerChoiceStrategy }
230 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
249 pool = new DynamicThreadPool(
252 './tests/worker-files/thread/testWorker.mjs',
253 { workerChoiceStrategy }
256 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
274 // We need to clean up the resources after our test
// Runs max * 2 tasks through a FixedThreadPool using ROUND_ROBIN, then checks
// each worker node's usage: `executed` is expected to equal maxMultiplier
// exactly, sequentiallyStolen to be 0 and the statistics histories to be
// empty CircularArray instances.
// Finally inspects the strategy internals; previousWorkerNodeKey is expected
// to be the last worker node index.
// NOTE(review): `max` is defined outside this view.
278 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
279 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
280 const pool = new FixedThreadPool(
282 './tests/worker-files/thread/testWorker.mjs',
283 { workerChoiceStrategy }
285 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
286 const promises = new Set()
287 const maxMultiplier = 2
288 for (let i = 0; i < max * maxMultiplier; i++) {
289 promises.add(pool.execute())
291 await Promise.all(promises)
292 for (const workerNode of pool.workerNodes) {
293 expect(workerNode.usage).toStrictEqual({
295 executed: maxMultiplier,
299 sequentiallyStolen: 0,
304 history: new CircularArray()
307 history: new CircularArray()
311 history: new CircularArray()
314 history: new CircularArray()
320 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
321 pool.workerChoiceStrategyContext.workerChoiceStrategy
325 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
326 pool.workerChoiceStrategyContext.workerChoiceStrategy
327 ).previousWorkerNodeKey
328 ).toBe(pool.workerNodes.length - 1)
329 // We need to clean up the resources after our test
// Runs max * 2 tasks through a DynamicThreadPool using ROUND_ROBIN. Unlike
// the fixed pool variant, `executed` is only required to be a number within a
// range (lower bound 0; the upper bound is on a line not visible here).
// Finally inspects the strategy internals; previousWorkerNodeKey is expected
// to be the last worker node index.
// NOTE(review): `max` is defined outside this view.
333 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
334 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
335 const pool = new DynamicThreadPool(
338 './tests/worker-files/thread/testWorker.mjs',
339 { workerChoiceStrategy }
341 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
342 const promises = new Set()
343 const maxMultiplier = 2
344 for (let i = 0; i < max * maxMultiplier; i++) {
345 promises.add(pool.execute())
347 await Promise.all(promises)
348 for (const workerNode of pool.workerNodes) {
349 expect(workerNode.usage).toStrictEqual({
351 executed: expect.any(Number),
355 sequentiallyStolen: 0,
360 history: new CircularArray()
363 history: new CircularArray()
367 history: new CircularArray()
370 history: new CircularArray()
374 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
375 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
380 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
381 pool.workerChoiceStrategyContext.workerChoiceStrategy
385 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
386 pool.workerChoiceStrategyContext.workerChoiceStrategy
387 ).previousWorkerNodeKey
388 ).toBe(pool.workerNodes.length - 1)
389 // We need to clean up the resources after our test
// Calls pool.chooseWorkerNode() `max` times and collects the chosen worker
// node ids in a Set; the Set is expected to hold `max` distinct ids, i.e. no
// worker node is picked twice in one pass. Done for a FixedClusterPool and
// then for a FixedThreadPool.
// NOTE(review): `max` is defined outside this view; presumably it is also the
// pool size -- confirm.
393 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
394 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
395 let pool = new FixedClusterPool(
397 './tests/worker-files/cluster/testWorker.js',
398 { workerChoiceStrategy }
400 let results = new Set()
401 for (let i = 0; i < max; i++) {
402 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
404 expect(results.size).toBe(max)
406 pool = new FixedThreadPool(
408 './tests/worker-files/thread/testWorker.mjs',
409 { workerChoiceStrategy }
412 for (let i = 0; i < max; i++) {
413 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
415 expect(results.size).toBe(max)
// Creates a pool configured with WEIGHTED_ROUND_ROBIN, inspects the current
// strategy's internals, switches to ROUND_ROBIN with
// setWorkerChoiceStrategy() and inspects the internals again. Done for a
// FixedThreadPool and then for a DynamicThreadPool.
// NOTE(review): the expected values of the internals are on lines not
// visible in this view.
419 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
420 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
421 let pool = new FixedThreadPool(
423 './tests/worker-files/thread/testWorker.mjs',
424 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
427 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
428 pool.workerChoiceStrategyContext.workerChoiceStrategy
432 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
433 pool.workerChoiceStrategyContext.workerChoiceStrategy
434 ).previousWorkerNodeKey
436 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
438 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
439 pool.workerChoiceStrategyContext.workerChoiceStrategy
443 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
444 pool.workerChoiceStrategyContext.workerChoiceStrategy
445 ).previousWorkerNodeKey
448 pool = new DynamicThreadPool(
451 './tests/worker-files/thread/testWorker.mjs',
452 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
455 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
456 pool.workerChoiceStrategyContext.workerChoiceStrategy
460 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
461 pool.workerChoiceStrategyContext.workerChoiceStrategy
462 ).previousWorkerNodeKey
464 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
466 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
467 pool.workerChoiceStrategyContext.workerChoiceStrategy
471 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
472 pool.workerChoiceStrategyContext.workerChoiceStrategy
473 ).previousWorkerNodeKey
475 // We need to clean up the resources after our test
// Checks getStrategyPolicy() for LEAST_USED on a FixedThreadPool and then on
// a DynamicThreadPool; both are expected to report dynamicWorkerUsage: false
// and dynamicWorkerReady: true.
479 it('Verify LEAST_USED strategy default policy', async () => {
480 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
481 let pool = new FixedThreadPool(
483 './tests/worker-files/thread/testWorker.mjs',
484 { workerChoiceStrategy }
486 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
487 dynamicWorkerUsage: false,
488 dynamicWorkerReady: true
491 pool = new DynamicThreadPool(
494 './tests/worker-files/thread/testWorker.mjs',
495 { workerChoiceStrategy }
497 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
498 dynamicWorkerUsage: false,
499 dynamicWorkerReady: true
501 // We need to clean up the resources after our test
// Reads getTaskStatisticsRequirements() from the strategy context for
// LEAST_USED, first on a FixedThreadPool and then on a DynamicThreadPool.
// NOTE(review): the expected requirement objects are on lines not visible in
// this view.
505 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
506 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
507 let pool = new FixedThreadPool(
509 './tests/worker-files/thread/testWorker.mjs',
510 { workerChoiceStrategy }
513 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
532 pool = new DynamicThreadPool(
535 './tests/worker-files/thread/testWorker.mjs',
536 { workerChoiceStrategy }
539 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
557 // We need to clean up the resources after our test
// Runs max * 2 tasks through a FixedThreadPool using LEAST_USED, then checks
// each worker node's usage: `executed` must be a number within a range (lower
// bound 0; upper bound on a line not visible here), sequentiallyStolen must
// be 0 and the statistics histories must be empty CircularArray instances.
// The strategy internals are only required to be numbers.
// NOTE(review): `max` is defined outside this view.
561 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
562 const pool = new FixedThreadPool(
564 './tests/worker-files/thread/testWorker.mjs',
565 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
567 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
568 const promises = new Set()
569 const maxMultiplier = 2
570 for (let i = 0; i < max * maxMultiplier; i++) {
571 promises.add(pool.execute())
573 await Promise.all(promises)
574 for (const workerNode of pool.workerNodes) {
575 expect(workerNode.usage).toStrictEqual({
577 executed: expect.any(Number),
581 sequentiallyStolen: 0,
586 history: new CircularArray()
589 history: new CircularArray()
593 history: new CircularArray()
596 history: new CircularArray()
600 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
601 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
606 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
607 pool.workerChoiceStrategyContext.workerChoiceStrategy
609 ).toEqual(expect.any(Number))
611 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
612 pool.workerChoiceStrategyContext.workerChoiceStrategy
613 ).previousWorkerNodeKey
614 ).toEqual(expect.any(Number))
615 // We need to clean up the resources after our test
// Same scenario as the fixed pool LEAST_USED test, but on a
// DynamicThreadPool: run max * 2 tasks, then check per-worker usage
// (`executed` within a range, sequentiallyStolen 0, empty histories) and that
// the strategy internals are numbers.
// NOTE(review): `max` is defined outside this view.
619 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
620 const pool = new DynamicThreadPool(
623 './tests/worker-files/thread/testWorker.mjs',
624 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
626 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
627 const promises = new Set()
628 const maxMultiplier = 2
629 for (let i = 0; i < max * maxMultiplier; i++) {
630 promises.add(pool.execute())
632 await Promise.all(promises)
633 for (const workerNode of pool.workerNodes) {
634 expect(workerNode.usage).toStrictEqual({
636 executed: expect.any(Number),
640 sequentiallyStolen: 0,
645 history: new CircularArray()
648 history: new CircularArray()
652 history: new CircularArray()
655 history: new CircularArray()
659 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
660 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
665 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
666 pool.workerChoiceStrategyContext.workerChoiceStrategy
668 ).toEqual(expect.any(Number))
670 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
671 pool.workerChoiceStrategyContext.workerChoiceStrategy
672 ).previousWorkerNodeKey
673 ).toEqual(expect.any(Number))
674 // We need to clean up the resources after our test
// Checks getStrategyPolicy() for LEAST_BUSY on a FixedThreadPool and then on
// a DynamicThreadPool; both are expected to report dynamicWorkerUsage: false
// and dynamicWorkerReady: true.
678 it('Verify LEAST_BUSY strategy default policy', async () => {
679 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
680 let pool = new FixedThreadPool(
682 './tests/worker-files/thread/testWorker.mjs',
683 { workerChoiceStrategy }
685 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
686 dynamicWorkerUsage: false,
687 dynamicWorkerReady: true
690 pool = new DynamicThreadPool(
693 './tests/worker-files/thread/testWorker.mjs',
694 { workerChoiceStrategy }
696 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
697 dynamicWorkerUsage: false,
698 dynamicWorkerReady: true
700 // We need to clean up the resources after our test
// Reads getTaskStatisticsRequirements() from the strategy context for
// LEAST_BUSY, first on a FixedThreadPool and then on a DynamicThreadPool.
// NOTE(review): the expected requirement objects are on lines not visible in
// this view.
704 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
705 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
706 let pool = new FixedThreadPool(
708 './tests/worker-files/thread/testWorker.mjs',
709 { workerChoiceStrategy }
712 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
731 pool = new DynamicThreadPool(
734 './tests/worker-files/thread/testWorker.mjs',
735 { workerChoiceStrategy }
738 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
756 // We need to clean up the resources after our test
// Runs max * 2 tasks through a FixedThreadPool using LEAST_BUSY and checks
// each worker node's usage. For this strategy runTime and waitTime are
// matched loosely (any CircularArray history), while the two remaining
// histories must be empty CircularArray instances.
// runTime.aggregate and waitTime.aggregate are checked conditionally on
// whether they are set. The strategy internals are only required to be
// numbers.
// NOTE(review): `max` is defined outside this view.
760 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
761 const pool = new FixedThreadPool(
763 './tests/worker-files/thread/testWorker.mjs',
764 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
766 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
767 const promises = new Set()
768 const maxMultiplier = 2
769 for (let i = 0; i < max * maxMultiplier; i++) {
770 promises.add(pool.execute())
772 await Promise.all(promises)
773 for (const workerNode of pool.workerNodes) {
774 expect(workerNode.usage).toStrictEqual({
776 executed: expect.any(Number),
780 sequentiallyStolen: 0,
784 runTime: expect.objectContaining({
785 history: expect.any(CircularArray)
787 waitTime: expect.objectContaining({
788 history: expect.any(CircularArray)
792 history: new CircularArray()
795 history: new CircularArray()
799 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
800 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
803 if (workerNode.usage.runTime.aggregate == null) {
804 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
806 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
808 if (workerNode.usage.waitTime.aggregate == null) {
809 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
811 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
815 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
816 pool.workerChoiceStrategyContext.workerChoiceStrategy
818 ).toEqual(expect.any(Number))
820 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
821 pool.workerChoiceStrategyContext.workerChoiceStrategy
822 ).previousWorkerNodeKey
823 ).toEqual(expect.any(Number))
824 // We need to clean up the resources after our test
// Same scenario as the fixed pool LEAST_BUSY test, but on a
// DynamicThreadPool: run max * 2 tasks, match runTime/waitTime loosely,
// require the two remaining histories to be empty, and check
// runTime.aggregate / waitTime.aggregate conditionally on whether they are
// set. The strategy internals are only required to be numbers.
// NOTE(review): `max` is defined outside this view.
828 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
829 const pool = new DynamicThreadPool(
832 './tests/worker-files/thread/testWorker.mjs',
833 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
835 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
836 const promises = new Set()
837 const maxMultiplier = 2
838 for (let i = 0; i < max * maxMultiplier; i++) {
839 promises.add(pool.execute())
841 await Promise.all(promises)
842 for (const workerNode of pool.workerNodes) {
843 expect(workerNode.usage).toStrictEqual({
845 executed: expect.any(Number),
849 sequentiallyStolen: 0,
853 runTime: expect.objectContaining({
854 history: expect.any(CircularArray)
856 waitTime: expect.objectContaining({
857 history: expect.any(CircularArray)
861 history: new CircularArray()
864 history: new CircularArray()
868 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
869 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
872 if (workerNode.usage.runTime.aggregate == null) {
873 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
875 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
877 if (workerNode.usage.waitTime.aggregate == null) {
878 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
880 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
884 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
885 pool.workerChoiceStrategyContext.workerChoiceStrategy
887 ).toEqual(expect.any(Number))
889 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
890 pool.workerChoiceStrategyContext.workerChoiceStrategy
891 ).previousWorkerNodeKey
892 ).toEqual(expect.any(Number))
893 // We need to clean up the resources after our test
// Checks getStrategyPolicy() for LEAST_ELU on a FixedThreadPool and then on
// a DynamicThreadPool; both are expected to report dynamicWorkerUsage: false
// and dynamicWorkerReady: true.
897 it('Verify LEAST_ELU strategy default policy', async () => {
898 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
899 let pool = new FixedThreadPool(
901 './tests/worker-files/thread/testWorker.mjs',
902 { workerChoiceStrategy }
904 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
905 dynamicWorkerUsage: false,
906 dynamicWorkerReady: true
909 pool = new DynamicThreadPool(
912 './tests/worker-files/thread/testWorker.mjs',
913 { workerChoiceStrategy }
915 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
916 dynamicWorkerUsage: false,
917 dynamicWorkerReady: true
919 // We need to clean up the resources after our test
// Reads getTaskStatisticsRequirements() from the strategy context for
// LEAST_ELU, first on a FixedThreadPool and then on a DynamicThreadPool.
// NOTE(review): the expected requirement objects are on lines not visible in
// this view.
923 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
924 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
925 let pool = new FixedThreadPool(
927 './tests/worker-files/thread/testWorker.mjs',
928 { workerChoiceStrategy }
931 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
950 pool = new DynamicThreadPool(
953 './tests/worker-files/thread/testWorker.mjs',
954 { workerChoiceStrategy }
957 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
975 // We need to clean up the resources after our test
// Runs max * 2 tasks through a FixedThreadPool using LEAST_ELU and checks
// each worker node's usage. For this strategy the `elu` entry is matched
// loosely (idle/active with any CircularArray history), while the two other
// visible histories must be empty CircularArray instances.
// elu.active.aggregate, elu.idle.aggregate and elu.utilization are checked
// conditionally on whether they are set; utilization is bounded to [0, 1].
// The strategy internals are only required to be numbers.
// NOTE(review): `max` is defined outside this view.
979 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
980 const pool = new FixedThreadPool(
982 './tests/worker-files/thread/testWorker.mjs',
983 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
985 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
986 const promises = new Set()
987 const maxMultiplier = 2
988 for (let i = 0; i < max * maxMultiplier; i++) {
989 promises.add(pool.execute())
991 await Promise.all(promises)
992 for (const workerNode of pool.workerNodes) {
993 expect(workerNode.usage).toStrictEqual({
995 executed: expect.any(Number),
999 sequentiallyStolen: 0,
1004 history: new CircularArray()
1007 history: new CircularArray()
1009 elu: expect.objectContaining({
1010 idle: expect.objectContaining({
1011 history: expect.any(CircularArray)
1013 active: expect.objectContaining({
1014 history: expect.any(CircularArray)
1018 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1019 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1022 if (workerNode.usage.elu.active.aggregate == null) {
1023 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1025 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1027 if (workerNode.usage.elu.idle.aggregate == null) {
1028 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1030 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1032 if (workerNode.usage.elu.utilization == null) {
1033 expect(workerNode.usage.elu.utilization).toBeUndefined()
1035 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1036 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1040 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1041 pool.workerChoiceStrategyContext.workerChoiceStrategy
1043 ).toEqual(expect.any(Number))
1045 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1046 pool.workerChoiceStrategyContext.workerChoiceStrategy
1047 ).previousWorkerNodeKey
1048 ).toEqual(expect.any(Number))
1049 // We need to clean up the resources after our test
1050 await pool.destroy()
// Same scenario as the fixed pool LEAST_ELU test, but on a
// DynamicThreadPool: run max * 2 tasks, match the `elu` usage entry loosely,
// require the two other visible histories to be empty, and check the ELU
// aggregates and utilization (bounded to [0, 1]) conditionally on whether
// they are set. The strategy internals are only required to be numbers.
// NOTE(review): `max` is defined outside this view.
1053 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
1054 const pool = new DynamicThreadPool(
1057 './tests/worker-files/thread/testWorker.mjs',
1058 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
1060 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
1061 const promises = new Set()
1062 const maxMultiplier = 2
1063 for (let i = 0; i < max * maxMultiplier; i++) {
1064 promises.add(pool.execute())
1066 await Promise.all(promises)
1067 for (const workerNode of pool.workerNodes) {
1068 expect(workerNode.usage).toStrictEqual({
1070 executed: expect.any(Number),
1074 sequentiallyStolen: 0,
1079 history: new CircularArray()
1082 history: new CircularArray()
1084 elu: expect.objectContaining({
1085 idle: expect.objectContaining({
1086 history: expect.any(CircularArray)
1088 active: expect.objectContaining({
1089 history: expect.any(CircularArray)
1093 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1094 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1097 if (workerNode.usage.elu.active.aggregate == null) {
1098 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1100 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1102 if (workerNode.usage.elu.idle.aggregate == null) {
1103 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1105 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1107 if (workerNode.usage.elu.utilization == null) {
1108 expect(workerNode.usage.elu.utilization).toBeUndefined()
1110 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1111 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1115 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1116 pool.workerChoiceStrategyContext.workerChoiceStrategy
1118 ).toEqual(expect.any(Number))
1120 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1121 pool.workerChoiceStrategyContext.workerChoiceStrategy
1122 ).previousWorkerNodeKey
1123 ).toEqual(expect.any(Number))
1124 // We need to clean up the resources after our test
1125 await pool.destroy()
// Checks getStrategyPolicy() for FAIR_SHARE on a FixedThreadPool and then on
// a DynamicThreadPool; both are expected to report dynamicWorkerUsage: false
// and dynamicWorkerReady: true. Each pool is destroyed once checked.
1128 it('Verify FAIR_SHARE strategy default policy', async () => {
1129 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1130 let pool = new FixedThreadPool(
1132 './tests/worker-files/thread/testWorker.mjs',
1133 { workerChoiceStrategy }
1135 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1136 dynamicWorkerUsage: false,
1137 dynamicWorkerReady: true
1139 await pool.destroy()
1140 pool = new DynamicThreadPool(
1143 './tests/worker-files/thread/testWorker.mjs',
1144 { workerChoiceStrategy }
1146 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1147 dynamicWorkerUsage: false,
1148 dynamicWorkerReady: true
1150 // We need to clean up the resources after our test
1151 await pool.destroy()
// Reads getTaskStatisticsRequirements() from the strategy context for
// FAIR_SHARE, first on a FixedThreadPool and then on a DynamicThreadPool,
// destroying each pool once checked.
// NOTE(review): the expected requirement objects are on lines not visible in
// this view.
1154 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1155 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1156 let pool = new FixedThreadPool(
1158 './tests/worker-files/thread/testWorker.mjs',
1159 { workerChoiceStrategy }
1162 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1180 await pool.destroy()
1181 pool = new DynamicThreadPool(
1184 './tests/worker-files/thread/testWorker.mjs',
1185 { workerChoiceStrategy }
1188 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1206 // We need to clean up the resources after our test
1207 await pool.destroy()
// Runs max * 2 tasks through a FixedThreadPool using FAIR_SHARE and checks
// each worker node's usage. For this strategy runTime and elu are matched
// loosely (any CircularArray history), while the one other visible history
// must be an empty CircularArray.
// runTime.aggregate, runTime.average and the ELU aggregates/utilization are
// checked conditionally on whether they are set; utilization is bounded to
// [0, 1]. Each worker node must also carry a positive
// strategyData.virtualTaskEndTimestamp. The strategy internals are only
// required to be numbers.
// NOTE(review): `max` is defined outside this view.
1210 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1211 const pool = new FixedThreadPool(
1213 './tests/worker-files/thread/testWorker.mjs',
1214 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1216 // TODO: Create a better test to cover `FairShareWorkerChoiceStrategy#choose` (class name presumed from the sibling strategies' naming -- confirm)
1217 const promises = new Set()
1218 const maxMultiplier = 2
1219 for (let i = 0; i < max * maxMultiplier; i++) {
1220 promises.add(pool.execute())
1222 await Promise.all(promises)
1223 for (const workerNode of pool.workerNodes) {
1224 expect(workerNode.usage).toStrictEqual({
1226 executed: expect.any(Number),
1230 sequentiallyStolen: 0,
1234 runTime: expect.objectContaining({
1235 history: expect.any(CircularArray)
1238 history: new CircularArray()
1240 elu: expect.objectContaining({
1241 idle: expect.objectContaining({
1242 history: expect.any(CircularArray)
1244 active: expect.objectContaining({
1245 history: expect.any(CircularArray)
1249 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1250 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1253 if (workerNode.usage.runTime.aggregate == null) {
1254 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1256 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1258 if (workerNode.usage.runTime.average == null) {
1259 expect(workerNode.usage.runTime.average).toBeUndefined()
1261 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1263 if (workerNode.usage.elu.active.aggregate == null) {
1264 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1266 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1268 if (workerNode.usage.elu.idle.aggregate == null) {
1269 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1271 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1273 if (workerNode.usage.elu.utilization == null) {
1274 expect(workerNode.usage.elu.utilization).toBeUndefined()
1276 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1277 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1279 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1282 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1283 pool.workerChoiceStrategyContext.workerChoiceStrategy
1285 ).toEqual(expect.any(Number))
1287 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1288 pool.workerChoiceStrategyContext.workerChoiceStrategy
1289 ).previousWorkerNodeKey
1290 ).toEqual(expect.any(Number))
1291 // We need to clean up the resources after our test
1292 await pool.destroy()
// Same scenario as the fixed pool FAIR_SHARE test, but on a
// DynamicThreadPool: run max * 2 tasks, match runTime and elu loosely,
// require the one other visible history to be empty, check the
// runTime/ELU statistics conditionally on whether they are set, and require a
// positive strategyData.virtualTaskEndTimestamp on every worker node. The
// strategy internals are only required to be numbers.
// NOTE(review): `max` is defined outside this view.
1295 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1296 const pool = new DynamicThreadPool(
1299 './tests/worker-files/thread/testWorker.mjs',
1300 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1302 // TODO: Create a better test to cover `FairShareWorkerChoiceStrategy#choose` (class name presumed from the sibling strategies' naming -- confirm)
1303 const promises = new Set()
1304 const maxMultiplier = 2
1305 for (let i = 0; i < max * maxMultiplier; i++) {
1306 promises.add(pool.execute())
1308 await Promise.all(promises)
1309 for (const workerNode of pool.workerNodes) {
1310 expect(workerNode.usage).toStrictEqual({
1312 executed: expect.any(Number),
1316 sequentiallyStolen: 0,
1320 runTime: expect.objectContaining({
1321 history: expect.any(CircularArray)
1324 history: new CircularArray()
1326 elu: expect.objectContaining({
1327 idle: expect.objectContaining({
1328 history: expect.any(CircularArray)
1330 active: expect.objectContaining({
1331 history: expect.any(CircularArray)
1335 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1336 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1339 if (workerNode.usage.runTime.aggregate == null) {
1340 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1342 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1344 if (workerNode.usage.runTime.average == null) {
1345 expect(workerNode.usage.runTime.average).toBeUndefined()
1347 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1349 if (workerNode.usage.elu.active.aggregate == null) {
1350 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1352 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1354 if (workerNode.usage.elu.idle.aggregate == null) {
1355 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1357 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1359 if (workerNode.usage.elu.utilization == null) {
1360 expect(workerNode.usage.elu.utilization).toBeUndefined()
1362 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1363 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1365 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1368 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1369 pool.workerChoiceStrategyContext.workerChoiceStrategy
1371 ).toEqual(expect.any(Number))
1373 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1374 pool.workerChoiceStrategyContext.workerChoiceStrategy
1375 ).previousWorkerNodeKey
1376 ).toEqual(expect.any(Number))
1377 // We need to clean up the resources after our test
1378 await pool.destroy()
// Smoke test for the FAIR_SHARE strategy on a dynamic thread pool with the
// median runtime statistic enabled: run a batch of tasks, then check each
// worker node's usage statistics and the strategy's bookkeeping fields.
1381 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1382 const pool = new DynamicThreadPool(
1385 './tests/worker-files/thread/testWorker.mjs',
1387 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1388 workerChoiceStrategyOptions: {
// Enable the `median` option of the runtime statistic.
1389 runTime: { median: true }
1393 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
// Queue `max * maxMultiplier` tasks (`max` is defined outside this test) and
// wait for all of them before inspecting the worker nodes.
1394 const promises = new Set()
1395 const maxMultiplier = 2
1396 for (let i = 0; i < max * maxMultiplier; i++) {
1397 promises.add(pool.execute())
1399 await Promise.all(promises)
1400 for (const workerNode of pool.workerNodes) {
// Strict comparison of the usage object; asymmetric matchers stand in for
// values that cannot be predicted exactly.
1401 expect(workerNode.usage).toStrictEqual({
1403 executed: expect.any(Number),
1407 sequentiallyStolen: 0,
1411 runTime: expect.objectContaining({
1412 history: expect.any(CircularArray)
1415 history: new CircularArray()
1417 elu: expect.objectContaining({
1418 idle: expect.objectContaining({
1419 history: expect.any(CircularArray)
1421 active: expect.objectContaining({
1422 history: expect.any(CircularArray)
1426 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1427 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// Each statistic below may be unset (null/undefined), so the test branches on
// `== null` and applies a numeric bound to the set case.
1430 if (workerNode.usage.runTime.aggregate == null) {
1431 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1433 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
// The median (not the average) is checked because `median: true` was set.
1435 if (workerNode.usage.runTime.median == null) {
1436 expect(workerNode.usage.runTime.median).toBeUndefined()
1438 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1440 if (workerNode.usage.elu.active.aggregate == null) {
1441 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1443 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1445 if (workerNode.usage.elu.idle.aggregate == null) {
1446 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1448 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
// When set, utilization is bounded to the closed range [0, 1].
1450 if (workerNode.usage.elu.utilization == null) {
1451 expect(workerNode.usage.elu.utilization).toBeUndefined()
1453 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1454 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
// Every worker node is expected to carry a positive virtual task end timestamp.
1456 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
// Look up the strategy instance registered under the pool's current strategy
// and check that its bookkeeping fields hold numbers.
1459 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1460 pool.workerChoiceStrategyContext.workerChoiceStrategy
1462 ).toEqual(expect.any(Number))
1464 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1465 pool.workerChoiceStrategyContext.workerChoiceStrategy
1466 ).previousWorkerNodeKey
1467 ).toEqual(expect.any(Number))
1468 // We need to clean up the resources after our test
1469 await pool.destroy()
// Checks that selecting FAIR_SHARE through setWorkerChoiceStrategy() clears
// each worker node's `virtualTaskEndTimestamp`, first on a fixed pool and then
// on a dynamic pool.
1472 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1473 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1474 let pool = new FixedThreadPool(
1476 './tests/worker-files/thread/testWorker.mjs'
// Seed every worker node with a timestamp so that the reset is observable.
1478 for (const workerNode of pool.workerNodes) {
1479 workerNode.strategyData = {
1480 virtualTaskEndTimestamp: performance.now()
1483 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
// After the strategy is set, the seeded timestamp must be gone.
1484 for (const workerNode of pool.workerNodes) {
1485 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
// Release the fixed pool before reusing `pool` for the dynamic pool variant.
1487 await pool.destroy()
1488 pool = new DynamicThreadPool(
1491 './tests/worker-files/thread/testWorker.mjs'
// Same seed / set / verify sequence as above, on the dynamic pool.
1493 for (const workerNode of pool.workerNodes) {
1494 workerNode.strategyData = {
1495 virtualTaskEndTimestamp: performance.now()
1498 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1499 for (const workerNode of pool.workerNodes) {
1500 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1502 // We need to clean up the resources after our test
1503 await pool.destroy()
// Checks the strategy policy reported by the pool when it is created with the
// WEIGHTED_ROUND_ROBIN strategy, for a fixed pool and then a dynamic pool.
1506 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1507 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1508 let pool = new FixedThreadPool(
1510 './tests/worker-files/thread/testWorker.mjs',
1511 { workerChoiceStrategy }
// The expected policy is the same for both pool types.
1513 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1514 dynamicWorkerUsage: false,
1515 dynamicWorkerReady: true
// Release the fixed pool before reusing `pool` for the dynamic pool variant.
1517 await pool.destroy()
1518 pool = new DynamicThreadPool(
1521 './tests/worker-files/thread/testWorker.mjs',
1522 { workerChoiceStrategy }
1524 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1525 dynamicWorkerUsage: false,
1526 dynamicWorkerReady: true
1528 // We need to clean up the resources after our test
1529 await pool.destroy()
// Reads the task statistics requirements reported by the pool when it is
// created with the WEIGHTED_ROUND_ROBIN strategy, for a fixed pool and then a
// dynamic pool.
// NOTE(review): the expected requirements object is not visible in this view;
// confirm it against the full file.
1532 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1533 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1534 let pool = new FixedThreadPool(
1536 './tests/worker-files/thread/testWorker.mjs',
1537 { workerChoiceStrategy }
1540 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Release the fixed pool before reusing `pool` for the dynamic pool variant.
1558 await pool.destroy()
1559 pool = new DynamicThreadPool(
1562 './tests/worker-files/thread/testWorker.mjs',
1563 { workerChoiceStrategy }
1566 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1584 // We need to clean up the resources after our test
1585 await pool.destroy()
// Smoke test for the WEIGHTED_ROUND_ROBIN strategy on a fixed thread pool: run
// a batch of tasks, then check each worker node's usage statistics and the
// strategy's bookkeeping fields.
1588 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1589 const pool = new FixedThreadPool(
1591 './tests/worker-files/thread/testWorker.mjs',
1592 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1594 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
// Queue `max * maxMultiplier` tasks (`max` is defined outside this test) and
// wait for all of them before inspecting the worker nodes.
1595 const promises = new Set()
1596 const maxMultiplier = 2
1597 for (let i = 0; i < max * maxMultiplier; i++) {
1598 promises.add(pool.execute())
1600 await Promise.all(promises)
1601 for (const workerNode of pool.workerNodes) {
// Strict comparison of the usage object; only the runtime history is matched
// loosely, the other visible histories must equal a fresh CircularArray.
1602 expect(workerNode.usage).toStrictEqual({
1604 executed: expect.any(Number),
1608 sequentiallyStolen: 0,
1612 runTime: expect.objectContaining({
1613 history: expect.any(CircularArray)
1616 history: new CircularArray()
1620 history: new CircularArray()
1623 history: new CircularArray()
1627 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1628 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// Runtime statistics may be unset (null/undefined), so the test branches on
// `== null` and requires a positive value in the set case.
1631 if (workerNode.usage.runTime.aggregate == null) {
1632 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1634 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1636 if (workerNode.usage.runTime.average == null) {
1637 expect(workerNode.usage.runTime.average).toBeUndefined()
1639 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
// Inspect the strategy instance registered under the pool's current strategy.
// NOTE(review): the matchers for the first two lookups are not visible here.
1643 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1644 pool.workerChoiceStrategyContext.workerChoiceStrategy
1648 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1649 pool.workerChoiceStrategyContext.workerChoiceStrategy
1650 ).previousWorkerNodeKey
// The default worker weight must be strictly positive.
1653 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1654 pool.workerChoiceStrategyContext.workerChoiceStrategy
1655 ).defaultWorkerWeight
1656 ).toBeGreaterThan(0)
// The accumulated virtual task runtime must be non-negative.
1658 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1659 pool.workerChoiceStrategyContext.workerChoiceStrategy
1660 ).workerNodeVirtualTaskRunTime
1661 ).toBeGreaterThanOrEqual(0)
1662 // We need to clean up the resources after our test
1663 await pool.destroy()
// Smoke test for the WEIGHTED_ROUND_ROBIN strategy on a dynamic thread pool:
// run a batch of tasks, then check each worker node's usage statistics and the
// strategy's bookkeeping fields.
1666 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1667 const pool = new DynamicThreadPool(
1670 './tests/worker-files/thread/testWorker.mjs',
1671 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1673 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
// Queue `max * maxMultiplier` tasks (`max` is defined outside this test) and
// wait for all of them before inspecting the worker nodes.
1674 const promises = new Set()
1675 const maxMultiplier = 2
1676 for (let i = 0; i < max * maxMultiplier; i++) {
1677 promises.add(pool.execute())
1679 await Promise.all(promises)
1680 for (const workerNode of pool.workerNodes) {
// Strict comparison of the usage object; only the runtime history is matched
// loosely, the other visible histories must equal a fresh CircularArray.
1681 expect(workerNode.usage).toStrictEqual({
1683 executed: expect.any(Number),
1687 sequentiallyStolen: 0,
1691 runTime: expect.objectContaining({
1692 history: expect.any(CircularArray)
1695 history: new CircularArray()
1699 history: new CircularArray()
1702 history: new CircularArray()
1706 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1707 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// Runtime statistics may be unset (null/undefined), so the test branches on
// `== null` and requires a positive value in the set case.
1710 if (workerNode.usage.runTime.aggregate == null) {
1711 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1713 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1715 if (workerNode.usage.runTime.average == null) {
1716 expect(workerNode.usage.runTime.average).toBeUndefined()
1718 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
// Inspect the strategy instance registered under the pool's current strategy.
// NOTE(review): the matchers for the first two lookups are not visible here.
1722 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1723 pool.workerChoiceStrategyContext.workerChoiceStrategy
1727 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1728 pool.workerChoiceStrategyContext.workerChoiceStrategy
1729 ).previousWorkerNodeKey
// The default worker weight must be strictly positive.
1732 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1733 pool.workerChoiceStrategyContext.workerChoiceStrategy
1734 ).defaultWorkerWeight
1735 ).toBeGreaterThan(0)
// The accumulated virtual task runtime must be non-negative.
1737 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1738 pool.workerChoiceStrategyContext.workerChoiceStrategy
1739 ).workerNodeVirtualTaskRunTime
1740 ).toBeGreaterThanOrEqual(0)
1741 // We need to clean up the resources after our test
1742 await pool.destroy()
// Smoke test for the WEIGHTED_ROUND_ROBIN strategy on a dynamic thread pool
// with the median runtime statistic enabled: run a batch of tasks, then check
// each worker node's usage statistics and the strategy's bookkeeping fields.
1745 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1746 const pool = new DynamicThreadPool(
1749 './tests/worker-files/thread/testWorker.mjs',
1751 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1752 workerChoiceStrategyOptions: {
// Enable the `median` option of the runtime statistic.
1753 runTime: { median: true }
1757 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
// Queue `max * maxMultiplier` tasks (`max` is defined outside this test) and
// wait for all of them before inspecting the worker nodes.
1758 const promises = new Set()
1759 const maxMultiplier = 2
1760 for (let i = 0; i < max * maxMultiplier; i++) {
1761 promises.add(pool.execute())
1763 await Promise.all(promises)
1764 for (const workerNode of pool.workerNodes) {
// Strict comparison of the usage object; only the runtime history is matched
// loosely, the other visible histories must equal a fresh CircularArray.
1765 expect(workerNode.usage).toStrictEqual({
1767 executed: expect.any(Number),
1771 sequentiallyStolen: 0,
1775 runTime: expect.objectContaining({
1776 history: expect.any(CircularArray)
1779 history: new CircularArray()
1783 history: new CircularArray()
1786 history: new CircularArray()
1790 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1791 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// Runtime statistics may be unset (null/undefined), so the test branches on
// `== null` and requires a positive value in the set case.
1794 if (workerNode.usage.runTime.aggregate == null) {
1795 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1797 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
// The median (not the average) is checked because `median: true` was set.
1799 if (workerNode.usage.runTime.median == null) {
1800 expect(workerNode.usage.runTime.median).toBeUndefined()
1802 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
// Inspect the strategy instance registered under the pool's current strategy.
// NOTE(review): the matchers for the first two lookups are not visible here.
1806 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1807 pool.workerChoiceStrategyContext.workerChoiceStrategy
1811 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1812 pool.workerChoiceStrategyContext.workerChoiceStrategy
1813 ).previousWorkerNodeKey
// The default worker weight must be strictly positive.
1816 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1817 pool.workerChoiceStrategyContext.workerChoiceStrategy
1818 ).defaultWorkerWeight
1819 ).toBeGreaterThan(0)
// The accumulated virtual task runtime must be non-negative.
1821 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1822 pool.workerChoiceStrategyContext.workerChoiceStrategy
1823 ).workerNodeVirtualTaskRunTime
1824 ).toBeGreaterThanOrEqual(0)
1825 // We need to clean up the resources after our test
1826 await pool.destroy()
// Inspects the WEIGHTED_ROUND_ROBIN strategy's internal fields before and
// after it is selected through setWorkerChoiceStrategy(), first on a fixed
// pool and then on a dynamic pool.
// NOTE(review): most matchers applied to these lookups are not visible in this
// view; only the `defaultWorkerWeight > 0` checks after the switch are shown.
1829 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1830 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1831 let pool = new FixedThreadPool(
1833 './tests/worker-files/thread/testWorker.mjs'
// Before setWorkerChoiceStrategy() is called, the strategy instance is looked
// up explicitly by its key.
1836 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1837 workerChoiceStrategy
1841 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1842 workerChoiceStrategy
1843 ).previousWorkerNodeKey
1846 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1847 workerChoiceStrategy
1848 ).defaultWorkerWeight
1851 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1852 workerChoiceStrategy
1853 ).workerNodeVirtualTaskRunTime
1855 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
// After the switch, the instance is looked up through the context's current
// strategy instead of the local constant.
1857 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1858 pool.workerChoiceStrategyContext.workerChoiceStrategy
1862 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1863 pool.workerChoiceStrategyContext.workerChoiceStrategy
1864 ).previousWorkerNodeKey
1867 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1868 pool.workerChoiceStrategyContext.workerChoiceStrategy
1869 ).defaultWorkerWeight
1870 ).toBeGreaterThan(0)
1872 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1873 pool.workerChoiceStrategyContext.workerChoiceStrategy
1874 ).workerNodeVirtualTaskRunTime
// Release the fixed pool before reusing `pool` for the dynamic pool variant.
1876 await pool.destroy()
1877 pool = new DynamicThreadPool(
1880 './tests/worker-files/thread/testWorker.mjs'
// Same before / set / after sequence as above, on the dynamic pool.
1883 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1884 workerChoiceStrategy
1888 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1889 workerChoiceStrategy
1890 ).previousWorkerNodeKey
1893 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1894 workerChoiceStrategy
1895 ).defaultWorkerWeight
1898 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1899 workerChoiceStrategy
1900 ).workerNodeVirtualTaskRunTime
1902 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1904 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1905 pool.workerChoiceStrategyContext.workerChoiceStrategy
1909 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1910 pool.workerChoiceStrategyContext.workerChoiceStrategy
1911 ).previousWorkerNodeKey
1914 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1915 pool.workerChoiceStrategyContext.workerChoiceStrategy
1916 ).defaultWorkerWeight
1917 ).toBeGreaterThan(0)
1919 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1920 pool.workerChoiceStrategyContext.workerChoiceStrategy
1921 ).workerNodeVirtualTaskRunTime
1923 // We need to clean up the resources after our test
1924 await pool.destroy()
// Checks the strategy policy reported by the pool when it is created with the
// INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy, for a fixed pool and then a
// dynamic pool.
1927 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1928 const workerChoiceStrategy =
1929 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1930 let pool = new FixedThreadPool(
1932 './tests/worker-files/thread/testWorker.mjs',
1933 { workerChoiceStrategy }
// The expected policy is the same for both pool types.
1935 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1936 dynamicWorkerUsage: false,
1937 dynamicWorkerReady: true
// Release the fixed pool before reusing `pool` for the dynamic pool variant.
1939 await pool.destroy()
1940 pool = new DynamicThreadPool(
1943 './tests/worker-files/thread/testWorker.mjs',
1944 { workerChoiceStrategy }
1946 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1947 dynamicWorkerUsage: false,
1948 dynamicWorkerReady: true
1950 // We need to clean up the resources after our test
1951 await pool.destroy()
// Reads the task statistics requirements reported by the pool when it is
// created with the INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy, for a fixed pool
// and then a dynamic pool.
// NOTE(review): the expected requirements object is not visible in this view;
// confirm it against the full file.
1954 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1955 const workerChoiceStrategy =
1956 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1957 let pool = new FixedThreadPool(
1959 './tests/worker-files/thread/testWorker.mjs',
1960 { workerChoiceStrategy }
1963 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Release the fixed pool before reusing `pool` for the dynamic pool variant.
1981 await pool.destroy()
1982 pool = new DynamicThreadPool(
1985 './tests/worker-files/thread/testWorker.mjs',
1986 { workerChoiceStrategy }
1989 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
2007 // We need to clean up the resources after our test
2008 await pool.destroy()
// Smoke test for the INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy on a fixed
// thread pool: run a batch of tasks, then check each worker node's usage
// statistics and the strategy's bookkeeping fields.
2011 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
2012 const pool = new FixedThreadPool(
2014 './tests/worker-files/thread/testWorker.mjs',
2016 workerChoiceStrategy:
2017 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2020 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
// Queue `max * maxMultiplier` tasks (`max` is defined outside this test) and
// wait for all of them before inspecting the worker nodes.
2021 const promises = new Set()
2022 const maxMultiplier = 2
2023 for (let i = 0; i < max * maxMultiplier; i++) {
2024 promises.add(pool.execute())
2026 await Promise.all(promises)
2027 for (const workerNode of pool.workerNodes) {
// Strict comparison of the usage object; only the runtime history is matched
// loosely, the other visible histories must equal a fresh CircularArray.
2028 expect(workerNode.usage).toStrictEqual({
2030 executed: expect.any(Number),
2034 sequentiallyStolen: 0,
2038 runTime: expect.objectContaining({
2039 history: expect.any(CircularArray)
2042 history: new CircularArray()
2046 history: new CircularArray()
2049 history: new CircularArray()
2053 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2054 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// The default worker weight must be strictly positive.
2059 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2060 pool.workerChoiceStrategyContext.workerChoiceStrategy
2061 ).defaultWorkerWeight
2062 ).toBeGreaterThan(0)
// Further lookups on the strategy instance registered under the pool's current
// strategy.
// NOTE(review): apart from `previousWorkerNodeKey`, the inspected properties
// and their matchers are not visible in this view.
2064 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2065 pool.workerChoiceStrategyContext.workerChoiceStrategy
2069 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2070 pool.workerChoiceStrategyContext.workerChoiceStrategy
2074 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2075 pool.workerChoiceStrategyContext.workerChoiceStrategy
2079 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2080 pool.workerChoiceStrategyContext.workerChoiceStrategy
2081 ).previousWorkerNodeKey
2082 ).toEqual(expect.any(Number))
2084 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2085 pool.workerChoiceStrategyContext.workerChoiceStrategy
2088 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2089 pool.workerChoiceStrategyContext.workerChoiceStrategy
2090 ).defaultWorkerWeight
2092 // We need to clean up the resources after our test
2093 await pool.destroy()
// Smoke test for the INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy on a dynamic
// thread pool: run a batch of tasks, then check each worker node's usage
// statistics and the strategy's bookkeeping fields.
2096 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
2097 const pool = new DynamicThreadPool(
2100 './tests/worker-files/thread/testWorker.mjs',
2102 workerChoiceStrategy:
2103 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2106 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
// Queue `max * maxMultiplier` tasks (`max` is defined outside this test) and
// wait for all of them before inspecting the worker nodes.
2107 const promises = new Set()
2108 const maxMultiplier = 2
2109 for (let i = 0; i < max * maxMultiplier; i++) {
2110 promises.add(pool.execute())
2112 await Promise.all(promises)
2113 for (const workerNode of pool.workerNodes) {
// Strict comparison of the usage object; only the runtime history is matched
// loosely, the other visible histories must equal a fresh CircularArray.
2114 expect(workerNode.usage).toStrictEqual({
2116 executed: expect.any(Number),
2120 sequentiallyStolen: 0,
2124 runTime: expect.objectContaining({
2125 history: expect.any(CircularArray)
2128 history: new CircularArray()
2132 history: new CircularArray()
2135 history: new CircularArray()
2139 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2140 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// The default worker weight must be strictly positive.
2145 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2146 pool.workerChoiceStrategyContext.workerChoiceStrategy
2147 ).defaultWorkerWeight
2148 ).toBeGreaterThan(0)
// Further lookups on the strategy instance registered under the pool's current
// strategy.
// NOTE(review): apart from `previousWorkerNodeKey`, the inspected properties
// and their matchers are not visible in this view.
2150 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2151 pool.workerChoiceStrategyContext.workerChoiceStrategy
2155 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2156 pool.workerChoiceStrategyContext.workerChoiceStrategy
2160 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2161 pool.workerChoiceStrategyContext.workerChoiceStrategy
2165 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2166 pool.workerChoiceStrategyContext.workerChoiceStrategy
2167 ).previousWorkerNodeKey
2168 ).toEqual(expect.any(Number))
2170 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2171 pool.workerChoiceStrategyContext.workerChoiceStrategy
2174 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2175 pool.workerChoiceStrategyContext.workerChoiceStrategy
2176 ).defaultWorkerWeight
2178 // We need to clean up the resources after our test
2179 await pool.destroy()
// Inspects the INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy's internal fields
// before and after it is selected through setWorkerChoiceStrategy(), first on
// a fixed pool and then on a dynamic pool.
// NOTE(review): most inspected properties and their matchers are not visible
// in this view; only the `defaultWorkerWeight > 0` checks after the switch are
// shown.
2182 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
2183 const workerChoiceStrategy =
2184 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2185 let pool = new FixedThreadPool(
2187 './tests/worker-files/thread/testWorker.mjs'
// Before setWorkerChoiceStrategy() is called, the strategy instance is looked
// up explicitly by its key.
2190 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2191 workerChoiceStrategy
2195 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2196 workerChoiceStrategy
2200 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2201 workerChoiceStrategy
2205 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2206 workerChoiceStrategy
2207 ).previousWorkerNodeKey
2210 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2211 workerChoiceStrategy
2212 ).defaultWorkerWeight
2215 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2216 workerChoiceStrategy
2219 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
// After the switch, the instance is looked up through the context's current
// strategy instead of the local constant.
2221 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2222 pool.workerChoiceStrategyContext.workerChoiceStrategy
2226 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2227 pool.workerChoiceStrategyContext.workerChoiceStrategy
2231 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2232 pool.workerChoiceStrategyContext.workerChoiceStrategy
2236 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2237 pool.workerChoiceStrategyContext.workerChoiceStrategy
2238 ).previousWorkerNodeKey
2241 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2242 pool.workerChoiceStrategyContext.workerChoiceStrategy
2243 ).defaultWorkerWeight
2244 ).toBeGreaterThan(0)
2246 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2247 pool.workerChoiceStrategyContext.workerChoiceStrategy
2250 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2251 pool.workerChoiceStrategyContext.workerChoiceStrategy
2252 ).defaultWorkerWeight
// Release the fixed pool before reusing `pool` for the dynamic pool variant.
2254 await pool.destroy()
2255 pool = new DynamicThreadPool(
2258 './tests/worker-files/thread/testWorker.mjs'
// Same before / set / after sequence as above, on the dynamic pool.
2261 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2262 workerChoiceStrategy
2266 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2267 workerChoiceStrategy
2271 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2272 workerChoiceStrategy
2276 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2277 workerChoiceStrategy
2278 ).previousWorkerNodeKey
2281 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2282 workerChoiceStrategy
2283 ).defaultWorkerWeight
2286 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2287 workerChoiceStrategy
2290 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2292 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2293 pool.workerChoiceStrategyContext.workerChoiceStrategy
2297 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2298 pool.workerChoiceStrategyContext.workerChoiceStrategy
2302 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2303 pool.workerChoiceStrategyContext.workerChoiceStrategy
2307 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2308 pool.workerChoiceStrategyContext.workerChoiceStrategy
2309 ).previousWorkerNodeKey
2312 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2313 pool.workerChoiceStrategyContext.workerChoiceStrategy
2314 ).defaultWorkerWeight
2315 ).toBeGreaterThan(0)
2317 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2318 pool.workerChoiceStrategyContext.workerChoiceStrategy
2321 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2322 pool.workerChoiceStrategyContext.workerChoiceStrategy
2323 ).defaultWorkerWeight
2325 // We need to clean up the resources after our test
2326 await pool.destroy()
// Constructing a dynamic thread pool with an unrecognized worker choice
// strategy name is expected to throw with the message asserted below.
2329 it('Verify unknown strategy throw error', () => {
2332 new DynamicThreadPool(
2335 './tests/worker-files/thread/testWorker.mjs',
// A strategy name chosen so that it matches no supported strategy.
2336 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2338 ).toThrow("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")