From: Jérôme Benoit Date: Mon, 10 Jul 2023 07:04:06 +0000 (+0200) Subject: feat: worker node readiness aware worker choice strategies X-Git-Tag: v2.6.15~21 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=19dbc45b0e2975f938aaae8274902ebe82c48cad;p=poolifier.git feat: worker node readiness aware worker choice strategies Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index fdda81a5..774ec5f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Take into account worker node readiness in worker choice strategies. + ## [2.6.14] - 2023-07-10 ### Fixed diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index c200d531..812a2087 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -950,9 +950,6 @@ export abstract class AbstractPool< if (this.emitter != null) { this.emitter.emit(PoolEvents.error, error) } - if (this.opts.enableTasksQueue === true) { - this.redistributeQueuedTasks(worker) - } if (this.opts.restartWorkerOnError === true && !this.starting) { if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) { this.createAndSetupDynamicWorker() @@ -960,6 +957,9 @@ export abstract class AbstractPool< this.createAndSetupWorker() } } + if (this.opts.enableTasksQueue === true) { + this.redistributeQueuedTasks(worker) + } }) worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 7f26706d..a6589d09 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -128,6 +128,10 @@ export abstract class AbstractWorkerChoiceStrategy< this.setTaskStatisticsRequirements(this.opts) } + protected workerNodeReady (workerNodeKey: number): boolean { + return this.pool.workerNodes[workerNodeKey].info.ready + } + // /** // * Finds a free worker node key. // * diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index f678f83b..351f3d91 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -77,7 +77,10 @@ export class FairShareWorkerChoiceStrategy< } const workerVirtualTaskEndTimestamp = this.workersVirtualTaskEndTimestamp[workerNodeKey] - if (workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp) { + if ( + this.workerNodeReady(workerNodeKey) && + workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp + ) { minWorkerVirtualTaskEndTimestamp = workerVirtualTaskEndTimestamp this.nextWorkerNodeId = workerNodeKey } diff --git a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts index 928b1b2a..b61bca1c 100644 --- a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts @@ -66,10 +66,10 @@ export class LeastBusyWorkerChoiceStrategy< const workerTime = (workerNode.usage.runTime?.aggregate ?? 0) + (workerNode.usage.waitTime?.aggregate ?? 0) - if (workerTime === 0) { + if (this.workerNodeReady(workerNodeKey) && workerTime === 0) { this.nextWorkerNodeId = workerNodeKey break - } else if (workerTime < minTime) { + } else if (this.workerNodeReady(workerNodeKey) && workerTime < minTime) { minTime = workerTime this.nextWorkerNodeId = workerNodeKey } diff --git a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts index 54d0e7cb..57e805e1 100644 --- a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts @@ -61,10 +61,13 @@ export class LeastEluWorkerChoiceStrategy< for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) { const workerUsage = workerNode.usage const workerElu = workerUsage.elu?.active?.aggregate ?? 0 - if (workerElu === 0) { + if (this.workerNodeReady(workerNodeKey) && workerElu === 0) { this.nextWorkerNodeId = workerNodeKey break - } else if (workerElu < minWorkerElu) { + } else if ( + this.workerNodeReady(workerNodeKey) && + workerElu < minWorkerElu + ) { minWorkerElu = workerElu this.nextWorkerNodeId = workerNodeKey } diff --git a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts index 53aa05ee..8c138470 100644 --- a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts @@ -49,10 +49,13 @@ export class LeastUsedWorkerChoiceStrategy< workerTaskStatistics.executed + workerTaskStatistics.executing + workerTaskStatistics.queued - if (workerTasks === 0) { + if (this.workerNodeReady(workerNodeKey) && workerTasks === 0) { this.nextWorkerNodeId = workerNodeKey break - } else if (workerTasks < minNumberOfTasks) { + } else if ( + this.workerNodeReady(workerNodeKey) && + workerTasks < minNumberOfTasks + ) { minNumberOfTasks = workerTasks this.nextWorkerNodeId = workerNodeKey } diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 167736ed..2e1a49b6 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -623,7 +623,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.usage).toStrictEqual({ + expect(workerNode.usage).toMatchObject({ tasks: { executed: expect.any(Number), executing: 0, @@ -632,15 +632,9 @@ describe('Selection strategies test suite', () => { failed: 0 }, runTime: { - aggregate: expect.any(Number), - maximum: expect.any(Number), - minimum: expect.any(Number), history: expect.any(CircularArray) }, waitTime: { - aggregate: expect.any(Number), - maximum: expect.any(Number), - minimum: expect.any(Number), history: expect.any(CircularArray) }, elu: { @@ -656,8 +650,16 @@ describe('Selection strategies test suite', () => { expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0) - expect(workerNode.usage.waitTime.aggregate).toBeGreaterThanOrEqual(0) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.waitTime.aggregate == null) { + expect(workerNode.usage.waitTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0) + } } // We need to clean up the resources after our test await pool.destroy() @@ -678,7 +680,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.usage).toStrictEqual({ + expect(workerNode.usage).toMatchObject({ tasks: { executed: expect.any(Number), executing: 0, @@ -687,15 +689,9 @@ describe('Selection strategies test suite', () => { failed: 0 }, runTime: { - aggregate: expect.any(Number), - maximum: expect.any(Number), - minimum: expect.any(Number), history: expect.any(CircularArray) }, waitTime: { - aggregate: expect.any(Number), - maximum: expect.any(Number), - minimum: expect.any(Number), history: expect.any(CircularArray) }, elu: { @@ -711,8 +707,16 @@ describe('Selection strategies test suite', () => { expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0) - expect(workerNode.usage.waitTime.aggregate).toBeGreaterThanOrEqual(0) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.waitTime.aggregate == null) { + expect(workerNode.usage.waitTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0) + } } // We need to clean up the resources after our test await pool.destroy()