1 import { expect } from 'expect'
8 } from '../../../lib/index.js'
9 import { CircularArray } from '../../../lib/circular-array.js'
10 import { sleep } from '../../test-utils.js'
12 describe('Selection strategies test suite', () => {
16 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
17 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
18 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
19 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
20 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
21 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
22 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
23 'WEIGHTED_ROUND_ROBIN'
25 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
26 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
30 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
31 const pool = new DynamicThreadPool(
34 './tests/worker-files/thread/testWorker.mjs'
36 expect(pool.opts.workerChoiceStrategy).toBe(
37 WorkerChoiceStrategies.ROUND_ROBIN
39 // We need to clean up the resources after our test
43 it('Verify available strategies are taken at pool creation', async () => {
44 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
45 const pool = new FixedThreadPool(
47 './tests/worker-files/thread/testWorker.mjs',
48 { workerChoiceStrategy }
50 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
51 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
58 it('Verify available strategies can be set after pool creation', async () => {
59 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
60 const pool = new DynamicThreadPool(
63 './tests/worker-files/thread/testWorker.mjs'
65 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
66 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
67 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
70 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
72 runTime: { median: false },
73 waitTime: { median: false },
74 elu: { median: false }
76 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
78 runTime: { median: false },
79 waitTime: { median: false },
80 elu: { median: false }
84 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
85 const pool = new DynamicClusterPool(
88 './tests/worker-files/cluster/testWorker.js'
90 pool.setWorkerChoiceStrategy(workerChoiceStrategy, { retries: 3 })
91 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
92 expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
95 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
97 runTime: { median: false },
98 waitTime: { median: false },
99 elu: { median: false }
101 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
103 runTime: { median: false },
104 waitTime: { median: false },
105 elu: { median: false }
111 it('Verify available strategies default internals at pool creation', async () => {
112 const pool = new FixedThreadPool(
114 './tests/worker-files/thread/testWorker.mjs'
116 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
118 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
123 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
125 ).previousWorkerNodeKey
128 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
131 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
133 ).defaultWorkerWeight
136 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
138 ).workerNodeVirtualTaskRunTime
141 workerChoiceStrategy ===
142 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
145 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
147 ).defaultWorkerWeight
150 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
152 ).workerNodeVirtualTaskRunTime
155 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
160 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
165 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
169 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
171 ).defaultWorkerWeight
178 it('Verify strategies wait for worker node readiness in dynamic pool', async () => {
179 const pool = new DynamicThreadPool(
182 './tests/worker-files/thread/testWorker.mjs'
185 expect(pool.starting).toBe(false)
186 expect(pool.workerNodes.length).toBe(min)
187 const maxMultiplier = 10000
188 const promises = new Set()
189 for (let i = 0; i < max * maxMultiplier; i++) {
190 promises.add(pool.execute())
192 await Promise.all(promises)
193 expect(pool.workerNodes.length).toBe(max)
194 // We need to clean up the resources after our test
198 it('Verify ROUND_ROBIN strategy default policy', async () => {
199 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
200 let pool = new FixedThreadPool(
202 './tests/worker-files/thread/testWorker.mjs',
203 { workerChoiceStrategy }
205 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
206 dynamicWorkerUsage: false,
207 dynamicWorkerReady: true
210 pool = new DynamicThreadPool(
213 './tests/worker-files/thread/testWorker.mjs',
214 { workerChoiceStrategy }
216 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
217 dynamicWorkerUsage: false,
218 dynamicWorkerReady: true
220 // We need to clean up the resources after our test
224 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
225 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
226 let pool = new FixedThreadPool(
228 './tests/worker-files/thread/testWorker.mjs',
229 { workerChoiceStrategy }
232 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
251 pool = new DynamicThreadPool(
254 './tests/worker-files/thread/testWorker.mjs',
255 { workerChoiceStrategy }
258 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
276 // We need to clean up the resources after our test
280 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
281 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
282 const pool = new FixedThreadPool(
284 './tests/worker-files/thread/testWorker.mjs',
285 { workerChoiceStrategy }
287 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
288 const promises = new Set()
289 const maxMultiplier = 2
290 for (let i = 0; i < max * maxMultiplier; i++) {
291 promises.add(pool.execute())
293 await Promise.all(promises)
294 for (const workerNode of pool.workerNodes) {
295 expect(workerNode.usage).toStrictEqual({
297 executed: maxMultiplier,
301 sequentiallyStolen: 0,
306 history: new CircularArray()
309 history: new CircularArray()
313 history: new CircularArray()
316 history: new CircularArray()
322 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
323 pool.workerChoiceStrategyContext.workerChoiceStrategy
327 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
328 pool.workerChoiceStrategyContext.workerChoiceStrategy
329 ).previousWorkerNodeKey
330 ).toBe(pool.workerNodes.length - 1)
331 // We need to clean up the resources after our test
335 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
336 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
337 const pool = new DynamicThreadPool(
340 './tests/worker-files/thread/testWorker.mjs',
341 { workerChoiceStrategy }
343 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
344 const promises = new Set()
345 const maxMultiplier = 2
346 for (let i = 0; i < max * maxMultiplier; i++) {
347 promises.add(pool.execute())
349 await Promise.all(promises)
350 for (const workerNode of pool.workerNodes) {
351 expect(workerNode.usage).toStrictEqual({
353 executed: expect.any(Number),
357 sequentiallyStolen: 0,
362 history: new CircularArray()
365 history: new CircularArray()
369 history: new CircularArray()
372 history: new CircularArray()
376 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
377 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
382 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
383 pool.workerChoiceStrategyContext.workerChoiceStrategy
387 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
388 pool.workerChoiceStrategyContext.workerChoiceStrategy
389 ).previousWorkerNodeKey
390 ).toBe(pool.workerNodes.length - 1)
391 // We need to clean up the resources after our test
395 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
396 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
397 let pool = new FixedClusterPool(
399 './tests/worker-files/cluster/testWorker.js',
400 { workerChoiceStrategy }
402 let results = new Set()
403 for (let i = 0; i < max; i++) {
404 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
406 expect(results.size).toBe(max)
408 pool = new FixedThreadPool(
410 './tests/worker-files/thread/testWorker.mjs',
411 { workerChoiceStrategy }
414 for (let i = 0; i < max; i++) {
415 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
417 expect(results.size).toBe(max)
421 it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
422 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
423 let pool = new FixedThreadPool(
425 './tests/worker-files/thread/testWorker.mjs',
426 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
429 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
430 pool.workerChoiceStrategyContext.workerChoiceStrategy
434 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
435 pool.workerChoiceStrategyContext.workerChoiceStrategy
436 ).previousWorkerNodeKey
438 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
440 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
441 pool.workerChoiceStrategyContext.workerChoiceStrategy
445 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
446 pool.workerChoiceStrategyContext.workerChoiceStrategy
447 ).previousWorkerNodeKey
450 pool = new DynamicThreadPool(
453 './tests/worker-files/thread/testWorker.mjs',
454 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
457 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
458 pool.workerChoiceStrategyContext.workerChoiceStrategy
462 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
463 pool.workerChoiceStrategyContext.workerChoiceStrategy
464 ).previousWorkerNodeKey
466 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
468 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
469 pool.workerChoiceStrategyContext.workerChoiceStrategy
473 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
474 pool.workerChoiceStrategyContext.workerChoiceStrategy
475 ).previousWorkerNodeKey
477 // We need to clean up the resources after our test
481 it('Verify LEAST_USED strategy default policy', async () => {
482 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
483 let pool = new FixedThreadPool(
485 './tests/worker-files/thread/testWorker.mjs',
486 { workerChoiceStrategy }
488 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
489 dynamicWorkerUsage: false,
490 dynamicWorkerReady: true
493 pool = new DynamicThreadPool(
496 './tests/worker-files/thread/testWorker.mjs',
497 { workerChoiceStrategy }
499 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
500 dynamicWorkerUsage: false,
501 dynamicWorkerReady: true
503 // We need to clean up the resources after our test
507 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
508 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
509 let pool = new FixedThreadPool(
511 './tests/worker-files/thread/testWorker.mjs',
512 { workerChoiceStrategy }
515 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
534 pool = new DynamicThreadPool(
537 './tests/worker-files/thread/testWorker.mjs',
538 { workerChoiceStrategy }
541 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
559 // We need to clean up the resources after our test
563 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
564 const pool = new FixedThreadPool(
566 './tests/worker-files/thread/testWorker.mjs',
567 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
569 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
570 const promises = new Set()
571 const maxMultiplier = 2
572 for (let i = 0; i < max * maxMultiplier; i++) {
573 promises.add(pool.execute())
575 await Promise.all(promises)
576 for (const workerNode of pool.workerNodes) {
577 expect(workerNode.usage).toStrictEqual({
579 executed: expect.any(Number),
583 sequentiallyStolen: 0,
588 history: new CircularArray()
591 history: new CircularArray()
595 history: new CircularArray()
598 history: new CircularArray()
602 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
603 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
608 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
609 pool.workerChoiceStrategyContext.workerChoiceStrategy
611 ).toEqual(expect.any(Number))
613 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
614 pool.workerChoiceStrategyContext.workerChoiceStrategy
615 ).previousWorkerNodeKey
616 ).toEqual(expect.any(Number))
617 // We need to clean up the resources after our test
621 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
622 const pool = new DynamicThreadPool(
625 './tests/worker-files/thread/testWorker.mjs',
626 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
628 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
629 const promises = new Set()
630 const maxMultiplier = 2
631 for (let i = 0; i < max * maxMultiplier; i++) {
632 promises.add(pool.execute())
634 await Promise.all(promises)
635 for (const workerNode of pool.workerNodes) {
636 expect(workerNode.usage).toStrictEqual({
638 executed: expect.any(Number),
642 sequentiallyStolen: 0,
647 history: new CircularArray()
650 history: new CircularArray()
654 history: new CircularArray()
657 history: new CircularArray()
661 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
662 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
667 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
668 pool.workerChoiceStrategyContext.workerChoiceStrategy
670 ).toEqual(expect.any(Number))
672 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
673 pool.workerChoiceStrategyContext.workerChoiceStrategy
674 ).previousWorkerNodeKey
675 ).toEqual(expect.any(Number))
676 // We need to clean up the resources after our test
680 it('Verify LEAST_BUSY strategy default policy', async () => {
681 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
682 let pool = new FixedThreadPool(
684 './tests/worker-files/thread/testWorker.mjs',
685 { workerChoiceStrategy }
687 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
688 dynamicWorkerUsage: false,
689 dynamicWorkerReady: true
692 pool = new DynamicThreadPool(
695 './tests/worker-files/thread/testWorker.mjs',
696 { workerChoiceStrategy }
698 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
699 dynamicWorkerUsage: false,
700 dynamicWorkerReady: true
702 // We need to clean up the resources after our test
706 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
707 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
708 let pool = new FixedThreadPool(
710 './tests/worker-files/thread/testWorker.mjs',
711 { workerChoiceStrategy }
714 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
733 pool = new DynamicThreadPool(
736 './tests/worker-files/thread/testWorker.mjs',
737 { workerChoiceStrategy }
740 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
758 // We need to clean up the resources after our test
762 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
763 const pool = new FixedThreadPool(
765 './tests/worker-files/thread/testWorker.mjs',
766 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
768 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
769 const promises = new Set()
770 const maxMultiplier = 2
771 for (let i = 0; i < max * maxMultiplier; i++) {
772 promises.add(pool.execute())
774 await Promise.all(promises)
775 for (const workerNode of pool.workerNodes) {
776 expect(workerNode.usage).toStrictEqual({
778 executed: expect.any(Number),
782 sequentiallyStolen: 0,
786 runTime: expect.objectContaining({
787 history: expect.any(CircularArray)
789 waitTime: expect.objectContaining({
790 history: expect.any(CircularArray)
794 history: new CircularArray()
797 history: new CircularArray()
801 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
802 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
805 if (workerNode.usage.runTime.aggregate == null) {
806 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
808 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
810 if (workerNode.usage.waitTime.aggregate == null) {
811 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
813 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
817 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
818 pool.workerChoiceStrategyContext.workerChoiceStrategy
820 ).toEqual(expect.any(Number))
822 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
823 pool.workerChoiceStrategyContext.workerChoiceStrategy
824 ).previousWorkerNodeKey
825 ).toEqual(expect.any(Number))
826 // We need to clean up the resources after our test
830 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
831 const pool = new DynamicThreadPool(
834 './tests/worker-files/thread/testWorker.mjs',
835 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
837 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
838 const promises = new Set()
839 const maxMultiplier = 2
840 for (let i = 0; i < max * maxMultiplier; i++) {
841 promises.add(pool.execute())
843 await Promise.all(promises)
844 for (const workerNode of pool.workerNodes) {
845 expect(workerNode.usage).toStrictEqual({
847 executed: expect.any(Number),
851 sequentiallyStolen: 0,
855 runTime: expect.objectContaining({
856 history: expect.any(CircularArray)
858 waitTime: expect.objectContaining({
859 history: expect.any(CircularArray)
863 history: new CircularArray()
866 history: new CircularArray()
870 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
871 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
874 if (workerNode.usage.runTime.aggregate == null) {
875 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
877 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
879 if (workerNode.usage.waitTime.aggregate == null) {
880 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
882 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
886 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
887 pool.workerChoiceStrategyContext.workerChoiceStrategy
889 ).toEqual(expect.any(Number))
891 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
892 pool.workerChoiceStrategyContext.workerChoiceStrategy
893 ).previousWorkerNodeKey
894 ).toEqual(expect.any(Number))
895 // We need to clean up the resources after our test
899 it('Verify LEAST_ELU strategy default policy', async () => {
900 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
901 let pool = new FixedThreadPool(
903 './tests/worker-files/thread/testWorker.mjs',
904 { workerChoiceStrategy }
906 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
907 dynamicWorkerUsage: false,
908 dynamicWorkerReady: true
911 pool = new DynamicThreadPool(
914 './tests/worker-files/thread/testWorker.mjs',
915 { workerChoiceStrategy }
917 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
918 dynamicWorkerUsage: false,
919 dynamicWorkerReady: true
921 // We need to clean up the resources after our test
925 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
926 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
927 let pool = new FixedThreadPool(
929 './tests/worker-files/thread/testWorker.mjs',
930 { workerChoiceStrategy }
933 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
952 pool = new DynamicThreadPool(
955 './tests/worker-files/thread/testWorker.mjs',
956 { workerChoiceStrategy }
959 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
977 // We need to clean up the resources after our test
981 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
982 const pool = new FixedThreadPool(
984 './tests/worker-files/thread/testWorker.mjs',
985 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
987 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
988 const promises = new Set()
989 const maxMultiplier = 2
990 for (let i = 0; i < max * maxMultiplier; i++) {
991 promises.add(pool.execute())
993 await Promise.all(promises)
994 for (const workerNode of pool.workerNodes) {
995 expect(workerNode.usage).toStrictEqual({
997 executed: expect.any(Number),
1001 sequentiallyStolen: 0,
1006 history: new CircularArray()
1009 history: new CircularArray()
1011 elu: expect.objectContaining({
1012 idle: expect.objectContaining({
1013 history: expect.any(CircularArray)
1015 active: expect.objectContaining({
1016 history: expect.any(CircularArray)
1020 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1021 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1024 if (workerNode.usage.elu.active.aggregate == null) {
1025 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1027 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1029 if (workerNode.usage.elu.idle.aggregate == null) {
1030 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1032 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1034 if (workerNode.usage.elu.utilization == null) {
1035 expect(workerNode.usage.elu.utilization).toBeUndefined()
1037 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1038 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1042 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1043 pool.workerChoiceStrategyContext.workerChoiceStrategy
1045 ).toEqual(expect.any(Number))
1047 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1048 pool.workerChoiceStrategyContext.workerChoiceStrategy
1049 ).previousWorkerNodeKey
1050 ).toEqual(expect.any(Number))
1051 // We need to clean up the resources after our test
1052 await pool.destroy()
1055 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
1056 const pool = new DynamicThreadPool(
1059 './tests/worker-files/thread/testWorker.mjs',
1060 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
1062 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
1063 const promises = new Set()
1064 const maxMultiplier = 2
1065 for (let i = 0; i < max * maxMultiplier; i++) {
1066 promises.add(pool.execute())
1068 await Promise.all(promises)
1069 for (const workerNode of pool.workerNodes) {
1070 expect(workerNode.usage).toStrictEqual({
1072 executed: expect.any(Number),
1076 sequentiallyStolen: 0,
1081 history: new CircularArray()
1084 history: new CircularArray()
1086 elu: expect.objectContaining({
1087 idle: expect.objectContaining({
1088 history: expect.any(CircularArray)
1090 active: expect.objectContaining({
1091 history: expect.any(CircularArray)
1095 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1096 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1099 if (workerNode.usage.elu.active.aggregate == null) {
1100 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1102 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1104 if (workerNode.usage.elu.idle.aggregate == null) {
1105 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1107 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1109 if (workerNode.usage.elu.utilization == null) {
1110 expect(workerNode.usage.elu.utilization).toBeUndefined()
1112 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1113 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1117 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1118 pool.workerChoiceStrategyContext.workerChoiceStrategy
1120 ).toEqual(expect.any(Number))
1122 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1123 pool.workerChoiceStrategyContext.workerChoiceStrategy
1124 ).previousWorkerNodeKey
1125 ).toEqual(expect.any(Number))
1126 // We need to clean up the resources after our test
1127 await pool.destroy()
1130 it('Verify FAIR_SHARE strategy default policy', async () => {
1131 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1132 let pool = new FixedThreadPool(
1134 './tests/worker-files/thread/testWorker.mjs',
1135 { workerChoiceStrategy }
1137 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1138 dynamicWorkerUsage: false,
1139 dynamicWorkerReady: true
1141 await pool.destroy()
1142 pool = new DynamicThreadPool(
1145 './tests/worker-files/thread/testWorker.mjs',
1146 { workerChoiceStrategy }
1148 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1149 dynamicWorkerUsage: false,
1150 dynamicWorkerReady: true
1152 // We need to clean up the resources after our test
1153 await pool.destroy()
1156 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1157 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1158 let pool = new FixedThreadPool(
1160 './tests/worker-files/thread/testWorker.mjs',
1161 { workerChoiceStrategy }
1164 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1182 await pool.destroy()
1183 pool = new DynamicThreadPool(
1186 './tests/worker-files/thread/testWorker.mjs',
1187 { workerChoiceStrategy }
1190 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1208 // We need to clean up the resources after our test
1209 await pool.destroy()
1212 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1213 const pool = new FixedThreadPool(
1215 './tests/worker-files/thread/testWorker.mjs',
1216 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1218 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1219 const promises = new Set()
1220 const maxMultiplier = 2
1221 for (let i = 0; i < max * maxMultiplier; i++) {
1222 promises.add(pool.execute())
1224 await Promise.all(promises)
1225 for (const workerNode of pool.workerNodes) {
1226 expect(workerNode.usage).toStrictEqual({
1228 executed: expect.any(Number),
1232 sequentiallyStolen: 0,
1236 runTime: expect.objectContaining({
1237 history: expect.any(CircularArray)
1240 history: new CircularArray()
1242 elu: expect.objectContaining({
1243 idle: expect.objectContaining({
1244 history: expect.any(CircularArray)
1246 active: expect.objectContaining({
1247 history: expect.any(CircularArray)
1251 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1252 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1255 if (workerNode.usage.runTime.aggregate == null) {
1256 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1258 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1260 if (workerNode.usage.runTime.average == null) {
1261 expect(workerNode.usage.runTime.average).toBeUndefined()
1263 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1265 if (workerNode.usage.elu.active.aggregate == null) {
1266 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1268 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1270 if (workerNode.usage.elu.idle.aggregate == null) {
1271 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1273 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1275 if (workerNode.usage.elu.utilization == null) {
1276 expect(workerNode.usage.elu.utilization).toBeUndefined()
1278 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1279 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1281 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1284 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1285 pool.workerChoiceStrategyContext.workerChoiceStrategy
1287 ).toEqual(expect.any(Number))
1289 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1290 pool.workerChoiceStrategyContext.workerChoiceStrategy
1291 ).previousWorkerNodeKey
1292 ).toEqual(expect.any(Number))
1293 // We need to clean up the resources after our test
1294 await pool.destroy()
1297 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1298 const pool = new DynamicThreadPool(
1301 './tests/worker-files/thread/testWorker.mjs',
1302 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1304 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1305 const promises = new Set()
1306 const maxMultiplier = 2
1307 for (let i = 0; i < max * maxMultiplier; i++) {
1308 promises.add(pool.execute())
1310 await Promise.all(promises)
1311 for (const workerNode of pool.workerNodes) {
1312 expect(workerNode.usage).toStrictEqual({
1314 executed: expect.any(Number),
1318 sequentiallyStolen: 0,
1322 runTime: expect.objectContaining({
1323 history: expect.any(CircularArray)
1326 history: new CircularArray()
1328 elu: expect.objectContaining({
1329 idle: expect.objectContaining({
1330 history: expect.any(CircularArray)
1332 active: expect.objectContaining({
1333 history: expect.any(CircularArray)
1337 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1338 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1341 if (workerNode.usage.runTime.aggregate == null) {
1342 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1344 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1346 if (workerNode.usage.runTime.average == null) {
1347 expect(workerNode.usage.runTime.average).toBeUndefined()
1349 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1351 if (workerNode.usage.elu.active.aggregate == null) {
1352 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1354 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1356 if (workerNode.usage.elu.idle.aggregate == null) {
1357 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1359 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1361 if (workerNode.usage.elu.utilization == null) {
1362 expect(workerNode.usage.elu.utilization).toBeUndefined()
1364 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1365 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1367 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1370 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1371 pool.workerChoiceStrategyContext.workerChoiceStrategy
1373 ).toEqual(expect.any(Number))
1375 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1376 pool.workerChoiceStrategyContext.workerChoiceStrategy
1377 ).previousWorkerNodeKey
1378 ).toEqual(expect.any(Number))
1379 // We need to clean up the resources after our test
1380 await pool.destroy()
1383 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1384 const pool = new DynamicThreadPool(
1387 './tests/worker-files/thread/testWorker.mjs',
1389 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1390 workerChoiceStrategyOptions: {
1391 runTime: { median: true }
1395 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1396 const promises = new Set()
1397 const maxMultiplier = 2
1398 for (let i = 0; i < max * maxMultiplier; i++) {
1399 promises.add(pool.execute())
1401 await Promise.all(promises)
1402 for (const workerNode of pool.workerNodes) {
1403 expect(workerNode.usage).toStrictEqual({
1405 executed: expect.any(Number),
1409 sequentiallyStolen: 0,
1413 runTime: expect.objectContaining({
1414 history: expect.any(CircularArray)
1417 history: new CircularArray()
1419 elu: expect.objectContaining({
1420 idle: expect.objectContaining({
1421 history: expect.any(CircularArray)
1423 active: expect.objectContaining({
1424 history: expect.any(CircularArray)
1428 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1429 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1432 if (workerNode.usage.runTime.aggregate == null) {
1433 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1435 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1437 if (workerNode.usage.runTime.median == null) {
1438 expect(workerNode.usage.runTime.median).toBeUndefined()
1440 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1442 if (workerNode.usage.elu.active.aggregate == null) {
1443 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1445 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1447 if (workerNode.usage.elu.idle.aggregate == null) {
1448 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1450 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1452 if (workerNode.usage.elu.utilization == null) {
1453 expect(workerNode.usage.elu.utilization).toBeUndefined()
1455 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1456 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1458 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1461 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1462 pool.workerChoiceStrategyContext.workerChoiceStrategy
1464 ).toEqual(expect.any(Number))
1466 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1467 pool.workerChoiceStrategyContext.workerChoiceStrategy
1468 ).previousWorkerNodeKey
1469 ).toEqual(expect.any(Number))
1470 // We need to clean up the resources after our test
1471 await pool.destroy()
1474 it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
1475 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1476 let pool = new FixedThreadPool(
1478 './tests/worker-files/thread/testWorker.mjs'
1480 for (const workerNode of pool.workerNodes) {
1481 workerNode.strategyData = {
1482 virtualTaskEndTimestamp: performance.now()
1485 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1486 for (const workerNode of pool.workerNodes) {
1487 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1489 await pool.destroy()
1490 pool = new DynamicThreadPool(
1493 './tests/worker-files/thread/testWorker.mjs'
1495 for (const workerNode of pool.workerNodes) {
1496 workerNode.strategyData = {
1497 virtualTaskEndTimestamp: performance.now()
1500 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1501 for (const workerNode of pool.workerNodes) {
1502 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
1504 // We need to clean up the resources after our test
1505 await pool.destroy()
1508 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1509 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1510 let pool = new FixedThreadPool(
1512 './tests/worker-files/thread/testWorker.mjs',
1513 { workerChoiceStrategy }
1515 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1516 dynamicWorkerUsage: false,
1517 dynamicWorkerReady: true
1519 await pool.destroy()
1520 pool = new DynamicThreadPool(
1523 './tests/worker-files/thread/testWorker.mjs',
1524 { workerChoiceStrategy }
1526 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1527 dynamicWorkerUsage: false,
1528 dynamicWorkerReady: true
1530 // We need to clean up the resources after our test
1531 await pool.destroy()
1534 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1535 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1536 let pool = new FixedThreadPool(
1538 './tests/worker-files/thread/testWorker.mjs',
1539 { workerChoiceStrategy }
1542 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1560 await pool.destroy()
1561 pool = new DynamicThreadPool(
1564 './tests/worker-files/thread/testWorker.mjs',
1565 { workerChoiceStrategy }
1568 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1586 // We need to clean up the resources after our test
1587 await pool.destroy()
1590 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1591 const pool = new FixedThreadPool(
1593 './tests/worker-files/thread/testWorker.mjs',
1594 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1596 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1597 const promises = new Set()
1598 const maxMultiplier = 2
1599 for (let i = 0; i < max * maxMultiplier; i++) {
1600 promises.add(pool.execute())
1602 await Promise.all(promises)
1603 for (const workerNode of pool.workerNodes) {
1604 expect(workerNode.usage).toStrictEqual({
1606 executed: expect.any(Number),
1610 sequentiallyStolen: 0,
1614 runTime: expect.objectContaining({
1615 history: expect.any(CircularArray)
1618 history: new CircularArray()
1622 history: new CircularArray()
1625 history: new CircularArray()
1629 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1630 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1633 if (workerNode.usage.runTime.aggregate == null) {
1634 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1636 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1638 if (workerNode.usage.runTime.average == null) {
1639 expect(workerNode.usage.runTime.average).toBeUndefined()
1641 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1645 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1646 pool.workerChoiceStrategyContext.workerChoiceStrategy
1650 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1651 pool.workerChoiceStrategyContext.workerChoiceStrategy
1652 ).previousWorkerNodeKey
1655 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1656 pool.workerChoiceStrategyContext.workerChoiceStrategy
1657 ).defaultWorkerWeight
1658 ).toBeGreaterThan(0)
1660 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1661 pool.workerChoiceStrategyContext.workerChoiceStrategy
1662 ).workerNodeVirtualTaskRunTime
1663 ).toBeGreaterThanOrEqual(0)
1664 // We need to clean up the resources after our test
1665 await pool.destroy()
1668 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1669 const pool = new DynamicThreadPool(
1672 './tests/worker-files/thread/testWorker.mjs',
1673 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1675 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1676 const promises = new Set()
1677 const maxMultiplier = 2
1678 for (let i = 0; i < max * maxMultiplier; i++) {
1679 promises.add(pool.execute())
1681 await Promise.all(promises)
1682 for (const workerNode of pool.workerNodes) {
1683 expect(workerNode.usage).toStrictEqual({
1685 executed: expect.any(Number),
1689 sequentiallyStolen: 0,
1693 runTime: expect.objectContaining({
1694 history: expect.any(CircularArray)
1697 history: new CircularArray()
1701 history: new CircularArray()
1704 history: new CircularArray()
1708 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1709 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1712 if (workerNode.usage.runTime.aggregate == null) {
1713 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1715 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1717 if (workerNode.usage.runTime.average == null) {
1718 expect(workerNode.usage.runTime.average).toBeUndefined()
1720 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1724 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1725 pool.workerChoiceStrategyContext.workerChoiceStrategy
1729 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1730 pool.workerChoiceStrategyContext.workerChoiceStrategy
1731 ).previousWorkerNodeKey
1734 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1735 pool.workerChoiceStrategyContext.workerChoiceStrategy
1736 ).defaultWorkerWeight
1737 ).toBeGreaterThan(0)
1739 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1740 pool.workerChoiceStrategyContext.workerChoiceStrategy
1741 ).workerNodeVirtualTaskRunTime
1742 ).toBeGreaterThanOrEqual(0)
1743 // We need to clean up the resources after our test
1744 await pool.destroy()
1747 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1748 const pool = new DynamicThreadPool(
1751 './tests/worker-files/thread/testWorker.mjs',
1753 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1754 workerChoiceStrategyOptions: {
1755 runTime: { median: true }
1759 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
1760 const promises = new Set()
1761 const maxMultiplier = 2
1762 for (let i = 0; i < max * maxMultiplier; i++) {
1763 promises.add(pool.execute())
1765 await Promise.all(promises)
1766 for (const workerNode of pool.workerNodes) {
1767 expect(workerNode.usage).toStrictEqual({
1769 executed: expect.any(Number),
1773 sequentiallyStolen: 0,
1777 runTime: expect.objectContaining({
1778 history: expect.any(CircularArray)
1781 history: new CircularArray()
1785 history: new CircularArray()
1788 history: new CircularArray()
1792 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1793 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1796 if (workerNode.usage.runTime.aggregate == null) {
1797 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1799 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1801 if (workerNode.usage.runTime.median == null) {
1802 expect(workerNode.usage.runTime.median).toBeUndefined()
1804 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1808 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1809 pool.workerChoiceStrategyContext.workerChoiceStrategy
1813 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1814 pool.workerChoiceStrategyContext.workerChoiceStrategy
1815 ).previousWorkerNodeKey
1818 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1819 pool.workerChoiceStrategyContext.workerChoiceStrategy
1820 ).defaultWorkerWeight
1821 ).toBeGreaterThan(0)
1823 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1824 pool.workerChoiceStrategyContext.workerChoiceStrategy
1825 ).workerNodeVirtualTaskRunTime
1826 ).toBeGreaterThanOrEqual(0)
1827 // We need to clean up the resources after our test
1828 await pool.destroy()
1831 it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
1832 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1833 let pool = new FixedThreadPool(
1835 './tests/worker-files/thread/testWorker.mjs'
1838 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1839 workerChoiceStrategy
1843 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1844 workerChoiceStrategy
1845 ).previousWorkerNodeKey
1848 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1849 workerChoiceStrategy
1850 ).defaultWorkerWeight
1853 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1854 workerChoiceStrategy
1855 ).workerNodeVirtualTaskRunTime
1857 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1859 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1860 pool.workerChoiceStrategyContext.workerChoiceStrategy
1864 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1865 pool.workerChoiceStrategyContext.workerChoiceStrategy
1866 ).previousWorkerNodeKey
1869 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1870 pool.workerChoiceStrategyContext.workerChoiceStrategy
1871 ).defaultWorkerWeight
1872 ).toBeGreaterThan(0)
1874 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1875 pool.workerChoiceStrategyContext.workerChoiceStrategy
1876 ).workerNodeVirtualTaskRunTime
1878 await pool.destroy()
1879 pool = new DynamicThreadPool(
1882 './tests/worker-files/thread/testWorker.mjs'
1885 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1886 workerChoiceStrategy
1890 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1891 workerChoiceStrategy
1892 ).previousWorkerNodeKey
1895 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1896 workerChoiceStrategy
1897 ).defaultWorkerWeight
1900 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1901 workerChoiceStrategy
1902 ).workerNodeVirtualTaskRunTime
1904 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1906 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1907 pool.workerChoiceStrategyContext.workerChoiceStrategy
1911 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1912 pool.workerChoiceStrategyContext.workerChoiceStrategy
1913 ).previousWorkerNodeKey
1916 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1917 pool.workerChoiceStrategyContext.workerChoiceStrategy
1918 ).defaultWorkerWeight
1919 ).toBeGreaterThan(0)
1921 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
1922 pool.workerChoiceStrategyContext.workerChoiceStrategy
1923 ).workerNodeVirtualTaskRunTime
1925 // We need to clean up the resources after our test
1926 await pool.destroy()
1929 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1930 const workerChoiceStrategy =
1931 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1932 let pool = new FixedThreadPool(
1934 './tests/worker-files/thread/testWorker.mjs',
1935 { workerChoiceStrategy }
1937 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1938 dynamicWorkerUsage: false,
1939 dynamicWorkerReady: true
1941 await pool.destroy()
1942 pool = new DynamicThreadPool(
1945 './tests/worker-files/thread/testWorker.mjs',
1946 { workerChoiceStrategy }
1948 expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
1949 dynamicWorkerUsage: false,
1950 dynamicWorkerReady: true
1952 // We need to clean up the resources after our test
1953 await pool.destroy()
1956 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1957 const workerChoiceStrategy =
1958 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1959 let pool = new FixedThreadPool(
1961 './tests/worker-files/thread/testWorker.mjs',
1962 { workerChoiceStrategy }
1965 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1983 await pool.destroy()
1984 pool = new DynamicThreadPool(
1987 './tests/worker-files/thread/testWorker.mjs',
1988 { workerChoiceStrategy }
1991 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
2009 // We need to clean up the resources after our test
2010 await pool.destroy()
2013 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
2014 const pool = new FixedThreadPool(
2016 './tests/worker-files/thread/testWorker.mjs',
2018 workerChoiceStrategy:
2019 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2022 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
2023 const promises = new Set()
2024 const maxMultiplier = 2
2025 for (let i = 0; i < max * maxMultiplier; i++) {
2026 promises.add(pool.execute())
2028 await Promise.all(promises)
2029 for (const workerNode of pool.workerNodes) {
2030 expect(workerNode.usage).toStrictEqual({
2032 executed: expect.any(Number),
2036 sequentiallyStolen: 0,
2040 runTime: expect.objectContaining({
2041 history: expect.any(CircularArray)
2044 history: new CircularArray()
2048 history: new CircularArray()
2051 history: new CircularArray()
2055 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2056 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2061 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2062 pool.workerChoiceStrategyContext.workerChoiceStrategy
2063 ).defaultWorkerWeight
2064 ).toBeGreaterThan(0)
2066 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2067 pool.workerChoiceStrategyContext.workerChoiceStrategy
2071 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2072 pool.workerChoiceStrategyContext.workerChoiceStrategy
2076 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2077 pool.workerChoiceStrategyContext.workerChoiceStrategy
2081 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2082 pool.workerChoiceStrategyContext.workerChoiceStrategy
2083 ).previousWorkerNodeKey
2084 ).toEqual(expect.any(Number))
2086 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2087 pool.workerChoiceStrategyContext.workerChoiceStrategy
2090 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2091 pool.workerChoiceStrategyContext.workerChoiceStrategy
2092 ).defaultWorkerWeight
2094 // We need to clean up the resources after our test
2095 await pool.destroy()
2098 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
2099 const pool = new DynamicThreadPool(
2102 './tests/worker-files/thread/testWorker.mjs',
2104 workerChoiceStrategy:
2105 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2108 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
2109 const promises = new Set()
2110 const maxMultiplier = 2
2111 for (let i = 0; i < max * maxMultiplier; i++) {
2112 promises.add(pool.execute())
2114 await Promise.all(promises)
2115 for (const workerNode of pool.workerNodes) {
2116 expect(workerNode.usage).toStrictEqual({
2118 executed: expect.any(Number),
2122 sequentiallyStolen: 0,
2126 runTime: expect.objectContaining({
2127 history: expect.any(CircularArray)
2130 history: new CircularArray()
2134 history: new CircularArray()
2137 history: new CircularArray()
2141 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2142 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
2147 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2148 pool.workerChoiceStrategyContext.workerChoiceStrategy
2149 ).defaultWorkerWeight
2150 ).toBeGreaterThan(0)
2152 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2153 pool.workerChoiceStrategyContext.workerChoiceStrategy
2157 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2158 pool.workerChoiceStrategyContext.workerChoiceStrategy
2162 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2163 pool.workerChoiceStrategyContext.workerChoiceStrategy
2167 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2168 pool.workerChoiceStrategyContext.workerChoiceStrategy
2169 ).previousWorkerNodeKey
2170 ).toEqual(expect.any(Number))
2172 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2173 pool.workerChoiceStrategyContext.workerChoiceStrategy
2176 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2177 pool.workerChoiceStrategyContext.workerChoiceStrategy
2178 ).defaultWorkerWeight
2180 // We need to clean up the resources after our test
2181 await pool.destroy()
2184 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
2185 const workerChoiceStrategy =
2186 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2187 let pool = new FixedThreadPool(
2189 './tests/worker-files/thread/testWorker.mjs'
2192 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2193 workerChoiceStrategy
2197 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2198 workerChoiceStrategy
2202 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2203 workerChoiceStrategy
2207 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2208 workerChoiceStrategy
2209 ).previousWorkerNodeKey
2212 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2213 workerChoiceStrategy
2214 ).defaultWorkerWeight
2217 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2218 workerChoiceStrategy
2221 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2223 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2224 pool.workerChoiceStrategyContext.workerChoiceStrategy
2228 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2229 pool.workerChoiceStrategyContext.workerChoiceStrategy
2233 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2234 pool.workerChoiceStrategyContext.workerChoiceStrategy
2238 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2239 pool.workerChoiceStrategyContext.workerChoiceStrategy
2240 ).previousWorkerNodeKey
2243 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2244 pool.workerChoiceStrategyContext.workerChoiceStrategy
2245 ).defaultWorkerWeight
2246 ).toBeGreaterThan(0)
2248 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2249 pool.workerChoiceStrategyContext.workerChoiceStrategy
2252 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2253 pool.workerChoiceStrategyContext.workerChoiceStrategy
2254 ).defaultWorkerWeight
2256 await pool.destroy()
2257 pool = new DynamicThreadPool(
2260 './tests/worker-files/thread/testWorker.mjs'
2263 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2264 workerChoiceStrategy
2268 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2269 workerChoiceStrategy
2273 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2274 workerChoiceStrategy
2278 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2279 workerChoiceStrategy
2280 ).previousWorkerNodeKey
2283 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2284 workerChoiceStrategy
2285 ).defaultWorkerWeight
2288 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2289 workerChoiceStrategy
2292 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2294 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2295 pool.workerChoiceStrategyContext.workerChoiceStrategy
2299 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2300 pool.workerChoiceStrategyContext.workerChoiceStrategy
2304 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2305 pool.workerChoiceStrategyContext.workerChoiceStrategy
2309 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2310 pool.workerChoiceStrategyContext.workerChoiceStrategy
2311 ).previousWorkerNodeKey
2314 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2315 pool.workerChoiceStrategyContext.workerChoiceStrategy
2316 ).defaultWorkerWeight
2317 ).toBeGreaterThan(0)
2319 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2320 pool.workerChoiceStrategyContext.workerChoiceStrategy
2323 pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
2324 pool.workerChoiceStrategyContext.workerChoiceStrategy
2325 ).defaultWorkerWeight
2327 // We need to clean up the resources after our test
2328 await pool.destroy()
2331 it('Verify unknown strategy throw error', () => {
2334 new DynamicThreadPool(
2337 './tests/worker-files/thread/testWorker.mjs',
2338 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2340 ).toThrow("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")