X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fselection-strategies%2Fselection-strategies.test.js;h=9b581ff57a9f052d93bca4edfe67e46c266ce8b6;hb=5df69fabd77b3ec4137a6382e2d84791bf0fe85d;hp=352eb7f636e30788ce90c80d4e6d98c1255ded77;hpb=88a7a5ea2abeeee0fdafb3e21a0982f9a7a1e12a;p=poolifier.git diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 352eb7f6..9b581ff5 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -15,6 +15,7 @@ describe('Selection strategies test suite', () => { expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN') expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED') expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY') + expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU') expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE') expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe( 'WEIGHTED_ROUND_ROBIN' @@ -122,14 +123,23 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: false, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) await pool.destroy() pool = new DynamicThreadPool( @@ -139,14 +149,23 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: false, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) // We need to clean up the resources after our test await pool.destroy() @@ -166,20 +185,47 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage).toStrictEqual({ - run: maxMultiplier, - running: 0, - runTime: 0, - runTimeHistory: expect.any(CircularArray), - avgRunTime: 0, - medRunTime: 0, - waitTime: 0, - waitTimeHistory: expect.any(CircularArray), - avgWaitTime: 0, - medWaitTime: 0, - error: 0 + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: 0 + } }) } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + WorkerChoiceStrategies.ROUND_ROBIN + ).nextWorkerNodeId + ).toBe(0) // We need to clean up the resources after our test await pool.destroy() }) @@ -199,20 +245,47 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage).toStrictEqual({ - run: maxMultiplier, - running: 0, - runTime: 0, - runTimeHistory: expect.any(CircularArray), - avgRunTime: 0, - medRunTime: 0, - waitTime: 0, - waitTimeHistory: expect.any(CircularArray), - avgWaitTime: 0, - medWaitTime: 0, - error: 0 + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: 0 + } }) } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + WorkerChoiceStrategies.ROUND_ROBIN + ).nextWorkerNodeId + ).toBe(0) // We need to clean up the resources after our test await pool.destroy() }) @@ -291,14 +364,23 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: false, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) await pool.destroy() pool = new DynamicThreadPool( @@ -308,14 +390,23 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: false, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) // We need to clean up the resources after our test await pool.destroy() @@ -335,18 +426,40 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage).toStrictEqual({ - run: maxMultiplier, - running: 0, - runTime: 0, - runTimeHistory: expect.any(CircularArray), - avgRunTime: 0, - medRunTime: 0, - waitTime: 0, - waitTimeHistory: expect.any(CircularArray), - avgWaitTime: 0, - medWaitTime: 0, - error: 0 + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: 0 + } }) } // We need to clean up the resources after our test @@ -368,18 +481,40 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage).toStrictEqual({ - run: maxMultiplier, - running: 0, - runTime: 0, - runTimeHistory: expect.any(CircularArray), - avgRunTime: 0, - medRunTime: 0, - waitTime: 0, - waitTimeHistory: expect.any(CircularArray), - avgWaitTime: 0, - medWaitTime: 0, - error: 0 + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: 0 + } }) } // We need to clean up the resources after our test @@ -394,14 +529,23 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: true, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false + runTime: { + aggregate: true, + average: false, + median: false + }, + waitTime: { + aggregate: true, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) await pool.destroy() pool = new DynamicThreadPool( @@ -411,14 +555,23 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: true, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false + runTime: { + aggregate: true, + average: false, + median: false + }, + waitTime: { + aggregate: true, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) // We need to clean up the resources after our test await pool.destroy() @@ -438,22 +591,49 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage).toStrictEqual({ - run: expect.any(Number), - running: 0, - runTime: expect.any(Number), - runTimeHistory: expect.any(CircularArray), - avgRunTime: 0, - medRunTime: 0, - waitTime: 0, - waitTimeHistory: expect.any(CircularArray), - avgWaitTime: 0, - medWaitTime: 0, - error: 0 + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: expect.any(Number), + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: expect.any(Number), + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: 0 + } }) - expect(workerNode.tasksUsage.run).toBeGreaterThan(0) - expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(max * maxMultiplier) - expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0) + expect(workerNode.workerUsage.waitTime.aggregate).toBeGreaterThanOrEqual( + 0 + ) } // We need to clean up the resources after our test await pool.destroy() @@ -474,22 +654,224 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage).toStrictEqual({ - run: expect.any(Number), - running: 0, - runTime: expect.any(Number), - runTimeHistory: expect.any(CircularArray), - avgRunTime: 0, - medRunTime: 0, - waitTime: 0, - waitTimeHistory: expect.any(CircularArray), - avgWaitTime: 0, - medWaitTime: 0, - error: 0 + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: expect.any(Number), + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: expect.any(Number), + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: 0 + } + }) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0) + expect(workerNode.workerUsage.waitTime.aggregate).toBeGreaterThan(0) + } + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_ELU strategy default tasks usage statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect( + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: true, + average: false, + median: false + } + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect( + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: true, + average: false, + median: false + } + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => { + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU } + ) + // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: expect.any(Number), + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: expect.any(Number) + } + }) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1) + } + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => { + const pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU } + ) + // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: expect.any(Number), + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: expect.any(Number) + } }) - expect(workerNode.tasksUsage.run).toBeGreaterThan(0) - expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(max * maxMultiplier) - expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1) } // We need to clean up the resources after our test await pool.destroy() @@ -503,14 +885,23 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: true, - avgRunTime: true, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) await pool.destroy() pool = new DynamicThreadPool( @@ -520,14 +911,23 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: true, - avgRunTime: true, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) // We need to clean up the resources after our test await pool.destroy() @@ -547,21 +947,43 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage).toStrictEqual({ - run: maxMultiplier, - running: 0, - runTime: expect.any(Number), - runTimeHistory: expect.any(CircularArray), - avgRunTime: expect.any(Number), - medRunTime: 0, - waitTime: 0, - waitTimeHistory: expect.any(CircularArray), - avgWaitTime: 0, - medWaitTime: 0, - error: 0 + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: expect.any(Number), + average: expect.any(Number), + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: 0 + } }) - expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0) - expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThan(0) + expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0) + expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -587,21 +1009,43 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage).toStrictEqual({ - run: maxMultiplier, - running: 0, - runTime: expect.any(Number), - runTimeHistory: expect.any(CircularArray), - avgRunTime: expect.any(Number), - medRunTime: 0, - waitTime: 0, - waitTimeHistory: expect.any(CircularArray), - avgWaitTime: 0, - medWaitTime: 0, - error: 0 + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: expect.any(Number), + average: expect.any(Number), + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: 0 + } }) - expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0) - expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThan(0) + expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0) + expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -620,7 +1064,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE, workerChoiceStrategyOptions: { - medRunTime: true + runTime: { median: true } } } ) @@ -632,21 +1076,43 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage).toStrictEqual({ - run: maxMultiplier, - running: 0, - runTime: expect.any(Number), - runTimeHistory: expect.any(CircularArray), - avgRunTime: 0, - medRunTime: expect.any(Number), - waitTime: 0, - waitTimeHistory: expect.any(CircularArray), - avgWaitTime: 0, - medWaitTime: 0, - error: 0 + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: expect.any(Number), + average: 0, + median: expect.any(Number), + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: 0 + } }) - expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0) - expect(workerNode.tasksUsage.medRunTime).toBeGreaterThan(0) + expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0) + expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -739,14 +1205,23 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: true, - avgRunTime: true, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) await pool.destroy() pool = new DynamicThreadPool( @@ -756,14 +1231,23 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: true, - avgRunTime: true, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) // We need to clean up the resources after our test await pool.destroy() @@ -783,23 +1267,47 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage).toStrictEqual({ - run: expect.any(Number), - running: 0, - runTime: expect.any(Number), - runTimeHistory: expect.any(CircularArray), - avgRunTime: expect.any(Number), - medRunTime: 0, - waitTime: 0, - waitTimeHistory: expect.any(CircularArray), - avgWaitTime: 0, - medWaitTime: 0, - error: 0 + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: expect.any(Number), + average: expect.any(Number), + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: 0 + } }) - expect(workerNode.tasksUsage.run).toBeGreaterThanOrEqual(0) - expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(max * maxMultiplier) - expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0) - expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0) + expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -830,23 +1338,47 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage).toStrictEqual({ - run: expect.any(Number), - running: 0, - runTime: expect.any(Number), - runTimeHistory: expect.any(CircularArray), - avgRunTime: expect.any(Number), - medRunTime: 0, - waitTime: 0, - waitTimeHistory: expect.any(CircularArray), - avgWaitTime: 0, - medWaitTime: 0, - error: 0 + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: expect.any(Number), + average: expect.any(Number), + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: 0 + } }) - expect(workerNode.tasksUsage.run).toBeGreaterThan(0) - expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(max * maxMultiplier) - expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0) - expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0) + expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -870,7 +1402,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN, workerChoiceStrategyOptions: { - medRunTime: true + runTime: { median: true } } } ) @@ -882,23 +1414,47 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage).toStrictEqual({ - run: expect.any(Number), - running: 0, - runTime: expect.any(Number), - runTimeHistory: expect.any(CircularArray), - avgRunTime: 0, - medRunTime: expect.any(Number), - waitTime: 0, - waitTimeHistory: expect.any(CircularArray), - avgWaitTime: 0, - medWaitTime: 0, - error: 0 + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: expect.any(Number), + average: 0, + median: expect.any(Number), + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: 0 + } }) - expect(workerNode.tasksUsage.run).toBeGreaterThan(0) - expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(max * maxMultiplier) - expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0) - expect(workerNode.tasksUsage.medRunTime).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0) + expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1001,14 +1557,23 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: false, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) await pool.destroy() pool = new DynamicThreadPool( @@ -1018,19 +1583,293 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: false, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) // We need to clean up the resources after our test await pool.destroy() }) + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => { + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { + workerChoiceStrategy: + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + } + ) + // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: 0 + } + }) + } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).currentRoundId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).currentWorkerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).roundWeights + ).toStrictEqual([ + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ]) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => { + const pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { + workerChoiceStrategy: + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + } + ) + // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + utilization: 0 + } + }) + } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).currentRoundId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).currentWorkerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).roundWeights + ).toStrictEqual([ + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ]) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => { + const workerChoiceStrategy = + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js' + ) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).currentRoundId + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).currentWorkerNodeId + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).defaultWorkerWeight + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundWeights + ).toBeDefined() + pool.setWorkerChoiceStrategy(workerChoiceStrategy) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).currentRoundId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).currentWorkerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundWeights + ).toStrictEqual([ + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ]) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js' + ) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).currentRoundId + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).currentWorkerNodeId + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).defaultWorkerWeight + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundWeights + ).toBeDefined() + pool.setWorkerChoiceStrategy(workerChoiceStrategy) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).currentWorkerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundWeights + ).toStrictEqual([ + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ]) + // We need to clean up the resources after our test + await pool.destroy() + }) + it('Verify unknown strategy throw error', () => { expect( () =>