X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fselection-strategies%2Fselection-strategies.test.js;h=51478a0620767d03730d8c441baf9e5ab4c18ea4;hb=58f3aa65e471f66ad930fcf876a97fcdea7b0f94;hp=a1eeaeaf8b4e977ea9e8c962a2d74b25c42da711;hpb=053026470771eacfae5923c499a64a59feb06366;p=poolifier.git diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index a1eeaeaf..51478a06 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -1,9 +1,10 @@ const { expect } = require('expect') const { - WorkerChoiceStrategies, + DynamicClusterPool, DynamicThreadPool, + FixedClusterPool, FixedThreadPool, - FixedClusterPool + WorkerChoiceStrategies } = require('../../../lib') const { CircularArray } = require('../../../lib/circular-array') @@ -65,6 +66,43 @@ describe('Selection strategies test suite', () => { expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( workerChoiceStrategy ) + expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ + retries: 6, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ + retries: 6, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + await pool.destroy() + } + for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { + const pool = new DynamicClusterPool( + min, + max, + './tests/worker-files/cluster/testWorker.js' + ) + pool.setWorkerChoiceStrategy(workerChoiceStrategy, { retries: 3 }) + expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy) + expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( + workerChoiceStrategy + ) + expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ + retries: 3, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ + retries: 3, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) await pool.destroy() } }) @@ -75,47 +113,100 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js' ) for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { - if (workerChoiceStrategy === WorkerChoiceStrategies.ROUND_ROBIN) { + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).previousWorkerNodeKey + ).toBe(0) + if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).nextWorkerNodeId - ).toBe(0) - } else if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) { + ).workersVirtualTaskEndTimestamp + ).toStrictEqual([]) + } else if ( + workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + ) { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskEndTimestamp - ).toBeInstanceOf(Array) + ).defaultWorkerWeight + ).toBeGreaterThan(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskEndTimestamp.length + ).workerVirtualTaskRunTime ).toBe(0) } else if ( - workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy === + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN ) { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).currentWorkerNodeId + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerVirtualTaskRunTime ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).defaultWorkerWeight - ).toBeGreaterThan(0) + ).roundId + ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workerVirtualTaskRunTime + ).workerNodeId ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundWeights + ).toStrictEqual([ + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).defaultWorkerWeight + ]) } } await pool.destroy() }) - it('Verify ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + it('Verify ROUND_ROBIN strategy default policy', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN let pool = new FixedThreadPool( max, @@ -125,13 +216,21 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: false, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false, - elu: false + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) await pool.destroy() pool = new DynamicThreadPool( @@ -143,23 +242,32 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: false, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false, - elu: false + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) // We need to clean up the resources after our test await pool.destroy() }) it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN const pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } + { workerChoiceStrategy } ) // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose` const promises = new Set() @@ -169,43 +277,52 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: maxMultiplier, executing: 0, queued: 0, + maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: undefined + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } }) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.ROUND_ROBIN - ).nextWorkerNodeId + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toBe(pool.workerNodes.length - 1) // We need to clean up the resources after our test await pool.destroy() }) it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN const pool = new DynamicThreadPool( min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } + { workerChoiceStrategy } ) // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose` const promises = new Set() @@ -215,33 +332,45 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { - executed: maxMultiplier, + executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: undefined + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.ROUND_ROBIN - ).nextWorkerNodeId + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toBe(pool.workerNodes.length - 1) // We need to clean up the resources after our test await pool.destroy() }) @@ -281,14 +410,24 @@ describe('Selection strategies test suite', () => { ) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).nextWorkerNodeId + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey ).toBeDefined() pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).nextWorkerNodeId + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey ).toBe(0) await pool.destroy() pool = new DynamicThreadPool( @@ -299,20 +438,56 @@ describe('Selection strategies test suite', () => { ) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).nextWorkerNodeId + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey ).toBeDefined() pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).nextWorkerNodeId + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey ).toBe(0) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify LEAST_USED strategy default tasks usage statistics requirements', async () => { + it('Verify LEAST_USED strategy default policy', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_USED strategy default tasks statistics requirements', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED let pool = new FixedThreadPool( max, @@ -322,13 +497,21 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: false, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false, - elu: false + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) await pool.destroy() pool = new DynamicThreadPool( @@ -340,13 +523,21 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: false, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false, - elu: false + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) // We need to clean up the resources after our test await pool.destroy() @@ -366,28 +557,45 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { - executed: maxMultiplier, + executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: undefined + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toEqual(expect.any(Number)) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toEqual(expect.any(Number)) // We need to clean up the resources after our test await pool.destroy() }) @@ -407,34 +615,76 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { - executed: maxMultiplier, + executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, - - elu: undefined + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toEqual(expect.any(Number)) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toEqual(expect.any(Number)) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_BUSY strategy default policy', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify LEAST_BUSY strategy default tasks usage statistics requirements', async () => { + it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY let pool = new FixedThreadPool( max, @@ -444,13 +694,21 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: true, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false, - elu: false + runTime: { + aggregate: true, + average: false, + median: false + }, + waitTime: { + aggregate: true, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) await pool.destroy() pool = new DynamicThreadPool( @@ -462,13 +720,21 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: true, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false, - elu: false + runTime: { + aggregate: true, + average: false, + median: false + }, + waitTime: { + aggregate: true, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) // We need to clean up the resources after our test await pool.destroy() @@ -488,35 +754,55 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, + stolen: 0, failed: 0 }, - runTime: { - aggregation: expect.any(Number), - average: 0, - median: 0, + runTime: expect.objectContaining({ history: expect.any(CircularArray) - }, - waitTime: { - aggregation: 0, - average: 0, - median: 0, + }), + waitTime: expect.objectContaining({ history: expect.any(CircularArray) - }, - elu: undefined + }), + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThanOrEqual( - 0 - ) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.waitTime.aggregate == null) { + expect(workerNode.usage.waitTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0) + } } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toEqual(expect.any(Number)) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toEqual(expect.any(Number)) // We need to clean up the resources after our test await pool.destroy() }) @@ -536,54 +822,298 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, + stolen: 0, failed: 0 }, - runTime: { - aggregation: expect.any(Number), - average: 0, - median: 0, + runTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), + waitTime: expect.objectContaining({ history: expect.any(CircularArray) + }), + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.waitTime.aggregate == null) { + expect(workerNode.usage.waitTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0) + } + } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toEqual(expect.any(Number)) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toEqual(expect.any(Number)) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_ELU strategy default policy', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect( + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: true, + average: false, + median: false + } + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect( + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: true, + average: false, + median: false + } + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => { + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU } + ) + // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: { + history: new CircularArray() }, waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() + }, + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularArray) + }), + active: expect.objectContaining({ + history: expect.any(CircularArray) + }) + }) + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.elu.active.aggregate == null) { + expect(workerNode.usage.elu.active.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.elu.idle.aggregate == null) { + expect(workerNode.usage.elu.idle.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0) + } + if (workerNode.usage.elu.utilization == null) { + expect(workerNode.usage.elu.utilization).toBeUndefined() + } else { + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) + } + } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toEqual(expect.any(Number)) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toEqual(expect.any(Number)) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => { + const pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU } + ) + // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 }, - elu: undefined + runTime: { + history: new CircularArray() + }, + waitTime: { + history: new CircularArray() + }, + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularArray) + }), + active: expect.objectContaining({ + history: expect.any(CircularArray) + }) + }) }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0) + if (workerNode.usage.elu.active.aggregate == null) { + expect(workerNode.usage.elu.active.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.elu.idle.aggregate == null) { + expect(workerNode.usage.elu.idle.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0) + } + if (workerNode.usage.elu.utilization == null) { + expect(workerNode.usage.elu.utilization).toBeUndefined() + } else { + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) + } } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toEqual(expect.any(Number)) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toEqual(expect.any(Number)) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify LEAST_ELU strategy default tasks usage statistics requirements', async () => { - const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU + it('Verify FAIR_SHARE strategy default policy', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() - ).toStrictEqual({ - runTime: false, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false, - elu: true + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true }) await pool.destroy() pool = new DynamicThreadPool( @@ -592,22 +1122,15 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() - ).toStrictEqual({ - runTime: false, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false, - elu: true + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true }) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => { + it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE let pool = new FixedThreadPool( max, @@ -617,13 +1140,21 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: true, - avgRunTime: true, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false, - elu: false + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: true, + average: true, + median: false + } }) await pool.destroy() pool = new DynamicThreadPool( @@ -635,13 +1166,21 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: true, - avgRunTime: true, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false, - elu: false + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: true, + average: true, + median: false + } }) // We need to clean up the resources after our test await pool.destroy() @@ -661,30 +1200,71 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { - executed: maxMultiplier, + executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, + stolen: 0, failed: 0 }, - runTime: { - aggregation: expect.any(Number), - average: expect.any(Number), - median: 0, + runTime: expect.objectContaining({ history: expect.any(CircularArray) - }, + }), waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: undefined + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularArray) + }), + active: expect.objectContaining({ + history: expect.any(CircularArray) + }) + }) }) - expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0) - expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.runTime.average == null) { + expect(workerNode.usage.runTime.average).toBeUndefined() + } else { + expect(workerNode.usage.runTime.average).toBeGreaterThan(0) + } + if (workerNode.usage.elu.active.aggregate == null) { + expect(workerNode.usage.elu.active.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.elu.idle.aggregate == null) { + expect(workerNode.usage.elu.idle.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0) + } + if (workerNode.usage.elu.utilization == null) { + expect(workerNode.usage.elu.utilization).toBeUndefined() + } else { + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) + } } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toEqual(expect.any(Number)) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toEqual(expect.any(Number)) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -709,30 +1289,71 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { - executed: maxMultiplier, + executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, + stolen: 0, failed: 0 }, - runTime: { - aggregation: expect.any(Number), - average: expect.any(Number), - median: 0, + runTime: expect.objectContaining({ history: expect.any(CircularArray) - }, + }), waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: undefined + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularArray) + }), + active: expect.objectContaining({ + history: expect.any(CircularArray) + }) + }) }) - expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0) - expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.runTime.average == null) { + expect(workerNode.usage.runTime.average).toBeUndefined() + } else { + expect(workerNode.usage.runTime.average).toBeGreaterThan(0) + } + if (workerNode.usage.elu.active.aggregate == null) { + expect(workerNode.usage.elu.active.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.elu.idle.aggregate == null) { + expect(workerNode.usage.elu.idle.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0) + } + if (workerNode.usage.elu.utilization == null) { + expect(workerNode.usage.elu.utilization).toBeUndefined() + } else { + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) + } } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toEqual(expect.any(Number)) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toEqual(expect.any(Number)) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -750,7 +1371,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE, workerChoiceStrategyOptions: { - medRunTime: true + runTime: { median: true } } } ) @@ -762,30 +1383,71 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { - executed: maxMultiplier, + executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, + stolen: 0, failed: 0 }, - runTime: { - aggregation: expect.any(Number), - average: 0, - median: expect.any(Number), + runTime: expect.objectContaining({ history: expect.any(CircularArray) - }, + }), waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: undefined + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularArray) + }), + active: expect.objectContaining({ + history: expect.any(CircularArray) + }) + }) }) - expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0) - expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.runTime.median == null) { + expect(workerNode.usage.runTime.median).toBeUndefined() + } else { + expect(workerNode.usage.runTime.median).toBeGreaterThan(0) + } + if (workerNode.usage.elu.active.aggregate == null) { + expect(workerNode.usage.elu.active.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.elu.idle.aggregate == null) { + expect(workerNode.usage.elu.idle.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0) + } + if (workerNode.usage.elu.utilization == null) { + expect(workerNode.usage.elu.utilization).toBeUndefined() + } else { + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) + } } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toEqual(expect.any(Number)) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toEqual(expect.any(Number)) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -822,12 +1484,12 @@ describe('Selection strategies test suite', () => { pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + pool.workerChoiceStrategyContext.workerChoiceStrategy ).workersVirtualTaskEndTimestamp ).toBeInstanceOf(Array) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + pool.workerChoiceStrategyContext.workerChoiceStrategy ).workersVirtualTaskEndTimestamp.length ).toBe(0) await pool.destroy() @@ -857,19 +1519,45 @@ describe('Selection strategies test suite', () => { pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + pool.workerChoiceStrategyContext.workerChoiceStrategy ).workersVirtualTaskEndTimestamp ).toBeInstanceOf(Array) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + pool.workerChoiceStrategyContext.workerChoiceStrategy ).workersVirtualTaskEndTimestamp.length ).toBe(0) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN let pool = new FixedThreadPool( max, @@ -879,13 +1567,21 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: true, - avgRunTime: true, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false, - elu: false + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) await pool.destroy() pool = new DynamicThreadPool( @@ -897,13 +1593,21 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: true, - avgRunTime: true, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false, - elu: false + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) // We need to clean up the resources after our test await pool.destroy() @@ -923,36 +1627,55 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, + stolen: 0, failed: 0 }, - runTime: { - aggregation: expect.any(Number), - average: expect.any(Number), - median: 0, + runTime: expect.objectContaining({ history: expect.any(CircularArray) - }, + }), waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: undefined + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThanOrEqual( - 0 - ) - expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.runTime.average == null) { + expect(workerNode.usage.runTime.average).toBeUndefined() + } else { + expect(workerNode.usage.runTime.average).toBeGreaterThan(0) + } } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -982,34 +1705,55 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, + stolen: 0, failed: 0 }, - runTime: { - aggregation: expect.any(Number), - average: expect.any(Number), - median: 0, + runTime: expect.objectContaining({ history: expect.any(CircularArray) - }, + }), waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: undefined + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0) - expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.runTime.average == null) { + expect(workerNode.usage.runTime.average).toBeUndefined() + } else { + expect(workerNode.usage.runTime.average).toBeGreaterThan(0) + } } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -1032,7 +1776,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN, workerChoiceStrategyOptions: { - medRunTime: true + runTime: { median: true } } } ) @@ -1044,34 +1788,55 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, + stolen: 0, failed: 0 }, - runTime: { - aggregation: expect.any(Number), - average: 0, - median: expect.any(Number), + runTime: expect.objectContaining({ history: expect.any(CircularArray) - }, + }), waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: undefined + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0) - expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.runTime.median == null) { + expect(workerNode.usage.runTime.median).toBeUndefined() + } else { + expect(workerNode.usage.runTime.median).toBeGreaterThan(0) + } } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -1095,7 +1860,12 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).currentWorkerNodeId + ).nextWorkerNodeKey + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).previousWorkerNodeKey ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1111,7 +1881,12 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).currentWorkerNodeId + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1120,7 +1895,7 @@ describe('Selection strategies test suite', () => { ).toBeGreaterThan(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + pool.workerChoiceStrategyContext.workerChoiceStrategy ).workerVirtualTaskRunTime ).toBe(0) await pool.destroy() @@ -1132,7 +1907,12 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).currentWorkerNodeId + ).nextWorkerNodeKey + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).previousWorkerNodeKey ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1148,7 +1928,12 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).currentWorkerNodeId + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1157,14 +1942,41 @@ describe('Selection strategies test suite', () => { ).toBeGreaterThan(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + pool.workerChoiceStrategyContext.workerChoiceStrategy ).workerVirtualTaskRunTime ).toBe(0) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => { + const workerChoiceStrategy = + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN let pool = new FixedThreadPool( @@ -1175,13 +1987,21 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: false, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false, - elu: false + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) await pool.destroy() pool = new DynamicThreadPool( @@ -1193,13 +2013,21 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ - runTime: false, - avgRunTime: false, - medRunTime: false, - waitTime: false, - avgWaitTime: false, - medWaitTime: false, - elu: false + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } }) // We need to clean up the resources after our test await pool.destroy() @@ -1222,27 +2050,34 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { - executed: maxMultiplier, + executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, + stolen: 0, failed: 0 }, - runTime: { - aggregation: 0, - average: 0, - median: 0, + runTime: expect.objectContaining({ history: expect.any(CircularArray) - }, + }), waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: undefined + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1252,13 +2087,23 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).currentRoundId + ).roundId ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).currentWorkerNodeId + ).workerNodeId ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toEqual(expect.any(Number)) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -1290,27 +2135,34 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { - executed: maxMultiplier, + executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, + stolen: 0, failed: 0 }, - runTime: { - aggregation: 0, - average: 0, - median: 0, + runTime: expect.objectContaining({ history: expect.any(CircularArray) - }, + }), waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: undefined + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1320,13 +2172,23 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).currentRoundId + ).roundId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workerNodeId ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).currentWorkerNodeId + ).nextWorkerNodeKey ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toEqual(expect.any(Number)) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -1350,12 +2212,22 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).currentRoundId + ).roundId + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerNodeId + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).nextWorkerNodeKey ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).currentWorkerNodeId + ).previousWorkerNodeKey ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1370,13 +2242,23 @@ describe('Selection strategies test suite', () => { pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).currentRoundId + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).roundId ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).currentWorkerNodeId + ).workerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1385,7 +2267,7 @@ describe('Selection strategies test suite', () => { ).toBeGreaterThan(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + pool.workerChoiceStrategyContext.workerChoiceStrategy ).roundWeights ).toStrictEqual([ pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1401,12 +2283,22 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).currentRoundId + ).roundId + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerNodeId + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).nextWorkerNodeKey ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).currentWorkerNodeId + ).previousWorkerNodeKey ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1422,7 +2314,22 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).currentWorkerNodeId + ).roundId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1431,7 +2338,7 @@ describe('Selection strategies test suite', () => { ).toBeGreaterThan(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + pool.workerChoiceStrategyContext.workerChoiceStrategy ).roundWeights ).toStrictEqual([ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(