1 import { expect } from 'expect'
8 } from '../../../lib/index.js'
9 import { CircularArray } from '../../../lib/circular-array.js'
11 describe('Selection strategies test suite', () => {
15 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
16 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
17 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
18 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
19 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
20 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
21 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
22 'WEIGHTED_ROUND_ROBIN'
24 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
25 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
29 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
30 const pool = new DynamicThreadPool(
33 './tests/worker-files/thread/testWorker.mjs'
35 expect(pool.opts.workerChoiceStrategy).toBe(
36 WorkerChoiceStrategies.ROUND_ROBIN
38 // We need to clean up the resources after our test
42 it('Verify available strategies are taken at pool creation', async () => {
43 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
44 const pool = new FixedThreadPool(
46 './tests/worker-files/thread/testWorker.mjs',
47 { workerChoiceStrategy }
49 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
50 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
57 it('Verify available strategies can be set after pool creation', async () => {
58 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
59 const pool = new DynamicThreadPool(
62 './tests/worker-files/thread/testWorker.mjs'
64 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
65 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
66 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
69 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
70 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
71 retries: pool.info.maxSize,
72 runTime: { median: false },
73 waitTime: { median: false },
74 elu: { median: false }
78 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
79 const pool = new DynamicClusterPool(
82 './tests/worker-files/cluster/testWorker.js'
84 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
85 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
86 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
89 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
90 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
91 retries: pool.info.maxSize,
92 runTime: { median: false },
93 waitTime: { median: false },
94 elu: { median: false }
100 it('Verify available strategies default internals at pool creation', async () => {
101 const pool = new FixedThreadPool(
103 './tests/worker-files/thread/testWorker.mjs'
105 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
107 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
112 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
114 ).previousWorkerNodeKey
117 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
120 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
122 ).defaultWorkerWeight
125 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
127 ).workerNodeVirtualTaskRunTime
130 workerChoiceStrategy ===
131 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
134 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
136 ).defaultWorkerWeight
139 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
141 ).workerNodeVirtualTaskRunTime
144 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
149 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
154 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
158 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
160 ).defaultWorkerWeight
167 it('Verify strategies wait for worker node readiness in dynamic pool', async () => {
168 const pool = new DynamicThreadPool(
171 './tests/worker-files/thread/testWorker.mjs'
173 expect(pool.starting).toBe(false)
174 expect(pool.workerNodes.length).toBe(min)
175 const maxMultiplier = 10000
176 const promises = new Set()
177 for (let i = 0; i < max * maxMultiplier; i++) {
178 promises.add(pool.execute())
180 await Promise.all(promises)
181 expect(pool.workerNodes.length).toBe(max)
182 // We need to clean up the resources after our test
186 it('Verify ROUND_ROBIN strategy default policy', async () => {
187 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
188 let pool = new FixedThreadPool(
190 './tests/worker-files/thread/testWorker.mjs',
191 { workerChoiceStrategy }
193 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
194 dynamicWorkerUsage: false,
195 dynamicWorkerReady: true
198 pool = new DynamicThreadPool(
201 './tests/worker-files/thread/testWorker.mjs',
202 { workerChoiceStrategy }
204 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
205 dynamicWorkerUsage: false,
206 dynamicWorkerReady: true
208 // We need to clean up the resources after our test
212 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
213 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
214 let pool = new FixedThreadPool(
216 './tests/worker-files/thread/testWorker.mjs',
217 { workerChoiceStrategy }
220 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
239 pool = new DynamicThreadPool(
242 './tests/worker-files/thread/testWorker.mjs',
243 { workerChoiceStrategy }
246 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
264 // We need to clean up the resources after our test
268 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
269 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
270 const pool = new FixedThreadPool(
272 './tests/worker-files/thread/testWorker.mjs',
273 { workerChoiceStrategy }
275 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
276 const promises = new Set()
277 const maxMultiplier = 2
278 for (let i = 0; i < max * maxMultiplier; i++) {
279 promises.add(pool.execute())
281 await Promise.all(promises)
282 for (const workerNode of pool.workerNodes) {
283 expect(workerNode.usage).toStrictEqual({
285 executed: maxMultiplier,
289 sequentiallyStolen: 0,
294 history: new CircularArray()
297 history: new CircularArray()
301 history: new CircularArray()
304 history: new CircularArray()
310 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
311 pool.workerChoiceStrategyContext.workerChoiceStrategy
315 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
316 pool.workerChoiceStrategyContext.workerChoiceStrategy
317 ).previousWorkerNodeKey
318 ).toBe(pool.workerNodes.length - 1)
319 // We need to clean up the resources after our test
323 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
324 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
325 const pool = new DynamicThreadPool(
328 './tests/worker-files/thread/testWorker.mjs',
329 { workerChoiceStrategy }
331 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
332 const promises = new Set()
333 const maxMultiplier = 2
334 for (let i = 0; i < max * maxMultiplier; i++) {
335 promises.add(pool.execute())
337 await Promise.all(promises)
338 for (const workerNode of pool.workerNodes) {
339 expect(workerNode.usage).toStrictEqual({
341 executed: expect.any(Number),
345 sequentiallyStolen: 0,
350 history: new CircularArray()
353 history: new CircularArray()
357 history: new CircularArray()
360 history: new CircularArray()
364 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
365 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
370 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
371 pool.workerChoiceStrategyContext.workerChoiceStrategy
375 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
376 pool.workerChoiceStrategyContext.workerChoiceStrategy
377 ).previousWorkerNodeKey
378 ).toBe(pool.workerNodes.length - 1)
379 // We need to clean up the resources after our test
383 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
384 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
385 let pool = new FixedClusterPool(
387 './tests/worker-files/cluster/testWorker.js',
388 { workerChoiceStrategy }
390 let results = new Set()
391 for (let i = 0; i < max; i++) {
392 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
394 expect(results.size).toBe(max)
396 pool = new FixedThreadPool(
398 './tests/worker-files/thread/testWorker.mjs',
399 { workerChoiceStrategy }
402 for (let i = 0; i < max; i++) {
403 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
405 expect(results.size).toBe(max)
409 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
410 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
411 let pool = new FixedThreadPool(
413 './tests/worker-files/thread/testWorker.mjs',
414 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
417 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
418 pool.workerChoiceStrategyContext.workerChoiceStrategy
422 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
423 pool.workerChoiceStrategyContext.workerChoiceStrategy
424 ).previousWorkerNodeKey
426 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
428 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
429 pool.workerChoiceStrategyContext.workerChoiceStrategy
433 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
434 pool.workerChoiceStrategyContext.workerChoiceStrategy
435 ).previousWorkerNodeKey
438 pool = new DynamicThreadPool(
441 './tests/worker-files/thread/testWorker.mjs',
442 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
445 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
446 pool.workerChoiceStrategyContext.workerChoiceStrategy
450 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
451 pool.workerChoiceStrategyContext.workerChoiceStrategy
452 ).previousWorkerNodeKey
454 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
456 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
457 pool.workerChoiceStrategyContext.workerChoiceStrategy
461 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
462 pool.workerChoiceStrategyContext.workerChoiceStrategy
463 ).previousWorkerNodeKey
465 // We need to clean up the resources after our test
469 it('Verify LEAST_USED strategy default policy', async () => {
470 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
471 let pool = new FixedThreadPool(
473 './tests/worker-files/thread/testWorker.mjs',
474 { workerChoiceStrategy }
476 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
477 dynamicWorkerUsage: false,
478 dynamicWorkerReady: true
481 pool = new DynamicThreadPool(
484 './tests/worker-files/thread/testWorker.mjs',
485 { workerChoiceStrategy }
487 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
488 dynamicWorkerUsage: false,
489 dynamicWorkerReady: true
491 // We need to clean up the resources after our test
495 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
496 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
497 let pool = new FixedThreadPool(
499 './tests/worker-files/thread/testWorker.mjs',
500 { workerChoiceStrategy }
503 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
522 pool = new DynamicThreadPool(
525 './tests/worker-files/thread/testWorker.mjs',
526 { workerChoiceStrategy }
529 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
547 // We need to clean up the resources after our test
551 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
552 const pool = new FixedThreadPool(
554 './tests/worker-files/thread/testWorker.mjs',
555 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
557 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
558 const promises = new Set()
559 const maxMultiplier = 2
560 for (let i = 0; i < max * maxMultiplier; i++) {
561 promises.add(pool.execute())
563 await Promise.all(promises)
564 for (const workerNode of pool.workerNodes) {
565 expect(workerNode.usage).toStrictEqual({
567 executed: expect.any(Number),
571 sequentiallyStolen: 0,
576 history: new CircularArray()
579 history: new CircularArray()
583 history: new CircularArray()
586 history: new CircularArray()
590 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
591 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
596 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
597 pool.workerChoiceStrategyContext.workerChoiceStrategy
599 ).toEqual(expect.any(Number))
601 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
602 pool.workerChoiceStrategyContext.workerChoiceStrategy
603 ).previousWorkerNodeKey
604 ).toEqual(expect.any(Number))
605 // We need to clean up the resources after our test
609 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
610 const pool = new DynamicThreadPool(
613 './tests/worker-files/thread/testWorker.mjs',
614 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
616 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
617 const promises = new Set()
618 const maxMultiplier = 2
619 for (let i = 0; i < max * maxMultiplier; i++) {
620 promises.add(pool.execute())
622 await Promise.all(promises)
623 for (const workerNode of pool.workerNodes) {
624 expect(workerNode.usage).toStrictEqual({
626 executed: expect.any(Number),
630 sequentiallyStolen: 0,
635 history: new CircularArray()
638 history: new CircularArray()
642 history: new CircularArray()
645 history: new CircularArray()
649 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
650 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
655 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
656 pool.workerChoiceStrategyContext.workerChoiceStrategy
658 ).toEqual(expect.any(Number))
660 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
661 pool.workerChoiceStrategyContext.workerChoiceStrategy
662 ).previousWorkerNodeKey
663 ).toEqual(expect.any(Number))
664 // We need to clean up the resources after our test
668 it('Verify LEAST_BUSY strategy default policy', async () => {
669 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
670 let pool = new FixedThreadPool(
672 './tests/worker-files/thread/testWorker.mjs',
673 { workerChoiceStrategy }
675 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
676 dynamicWorkerUsage: false,
677 dynamicWorkerReady: true
680 pool = new DynamicThreadPool(
683 './tests/worker-files/thread/testWorker.mjs',
684 { workerChoiceStrategy }
686 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
687 dynamicWorkerUsage: false,
688 dynamicWorkerReady: true
690 // We need to clean up the resources after our test
694 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
695 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
696 let pool = new FixedThreadPool(
698 './tests/worker-files/thread/testWorker.mjs',
699 { workerChoiceStrategy }
702 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
721 pool = new DynamicThreadPool(
724 './tests/worker-files/thread/testWorker.mjs',
725 { workerChoiceStrategy }
728 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
746 // We need to clean up the resources after our test
750 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
751 const pool = new FixedThreadPool(
753 './tests/worker-files/thread/testWorker.mjs',
754 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
756 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
757 const promises = new Set()
758 const maxMultiplier = 2
759 for (let i = 0; i < max * maxMultiplier; i++) {
760 promises.add(pool.execute())
762 await Promise.all(promises)
763 for (const workerNode of pool.workerNodes) {
764 expect(workerNode.usage).toStrictEqual({
766 executed: expect.any(Number),
770 sequentiallyStolen: 0,
774 runTime: expect.objectContaining({
775 history: expect.any(CircularArray)
777 waitTime: expect.objectContaining({
778 history: expect.any(CircularArray)
782 history: new CircularArray()
785 history: new CircularArray()
789 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
790 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
793 if (workerNode.usage.runTime.aggregate == null) {
794 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
796 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
798 if (workerNode.usage.waitTime.aggregate == null) {
799 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
801 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
805 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
806 pool.workerChoiceStrategyContext.workerChoiceStrategy
808 ).toEqual(expect.any(Number))
810 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
811 pool.workerChoiceStrategyContext.workerChoiceStrategy
812 ).previousWorkerNodeKey
813 ).toEqual(expect.any(Number))
814 // We need to clean up the resources after our test
818 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
819 const pool = new DynamicThreadPool(
822 './tests/worker-files/thread/testWorker.mjs',
823 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
825 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
826 const promises = new Set()
827 const maxMultiplier = 2
828 for (let i = 0; i < max * maxMultiplier; i++) {
829 promises.add(pool.execute())
831 await Promise.all(promises)
832 for (const workerNode of pool.workerNodes) {
833 expect(workerNode.usage).toStrictEqual({
835 executed: expect.any(Number),
839 sequentiallyStolen: 0,
843 runTime: expect.objectContaining({
844 history: expect.any(CircularArray)
846 waitTime: expect.objectContaining({
847 history: expect.any(CircularArray)
851 history: new CircularArray()
854 history: new CircularArray()
858 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
859 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
862 if (workerNode.usage.runTime.aggregate == null) {
863 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
865 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
867 if (workerNode.usage.waitTime.aggregate == null) {
868 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
870 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
874 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
875 pool.workerChoiceStrategyContext.workerChoiceStrategy
877 ).toEqual(expect.any(Number))
879 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
880 pool.workerChoiceStrategyContext.workerChoiceStrategy
881 ).previousWorkerNodeKey
882 ).toEqual(expect.any(Number))
883 // We need to clean up the resources after our test
887 it('Verify LEAST_ELU strategy default policy', async () => {
888 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
889 let pool = new FixedThreadPool(
891 './tests/worker-files/thread/testWorker.mjs',
892 { workerChoiceStrategy }
894 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
895 dynamicWorkerUsage: false,
896 dynamicWorkerReady: true
899 pool = new DynamicThreadPool(
902 './tests/worker-files/thread/testWorker.mjs',
903 { workerChoiceStrategy }
905 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
906 dynamicWorkerUsage: false,
907 dynamicWorkerReady: true
909 // We need to clean up the resources after our test
913 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
914 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
915 let pool = new FixedThreadPool(
917 './tests/worker-files/thread/testWorker.mjs',
918 { workerChoiceStrategy }
921 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
940 pool = new DynamicThreadPool(
943 './tests/worker-files/thread/testWorker.mjs',
944 { workerChoiceStrategy }
947 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
965 // We need to clean up the resources after our test
969 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
970 const pool = new FixedThreadPool(
972 './tests/worker-files/thread/testWorker.mjs',
973 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
975 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
976 const promises = new Set()
977 const maxMultiplier = 2
978 for (let i = 0; i < max * maxMultiplier; i++) {
979 promises.add(pool.execute())
981 await Promise.all(promises)
982 for (const workerNode of pool.workerNodes) {
983 expect(workerNode.usage).toStrictEqual({
985 executed: expect.any(Number),
989 sequentiallyStolen: 0,
994 history: new CircularArray()
997 history: new CircularArray()
999 elu: expect.objectContaining({
1000 idle: expect.objectContaining({
1001 history: expect.any(CircularArray)
1003 active: expect.objectContaining({
1004 history: expect.any(CircularArray)
1008 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1009 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1012 if (workerNode.usage.elu.active.aggregate == null) {
1013 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1015 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1017 if (workerNode.usage.elu.idle.aggregate == null) {
1018 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1020 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1022 if (workerNode.usage.elu.utilization == null) {
1023 expect(workerNode.usage.elu.utilization).toBeUndefined()
1025 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1026 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1030 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1031 pool.workerChoiceStrategyContext.workerChoiceStrategy
1033 ).toEqual(expect.any(Number))
1035 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1036 pool.workerChoiceStrategyContext.workerChoiceStrategy
1037 ).previousWorkerNodeKey
1038 ).toEqual(expect.any(Number))
1039 // We need to clean up the resources after our test
1040 await pool.destroy()
1043 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
1044 const pool = new DynamicThreadPool(
1047 './tests/worker-files/thread/testWorker.mjs',
1048 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
1050 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
1051 const promises = new Set()
1052 const maxMultiplier = 2
1053 for (let i = 0; i < max * maxMultiplier; i++) {
1054 promises.add(pool.execute())
1056 await Promise.all(promises)
1057 for (const workerNode of pool.workerNodes) {
1058 expect(workerNode.usage).toStrictEqual({
1060 executed: expect.any(Number),
1064 sequentiallyStolen: 0,
1069 history: new CircularArray()
1072 history: new CircularArray()
1074 elu: expect.objectContaining({
1075 idle: expect.objectContaining({
1076 history: expect.any(CircularArray)
1078 active: expect.objectContaining({
1079 history: expect.any(CircularArray)
1083 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1084 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1087 if (workerNode.usage.elu.active.aggregate == null) {
1088 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1090 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1092 if (workerNode.usage.elu.idle.aggregate == null) {
1093 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1095 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1097 if (workerNode.usage.elu.utilization == null) {
1098 expect(workerNode.usage.elu.utilization).toBeUndefined()
1100 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1101 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1105 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1106 pool.workerChoiceStrategyContext.workerChoiceStrategy
1108 ).toEqual(expect.any(Number))
1110 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1111 pool.workerChoiceStrategyContext.workerChoiceStrategy
1112 ).previousWorkerNodeKey
1113 ).toEqual(expect.any(Number))
1114 // We need to clean up the resources after our test
1115 await pool.destroy()
1118 it('Verify FAIR_SHARE strategy default policy', async () => {
1119 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1120 let pool = new FixedThreadPool(
1122 './tests/worker-files/thread/testWorker.mjs',
1123 { workerChoiceStrategy }
1125 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1126 dynamicWorkerUsage: false,
1127 dynamicWorkerReady: true
1129 await pool.destroy()
1130 pool = new DynamicThreadPool(
1133 './tests/worker-files/thread/testWorker.mjs',
1134 { workerChoiceStrategy }
1136 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1137 dynamicWorkerUsage: false,
1138 dynamicWorkerReady: true
1140 // We need to clean up the resources after our test
1141 await pool.destroy()
1144 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1145 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1146 let pool = new FixedThreadPool(
1148 './tests/worker-files/thread/testWorker.mjs',
1149 { workerChoiceStrategy }
1152 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1170 await pool.destroy()
1171 pool = new DynamicThreadPool(
1174 './tests/worker-files/thread/testWorker.mjs',
1175 { workerChoiceStrategy }
1178 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1196 // We need to clean up the resources after our test
1197 await pool.destroy()
1200 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1201 const pool = new FixedThreadPool(
1203 './tests/worker-files/thread/testWorker.mjs',
1204 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1206 // TODO: Create a better test to cover `FairShareWorkerChoiceStrategy#choose`
1207 const promises = new Set()
1208 const maxMultiplier = 2
1209 for (let i = 0; i < max * maxMultiplier; i++) {
1210 promises.add(pool.execute())
1212 await Promise.all(promises)
1213 for (const workerNode of pool.workerNodes) {
1214 expect(workerNode.usage).toStrictEqual({
1216 executed: expect.any(Number),
1220 sequentiallyStolen: 0,
1224 runTime: expect.objectContaining({
1225 history: expect.any(CircularArray)
1228 history: new CircularArray()
1230 elu: expect.objectContaining({
1231 idle: expect.objectContaining({
1232 history: expect.any(CircularArray)
1234 active: expect.objectContaining({
1235 history: expect.any(CircularArray)
1239 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1240 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1243 if (workerNode.usage.runTime.aggregate == null) {
1244 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1246 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1248 if (workerNode.usage.runTime.average == null) {
1249 expect(workerNode.usage.runTime.average).toBeUndefined()
1251 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1253 if (workerNode.usage.elu.active.aggregate == null) {
1254 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1256 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1258 if (workerNode.usage.elu.idle.aggregate == null) {
1259 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1261 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1263 if (workerNode.usage.elu.utilization == null) {
1264 expect(workerNode.usage.elu.utilization).toBeUndefined()
1266 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1267 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1269 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1272 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1273 pool.workerChoiceStrategyContext.workerChoiceStrategy
1275 ).toEqual(expect.any(Number))
1277 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1278 pool.workerChoiceStrategyContext.workerChoiceStrategy
1279 ).previousWorkerNodeKey
1280 ).toEqual(expect.any(Number))
1281 // We need to clean up the resources after our test
1282 await pool.destroy()
1285 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1286 const pool = new DynamicThreadPool(
1289 './tests/worker-files/thread/testWorker.mjs',
1290 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1292 // TODO: Create a better test to cover `FairShareWorkerChoiceStrategy#choose`
1293 const promises = new Set()
1294 const maxMultiplier = 2
1295 for (let i = 0; i < max * maxMultiplier; i++) {
1296 promises.add(pool.execute())
1298 await Promise.all(promises)
1299 for (const workerNode of pool.workerNodes) {
1300 expect(workerNode.usage).toStrictEqual({
1302 executed: expect.any(Number),
1306 sequentiallyStolen: 0,
1310 runTime: expect.objectContaining({
1311 history: expect.any(CircularArray)
1314 history: new CircularArray()
1316 elu: expect.objectContaining({
1317 idle: expect.objectContaining({
1318 history: expect.any(CircularArray)
1320 active: expect.objectContaining({
1321 history: expect.any(CircularArray)
1325 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1326 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1329 if (workerNode.usage.runTime.aggregate == null) {
1330 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1332 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1334 if (workerNode.usage.runTime.average == null) {
1335 expect(workerNode.usage.runTime.average).toBeUndefined()
1337 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1339 if (workerNode.usage.elu.active.aggregate == null) {
1340 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1342 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1344 if (workerNode.usage.elu.idle.aggregate == null) {
1345 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1347 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1349 if (workerNode.usage.elu.utilization == null) {
1350 expect(workerNode.usage.elu.utilization).toBeUndefined()
1352 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1353 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1355 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1358 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1359 pool.workerChoiceStrategyContext.workerChoiceStrategy
1361 ).toEqual(expect.any(Number))
1363 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1364 pool.workerChoiceStrategyContext.workerChoiceStrategy
1365 ).previousWorkerNodeKey
1366 ).toEqual(expect.any(Number))
1367 // We need to clean up the resources after our test
1368 await pool.destroy()
1371 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1372 const pool = new DynamicThreadPool(
1375 './tests/worker-files/thread/testWorker.mjs',
1377 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1378 workerChoiceStrategyOptions: {
1379 runTime: { median: true }
1383 // TODO: Create a better test to cover `FairShareWorkerChoiceStrategy#choose`
1384 const promises = new Set()
1385 const maxMultiplier = 2
1386 for (let i = 0; i < max * maxMultiplier; i++) {
1387 promises.add(pool.execute())
1389 await Promise.all(promises)
1390 for (const workerNode of pool.workerNodes) {
1391 expect(workerNode.usage).toStrictEqual({
1393 executed: expect.any(Number),
1397 sequentiallyStolen: 0,
1401 runTime: expect.objectContaining({
1402 history: expect.any(CircularArray)
1405 history: new CircularArray()
1407 elu: expect.objectContaining({
1408 idle: expect.objectContaining({
1409 history: expect.any(CircularArray)
1411 active: expect.objectContaining({
1412 history: expect.any(CircularArray)
1416 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1417 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1420 if (workerNode.usage.runTime.aggregate == null) {
1421 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1423 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1425 if (workerNode.usage.runTime.median == null) {
1426 expect(workerNode.usage.runTime.median).toBeUndefined()
1428 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1430 if (workerNode.usage.elu.active.aggregate == null) {
1431 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1433 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1435 if (workerNode.usage.elu.idle.aggregate == null) {
1436 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1438 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1440 if (workerNode.usage.elu.utilization == null) {
1441 expect(workerNode.usage.elu.utilization).toBeUndefined()
1443 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1444 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1446 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1449 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1450 pool.workerChoiceStrategyContext.workerChoiceStrategy
1452 ).toEqual(expect.any(Number))
1454 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1455 pool.workerChoiceStrategyContext.workerChoiceStrategy
1456 ).previousWorkerNodeKey
1457 ).toEqual(expect.any(Number))
1458 // We need to clean up the resources after our test
1459 await pool.destroy()
// Test: setting FAIR_SHARE through `pool.setWorkerChoiceStrategy()` is expected
// to clear each worker node's `strategyData.virtualTaskEndTimestamp`. The
// scenario is run on a FixedThreadPool and then repeated on a DynamicThreadPool.
// NOTE(review): pool constructor arguments and closing brackets are not visible
// in this excerpt.
1462 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1463 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1464 let pool = new FixedThreadPool(
1466 './tests/worker-files/thread/testWorker.mjs'
// Seed every worker node with a timestamp so a later reset is observable.
1468 for (const workerNode of pool.workerNodes) {
1469 workerNode.strategyData = {
1470 virtualTaskEndTimestamp: performance.now()
// Setting the strategy must leave the seeded timestamp undefined.
1473 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1474 for (const workerNode of pool.workerNodes) {
1475 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
// Destroy the fixed pool before rebinding `pool` to a dynamic one.
1477 await pool.destroy()
1478 pool = new DynamicThreadPool(
1481 './tests/worker-files/thread/testWorker.mjs'
1483 for (const workerNode of pool.workerNodes) {
1484 workerNode.strategyData = {
1485 virtualTaskEndTimestamp: performance.now()
1488 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1489 for (const workerNode of pool.workerNodes) {
1490 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1492 // We need to clean up the resources after our test
1493 await pool.destroy()
// Test: for WEIGHTED_ROUND_ROBIN the strategy policy returned by
// `workerChoiceStrategyContext.getStrategyPolicy()` must strictly equal
// `{ dynamicWorkerUsage: false, dynamicWorkerReady: true }`, both for a
// FixedThreadPool and for a DynamicThreadPool.
// NOTE(review): pool size arguments and closing brackets are not visible in
// this excerpt.
1496 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1497 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1498 let pool = new FixedThreadPool(
1500 './tests/worker-files/thread/testWorker.mjs',
1501 { workerChoiceStrategy }
1503 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1504 dynamicWorkerUsage: false,
1505 dynamicWorkerReady: true
// Destroy the fixed pool before rebinding `pool` to a dynamic one.
1507 await pool.destroy()
1508 pool = new DynamicThreadPool(
1511 './tests/worker-files/thread/testWorker.mjs',
1512 { workerChoiceStrategy }
1514 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1515 dynamicWorkerUsage: false,
1516 dynamicWorkerReady: true
1518 // We need to clean up the resources after our test
1519 await pool.destroy()
// Test: reads `workerChoiceStrategyContext.getTaskStatisticsRequirements()` for
// a FixedThreadPool and then a DynamicThreadPool, both created with
// WEIGHTED_ROUND_ROBIN.
// NOTE(review): the expected requirement objects and the matcher calls are not
// visible in this excerpt, so the asserted values cannot be described here.
1522 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1523 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1524 let pool = new FixedThreadPool(
1526 './tests/worker-files/thread/testWorker.mjs',
1527 { workerChoiceStrategy }
1530 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Destroy the fixed pool before rebinding `pool` to a dynamic one.
1548 await pool.destroy()
1549 pool = new DynamicThreadPool(
1552 './tests/worker-files/thread/testWorker.mjs',
1553 { workerChoiceStrategy }
1556 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1574 // We need to clean up the resources after our test
1575 await pool.destroy()
// Test: runs WEIGHTED_ROUND_ROBIN on a FixedThreadPool. It submits
// `max * maxMultiplier` tasks, waits for all of them, then checks each worker
// node's usage and the strategy instance's internal fields.
// In the expected usage object only `runTime.history` is matched loosely
// (`expect.any(CircularArray)`); three other `history` fields are compared
// against a freshly constructed `new CircularArray()` (their owning keys are
// not visible in this excerpt).
// NOTE(review): `max` is not declared in the visible lines - presumably a
// suite-level constant; confirm.
1578 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1579 const pool = new FixedThreadPool(
1581 './tests/worker-files/thread/testWorker.mjs',
1582 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1584 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1585 const promises = new Set()
1586 const maxMultiplier = 2
1587 for (let i = 0; i < max * maxMultiplier; i++) {
1588 promises.add(pool.execute())
1590 await Promise.all(promises)
// Per-worker checks: the overall usage shape first, then run time statistics.
1591 for (const workerNode of pool.workerNodes) {
1592 expect(workerNode.usage).toStrictEqual({
1594 executed: expect.any(Number),
1598 sequentiallyStolen: 0,
1602 runTime: expect.objectContaining({
1603 history: expect.any(CircularArray)
1606 history: new CircularArray()
1610 history: new CircularArray()
1613 history: new CircularArray()
1617 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1618 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// A run time statistic may be unset on a worker node; in that case it must be
// strictly undefined. The positive-value check after it is presumably the
// `else` branch (the `else` lines are not visible in this excerpt).
1621 if (workerNode.usage.runTime.aggregate == null) {
1622 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1624 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1626 if (workerNode.usage.runTime.average == null) {
1627 expect(workerNode.usage.runTime.average).toBeUndefined()
1629 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1633 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1634 pool.workerChoiceStrategyContext.workerChoiceStrategy
1638 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1639 pool.workerChoiceStrategyContext.workerChoiceStrategy
1640 ).previousWorkerNodeKey
1643 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1644 pool.workerChoiceStrategyContext.workerChoiceStrategy
1645 ).defaultWorkerWeight
1646 ).toBeGreaterThan(0)
1648 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1649 pool.workerChoiceStrategyContext.workerChoiceStrategy
1650 ).workerNodeVirtualTaskRunTime
1651 ).toBeGreaterThanOrEqual(0)
1652 // We need to clean up the resources after our test
1653 await pool.destroy()
// Test: runs WEIGHTED_ROUND_ROBIN on a DynamicThreadPool. It submits
// `max * maxMultiplier` tasks, waits for all of them, then checks each worker
// node's usage and the strategy instance's internal fields
// (`defaultWorkerWeight` > 0, `workerNodeVirtualTaskRunTime` >= 0).
// NOTE(review): pool size arguments, several matcher lines and closing brackets
// are not visible in this excerpt; `max` is presumably a suite-level constant.
1656 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1657 const pool = new DynamicThreadPool(
1660 './tests/worker-files/thread/testWorker.mjs',
1661 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1663 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1664 const promises = new Set()
1665 const maxMultiplier = 2
1666 for (let i = 0; i < max * maxMultiplier; i++) {
1667 promises.add(pool.execute())
1669 await Promise.all(promises)
// Per-worker checks: the overall usage shape first, then run time statistics.
1670 for (const workerNode of pool.workerNodes) {
1671 expect(workerNode.usage).toStrictEqual({
1673 executed: expect.any(Number),
1677 sequentiallyStolen: 0,
1681 runTime: expect.objectContaining({
1682 history: expect.any(CircularArray)
1685 history: new CircularArray()
1689 history: new CircularArray()
1692 history: new CircularArray()
1696 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1697 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// A run time statistic may be unset on a worker node; in that case it must be
// strictly undefined. The positive-value check after it is presumably the
// `else` branch (the `else` lines are not visible in this excerpt).
1700 if (workerNode.usage.runTime.aggregate == null) {
1701 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1703 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1705 if (workerNode.usage.runTime.average == null) {
1706 expect(workerNode.usage.runTime.average).toBeUndefined()
1708 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1712 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1713 pool.workerChoiceStrategyContext.workerChoiceStrategy
1717 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1718 pool.workerChoiceStrategyContext.workerChoiceStrategy
1719 ).previousWorkerNodeKey
1722 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1723 pool.workerChoiceStrategyContext.workerChoiceStrategy
1724 ).defaultWorkerWeight
1725 ).toBeGreaterThan(0)
1727 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1728 pool.workerChoiceStrategyContext.workerChoiceStrategy
1729 ).workerNodeVirtualTaskRunTime
1730 ).toBeGreaterThanOrEqual(0)
1731 // We need to clean up the resources after our test
1732 await pool.destroy()
// Test: runs WEIGHTED_ROUND_ROBIN on a DynamicThreadPool configured with
// `workerChoiceStrategyOptions.runTime.median = true`. It submits
// `max * maxMultiplier` tasks, waits for all of them, then checks each worker
// node's usage (including `runTime.median` instead of `runTime.average`) and
// the strategy instance's internal fields.
// NOTE(review): pool size arguments, several matcher lines and closing brackets
// are not visible in this excerpt; `max` is presumably a suite-level constant.
1735 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1736 const pool = new DynamicThreadPool(
1739 './tests/worker-files/thread/testWorker.mjs',
1741 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1742 workerChoiceStrategyOptions: {
1743 runTime: { median: true }
1747 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1748 const promises = new Set()
1749 const maxMultiplier = 2
1750 for (let i = 0; i < max * maxMultiplier; i++) {
1751 promises.add(pool.execute())
1753 await Promise.all(promises)
// Per-worker checks: the overall usage shape first, then run time statistics.
1754 for (const workerNode of pool.workerNodes) {
1755 expect(workerNode.usage).toStrictEqual({
1757 executed: expect.any(Number),
1761 sequentiallyStolen: 0,
1765 runTime: expect.objectContaining({
1766 history: expect.any(CircularArray)
1769 history: new CircularArray()
1773 history: new CircularArray()
1776 history: new CircularArray()
1780 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1781 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// A run time statistic may be unset on a worker node; in that case it must be
// strictly undefined. The positive-value check after it is presumably the
// `else` branch (the `else` lines are not visible in this excerpt).
1784 if (workerNode.usage.runTime.aggregate == null) {
1785 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1787 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1789 if (workerNode.usage.runTime.median == null) {
1790 expect(workerNode.usage.runTime.median).toBeUndefined()
1792 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1796 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1797 pool.workerChoiceStrategyContext.workerChoiceStrategy
1801 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1802 pool.workerChoiceStrategyContext.workerChoiceStrategy
1803 ).previousWorkerNodeKey
1806 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1807 pool.workerChoiceStrategyContext.workerChoiceStrategy
1808 ).defaultWorkerWeight
1809 ).toBeGreaterThan(0)
1811 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1812 pool.workerChoiceStrategyContext.workerChoiceStrategy
1813 ).workerNodeVirtualTaskRunTime
1814 ).toBeGreaterThanOrEqual(0)
1815 // We need to clean up the resources after our test
1816 await pool.destroy()
// Test: inspects the WEIGHTED_ROUND_ROBIN strategy instance's internal fields
// (`previousWorkerNodeKey`, `defaultWorkerWeight`,
// `workerNodeVirtualTaskRunTime`, plus one lookup whose property is not
// visible) before and after calling `pool.setWorkerChoiceStrategy()`, first on
// a FixedThreadPool and then on a DynamicThreadPool.
// Before the call the instance is looked up by the local `workerChoiceStrategy`
// constant; after the call it is looked up by the context's current strategy.
// NOTE(review): most `expect(` openers and matchers are not visible in this
// excerpt; the only visible post-reset assertion is `defaultWorkerWeight` > 0.
1819 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1820 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1821 let pool = new FixedThreadPool(
1823 './tests/worker-files/thread/testWorker.mjs'
1826 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1827 workerChoiceStrategy
1831 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1832 workerChoiceStrategy
1833 ).previousWorkerNodeKey
1836 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1837 workerChoiceStrategy
1838 ).defaultWorkerWeight
1841 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1842 workerChoiceStrategy
1843 ).workerNodeVirtualTaskRunTime
// Switch the pool to the strategy under test; the lookups below re-read the
// same fields from the now-active strategy instance.
1845 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1847 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1848 pool.workerChoiceStrategyContext.workerChoiceStrategy
1852 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1853 pool.workerChoiceStrategyContext.workerChoiceStrategy
1854 ).previousWorkerNodeKey
1857 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1858 pool.workerChoiceStrategyContext.workerChoiceStrategy
1859 ).defaultWorkerWeight
1860 ).toBeGreaterThan(0)
1862 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1863 pool.workerChoiceStrategyContext.workerChoiceStrategy
1864 ).workerNodeVirtualTaskRunTime
// Destroy the fixed pool, then repeat the same scenario on a dynamic pool.
1866 await pool.destroy()
1867 pool = new DynamicThreadPool(
1870 './tests/worker-files/thread/testWorker.mjs'
1873 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1874 workerChoiceStrategy
1878 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1879 workerChoiceStrategy
1880 ).previousWorkerNodeKey
1883 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1884 workerChoiceStrategy
1885 ).defaultWorkerWeight
1888 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1889 workerChoiceStrategy
1890 ).workerNodeVirtualTaskRunTime
1892 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1894 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1895 pool.workerChoiceStrategyContext.workerChoiceStrategy
1899 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1900 pool.workerChoiceStrategyContext.workerChoiceStrategy
1901 ).previousWorkerNodeKey
1904 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1905 pool.workerChoiceStrategyContext.workerChoiceStrategy
1906 ).defaultWorkerWeight
1907 ).toBeGreaterThan(0)
1909 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1910 pool.workerChoiceStrategyContext.workerChoiceStrategy
1911 ).workerNodeVirtualTaskRunTime
1913 // We need to clean up the resources after our test
1914 await pool.destroy()
// Test: for INTERLEAVED_WEIGHTED_ROUND_ROBIN the strategy policy returned by
// `workerChoiceStrategyContext.getStrategyPolicy()` must strictly equal
// `{ dynamicWorkerUsage: false, dynamicWorkerReady: true }`, both for a
// FixedThreadPool and for a DynamicThreadPool.
// NOTE(review): pool size arguments and closing brackets are not visible in
// this excerpt.
1917 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1918 const workerChoiceStrategy =
1919 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1920 let pool = new FixedThreadPool(
1922 './tests/worker-files/thread/testWorker.mjs',
1923 { workerChoiceStrategy }
1925 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1926 dynamicWorkerUsage: false,
1927 dynamicWorkerReady: true
// Destroy the fixed pool before rebinding `pool` to a dynamic one.
1929 await pool.destroy()
1930 pool = new DynamicThreadPool(
1933 './tests/worker-files/thread/testWorker.mjs',
1934 { workerChoiceStrategy }
1936 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1937 dynamicWorkerUsage: false,
1938 dynamicWorkerReady: true
1940 // We need to clean up the resources after our test
1941 await pool.destroy()
// Test: reads `workerChoiceStrategyContext.getTaskStatisticsRequirements()` for
// a FixedThreadPool and then a DynamicThreadPool, both created with
// INTERLEAVED_WEIGHTED_ROUND_ROBIN.
// NOTE(review): the expected requirement objects and the matcher calls are not
// visible in this excerpt, so the asserted values cannot be described here.
1944 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1945 const workerChoiceStrategy =
1946 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1947 let pool = new FixedThreadPool(
1949 './tests/worker-files/thread/testWorker.mjs',
1950 { workerChoiceStrategy }
1953 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Destroy the fixed pool before rebinding `pool` to a dynamic one.
1971 await pool.destroy()
1972 pool = new DynamicThreadPool(
1975 './tests/worker-files/thread/testWorker.mjs',
1976 { workerChoiceStrategy }
1979 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1997 // We need to clean up the resources after our test
1998 await pool.destroy()
// Test: runs INTERLEAVED_WEIGHTED_ROUND_ROBIN on a FixedThreadPool. It submits
// `max * maxMultiplier` tasks, waits for all of them, then checks each worker
// node's usage and several fields of the active strategy instance
// (`defaultWorkerWeight` > 0, `previousWorkerNodeKey` is a Number, plus lookups
// whose property names are not visible in this excerpt).
// NOTE(review): pool size arguments, several matcher lines and closing brackets
// are not visible in this excerpt; `max` is presumably a suite-level constant.
2001 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
2002 const pool = new FixedThreadPool(
2004 './tests/worker-files/thread/testWorker.mjs',
2006 workerChoiceStrategy:
2007 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2010 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
2011 const promises = new Set()
2012 const maxMultiplier = 2
2013 for (let i = 0; i < max * maxMultiplier; i++) {
2014 promises.add(pool.execute())
2016 await Promise.all(promises)
// Per-worker checks: overall usage shape and the executed task count bounds.
2017 for (const workerNode of pool.workerNodes) {
2018 expect(workerNode.usage).toStrictEqual({
2020 executed: expect.any(Number),
2024 sequentiallyStolen: 0,
2028 runTime: expect.objectContaining({
2029 history: expect.any(CircularArray)
2032 history: new CircularArray()
2036 history: new CircularArray()
2039 history: new CircularArray()
2043 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2044 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2049 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2050 pool.workerChoiceStrategyContext.workerChoiceStrategy
2051 ).defaultWorkerWeight
2052 ).toBeGreaterThan(0)
2054 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2055 pool.workerChoiceStrategyContext.workerChoiceStrategy
2059 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2060 pool.workerChoiceStrategyContext.workerChoiceStrategy
2064 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2065 pool.workerChoiceStrategyContext.workerChoiceStrategy
2069 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2070 pool.workerChoiceStrategyContext.workerChoiceStrategy
2071 ).previousWorkerNodeKey
2072 ).toEqual(expect.any(Number))
2074 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2075 pool.workerChoiceStrategyContext.workerChoiceStrategy
2078 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2079 pool.workerChoiceStrategyContext.workerChoiceStrategy
2080 ).defaultWorkerWeight
2082 // We need to clean up the resources after our test
2083 await pool.destroy()
// Test: runs INTERLEAVED_WEIGHTED_ROUND_ROBIN on a DynamicThreadPool. It submits
// `max * maxMultiplier` tasks, waits for all of them, then checks each worker
// node's usage and several fields of the active strategy instance
// (`defaultWorkerWeight` > 0, `previousWorkerNodeKey` is a Number, plus lookups
// whose property names are not visible in this excerpt).
// NOTE(review): pool size arguments, several matcher lines and closing brackets
// are not visible in this excerpt; `max` is presumably a suite-level constant.
2086 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
2087 const pool = new DynamicThreadPool(
2090 './tests/worker-files/thread/testWorker.mjs',
2092 workerChoiceStrategy:
2093 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2096 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
2097 const promises = new Set()
2098 const maxMultiplier = 2
2099 for (let i = 0; i < max * maxMultiplier; i++) {
2100 promises.add(pool.execute())
2102 await Promise.all(promises)
// Per-worker checks: overall usage shape and the executed task count bounds.
2103 for (const workerNode of pool.workerNodes) {
2104 expect(workerNode.usage).toStrictEqual({
2106 executed: expect.any(Number),
2110 sequentiallyStolen: 0,
2114 runTime: expect.objectContaining({
2115 history: expect.any(CircularArray)
2118 history: new CircularArray()
2122 history: new CircularArray()
2125 history: new CircularArray()
2129 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2130 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2135 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2136 pool.workerChoiceStrategyContext.workerChoiceStrategy
2137 ).defaultWorkerWeight
2138 ).toBeGreaterThan(0)
2140 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2141 pool.workerChoiceStrategyContext.workerChoiceStrategy
2145 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2146 pool.workerChoiceStrategyContext.workerChoiceStrategy
2150 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2151 pool.workerChoiceStrategyContext.workerChoiceStrategy
2155 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2156 pool.workerChoiceStrategyContext.workerChoiceStrategy
2157 ).previousWorkerNodeKey
2158 ).toEqual(expect.any(Number))
2160 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2161 pool.workerChoiceStrategyContext.workerChoiceStrategy
2164 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2165 pool.workerChoiceStrategyContext.workerChoiceStrategy
2166 ).defaultWorkerWeight
2168 // We need to clean up the resources after our test
2169 await pool.destroy()
// Test: inspects fields of the INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy
// instance (`previousWorkerNodeKey`, `defaultWorkerWeight`, plus lookups whose
// property names are not visible) before and after calling
// `pool.setWorkerChoiceStrategy()`, first on a FixedThreadPool and then on a
// DynamicThreadPool.
// Before the call the instance is looked up by the local `workerChoiceStrategy`
// constant; after the call it is looked up by the context's current strategy.
// NOTE(review): most `expect(` openers and matchers are not visible in this
// excerpt; the only visible post-reset assertion is `defaultWorkerWeight` > 0.
2172 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
2173 const workerChoiceStrategy =
2174 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2175 let pool = new FixedThreadPool(
2177 './tests/worker-files/thread/testWorker.mjs'
2180 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2181 workerChoiceStrategy
2185 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2186 workerChoiceStrategy
2190 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2191 workerChoiceStrategy
2195 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2196 workerChoiceStrategy
2197 ).previousWorkerNodeKey
2200 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2201 workerChoiceStrategy
2202 ).defaultWorkerWeight
2205 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2206 workerChoiceStrategy
// Switch the pool to the strategy under test; the lookups below re-read the
// same fields from the now-active strategy instance.
2209 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2211 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2212 pool.workerChoiceStrategyContext.workerChoiceStrategy
2216 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2217 pool.workerChoiceStrategyContext.workerChoiceStrategy
2221 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2222 pool.workerChoiceStrategyContext.workerChoiceStrategy
2226 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2227 pool.workerChoiceStrategyContext.workerChoiceStrategy
2228 ).previousWorkerNodeKey
2231 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2232 pool.workerChoiceStrategyContext.workerChoiceStrategy
2233 ).defaultWorkerWeight
2234 ).toBeGreaterThan(0)
2236 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2237 pool.workerChoiceStrategyContext.workerChoiceStrategy
2240 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2241 pool.workerChoiceStrategyContext.workerChoiceStrategy
2242 ).defaultWorkerWeight
// Destroy the fixed pool, then repeat the same scenario on a dynamic pool.
2244 await pool.destroy()
2245 pool = new DynamicThreadPool(
2248 './tests/worker-files/thread/testWorker.mjs'
2251 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2252 workerChoiceStrategy
2256 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2257 workerChoiceStrategy
2261 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2262 workerChoiceStrategy
2266 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2267 workerChoiceStrategy
2268 ).previousWorkerNodeKey
2271 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2272 workerChoiceStrategy
2273 ).defaultWorkerWeight
2276 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2277 workerChoiceStrategy
2280 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2282 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2283 pool.workerChoiceStrategyContext.workerChoiceStrategy
2287 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2288 pool.workerChoiceStrategyContext.workerChoiceStrategy
2292 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2293 pool.workerChoiceStrategyContext.workerChoiceStrategy
2297 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2298 pool.workerChoiceStrategyContext.workerChoiceStrategy
2299 ).previousWorkerNodeKey
2302 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2303 pool.workerChoiceStrategyContext.workerChoiceStrategy
2304 ).defaultWorkerWeight
2305 ).toBeGreaterThan(0)
2307 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2308 pool.workerChoiceStrategyContext.workerChoiceStrategy
2311 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2312 pool.workerChoiceStrategyContext.workerChoiceStrategy
2313 ).defaultWorkerWeight
2315 // We need to clean up the resources after our test
2316 await pool.destroy()
// Test: constructing a DynamicThreadPool with
// `workerChoiceStrategy: 'UNKNOWN_STRATEGY'` is expected to throw an error
// matching "Invalid worker choice strategy 'UNKNOWN_STRATEGY'".
// The test callback is synchronous (no `async`), unlike the other tests here.
// NOTE(review): the `expect(` wrapper and the pool size arguments are not
// visible in this excerpt.
2319 it('Verify unknown strategy throw error', () => {
2322 new DynamicThreadPool(
2325 './tests/worker-files/thread/testWorker.mjs',
2326 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2328 ).toThrow("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")