From: Jérôme Benoit Date: Thu, 8 Jun 2023 21:56:04 +0000 (+0200) Subject: fix: fix task wait time computation X-Git-Tag: v2.6.0~11 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=1c6fe997dcb16b510da6587b992cd3f66d62a259;p=poolifier.git fix: fix task wait time computation Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index fd67064c..01c1dab4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Refactor pool worker node usage internals. + +### Fixed + +- Fix wait time accounting. +- Ensure worker choice strategy `LEAST_BUSY` accounts also tasks wait time. +- Ensure worker choice strategy `LEAST_USED` accounts also queued tasks. + ## [2.5.4] - 2023-06-07 ### Added @@ -142,7 +152,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Ensure one task at a time is executed per worker with tasks queueing enabled. -- Properly count worker running tasks with tasks queueing enabled. +- Properly count worker executing tasks with tasks queueing enabled. ## [2.4.5] - 2023-04-09 diff --git a/README.md b/README.md index 4c7b33a9..5383863d 100644 --- a/README.md +++ b/README.md @@ -161,8 +161,8 @@ An object with these properties: - `workerChoiceStrategy` (optional) - The worker choice strategy to use in this pool: - `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion - - `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of running and ran tasks - - `WorkerChoiceStrategies.LEAST_BUSY`: Submit tasks to the worker with the minimum tasks total execution time + - `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of executed, executing and queued tasks + - `WorkerChoiceStrategies.LEAST_BUSY`: Submit tasks to the worker with the minimum tasks total execution and wait time - `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using a weighted round robin scheduling algorithm based on tasks execution time - `WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using an interleaved weighted round robin scheduling algorithm based on tasks execution time (experimental) - `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to worker by using a fair share tasks scheduling algorithm based on tasks execution time @@ -238,8 +238,8 @@ This method will call the terminate method on each worker. Default: `60000` - `killBehavior` (optional) - Dictates if your async unit (worker/process) will be deleted in case that a task is active on it. - **KillBehaviors.SOFT**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker **won't** be deleted. - **KillBehaviors.HARD**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker will be deleted. + **KillBehaviors.SOFT**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker **won't** be deleted. + **KillBehaviors.HARD**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker will be deleted. This option only apply to the newly created workers. Default: `KillBehaviors.SOFT` diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index db0d5455..ca660951 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -462,9 +462,15 @@ export abstract class AbstractPool< * Can be overridden. * * @param workerNodeKey - The worker node key. + * @param task - The task to execute. */ - protected beforeTaskExecutionHook (workerNodeKey: number): void { - ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing + protected beforeTaskExecutionHook ( + workerNodeKey: number, + task: Task + ): void { + const workerUsage = this.workerNodes[workerNodeKey].workerUsage + ++workerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(workerUsage, task) } /** @@ -486,9 +492,7 @@ export abstract class AbstractPool< if (message.taskError != null) { ++workerTaskStatistics.failed } - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateWaitTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) } @@ -521,12 +525,14 @@ export abstract class AbstractPool< private updateWaitTimeWorkerUsage ( workerUsage: WorkerUsage, - message: MessageValue + task: Task ): void { + const timestamp = performance.now() + const taskWaitTime = timestamp - (task.timestamp ?? timestamp) if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime ) { - workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0 + workerUsage.waitTime.aggregation += taskWaitTime ?? 0 if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .avgWaitTime && @@ -538,9 +544,9 @@ export abstract class AbstractPool< if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .medWaitTime && - message.taskPerformance?.waitTime != null + taskWaitTime != null ) { - workerUsage.waitTime.history.push(message.taskPerformance.waitTime) + workerUsage.waitTime.history.push(taskWaitTime) workerUsage.waitTime.median = median(workerUsage.waitTime.history) } } @@ -781,7 +787,7 @@ export abstract class AbstractPool< } private executeTask (workerNodeKey: number, task: Task): void { - this.beforeTaskExecutionHook(workerNodeKey) + this.beforeTaskExecutionHook(workerNodeKey, task) this.sendToWorker(this.workerNodes[workerNodeKey].worker, task) } @@ -820,9 +826,6 @@ export abstract class AbstractPool< runTime: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .runTime, - waitTime: - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .waitTime, elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu } @@ -831,7 +834,7 @@ export abstract class AbstractPool< private getWorkerUsage (worker: Worker): WorkerUsage { return { - tasks: this.getTaskStatistics(this, worker), + tasks: this.getTaskStatistics(worker), runTime: { aggregation: 0, average: 0, @@ -848,15 +851,14 @@ export abstract class AbstractPool< } } - private getTaskStatistics ( - self: AbstractPool, - worker: Worker - ): TaskStatistics { + private getTaskStatistics (worker: Worker): TaskStatistics { + const queueSize = + this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size return { executed: 0, executing: 0, get queued (): number { - return self.tasksQueueSize(self.getWorkerNodeKey(worker)) + return queueSize ?? 0 }, failed: 0 } diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index b5b77ff6..41573ab4 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -154,7 +154,7 @@ export abstract class AbstractWorkerChoiceStrategy< /** * Finds the first free worker node key based on the number of tasks the worker has applied. * - * If a worker is found with `0` running tasks, it is detected as free and its worker node key is returned. + * If a worker is found with `0` executing tasks, it is detected as free and its worker node key is returned. * * If no free worker is found, `-1` is returned. * @@ -169,7 +169,7 @@ export abstract class AbstractWorkerChoiceStrategy< /** * Finds the last free worker node key based on the number of tasks the worker has applied. * - * If a worker is found with `0` running tasks, it is detected as free and its worker node key is returned. + * If a worker is found with `0` executing tasks, it is detected as free and its worker node key is returned. * * If no free worker is found, `-1` is returned. * diff --git a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts index f171ab5f..3c085685 100644 --- a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts @@ -27,7 +27,7 @@ export class LeastBusyWorkerChoiceStrategy< runTime: true, avgRunTime: false, medRunTime: false, - waitTime: false, + waitTime: true, avgWaitTime: false, medWaitTime: false, elu: false @@ -54,14 +54,16 @@ export class LeastBusyWorkerChoiceStrategy< /** @inheritDoc */ public choose (): number { - let minRunTime = Infinity + let minTime = Infinity let leastBusyWorkerNodeKey!: number for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) { - const workerRunTime = workerNode.workerUsage.runTime.aggregation - if (workerRunTime === 0) { + const workerTime = + workerNode.workerUsage.runTime.aggregation + + workerNode.workerUsage.waitTime.aggregation + if (workerTime === 0) { return workerNodeKey - } else if (workerRunTime < minRunTime) { - minRunTime = workerRunTime + } else if (workerTime < minTime) { + minTime = workerTime leastBusyWorkerNodeKey = workerNodeKey } } diff --git a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts index ee7834a6..528bca1d 100644 --- a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts @@ -51,7 +51,9 @@ export class LeastUsedWorkerChoiceStrategy< for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) { const workerTaskStatistics = workerNode.workerUsage.tasks const workerTasks = - workerTaskStatistics.executed + workerTaskStatistics.executing + workerTaskStatistics.executed + + workerTaskStatistics.executing + + workerTaskStatistics.queued if (workerTasks === 0) { return workerNodeKey } else if (workerTasks < minNumberOfTasks) { diff --git a/src/utility-types.ts b/src/utility-types.ts index 57ff0003..bfbc2a84 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -39,10 +39,6 @@ export interface TaskPerformance { * Task runtime. */ runTime?: number - /** - * Task wait time. - */ - waitTime?: number /** * Task event loop utilization. */ @@ -54,7 +50,6 @@ export interface TaskPerformance { */ export interface WorkerStatistics { runTime: boolean - waitTime: boolean elu: boolean } diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 76a7ee70..a84bb069 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -224,7 +224,7 @@ export abstract class AbstractWorker< message: MessageValue ): void { try { - let taskPerformance = this.beginTaskPerformance(message) + let taskPerformance = this.beginTaskPerformance() const res = fn(message.data) taskPerformance = this.endTaskPerformance(taskPerformance) this.sendToMainWorker({ @@ -256,7 +256,7 @@ export abstract class AbstractWorker< fn: WorkerAsyncFunction, message: MessageValue ): void { - let taskPerformance = this.beginTaskPerformance(message) + let taskPerformance = this.beginTaskPerformance() fn(message.data) .then(res => { taskPerformance = this.endTaskPerformance(taskPerformance) @@ -297,13 +297,9 @@ export abstract class AbstractWorker< return fn } - private beginTaskPerformance (message: MessageValue): TaskPerformance { - const timestamp = performance.now() + private beginTaskPerformance (): TaskPerformance { return { - timestamp, - ...(this.statistics.waitTime && { - waitTime: timestamp - (message.timestamp ?? timestamp) - }), + timestamp: performance.now(), ...(this.statistics.elu && { elu: performance.eventLoopUtilization() }) } } diff --git a/src/worker/worker-options.ts b/src/worker/worker-options.ts index e0cd392f..b71a90e5 100644 --- a/src/worker/worker-options.ts +++ b/src/worker/worker-options.ts @@ -3,11 +3,11 @@ */ export const KillBehaviors = Object.freeze({ /** - * If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker **wont** be deleted. + * If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker **wont** be deleted. */ SOFT: 'SOFT', /** - * If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker will be deleted. + * If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker will be deleted. */ HARD: 'HARD' } as const) @@ -59,8 +59,8 @@ export interface WorkerOptions { /** * `killBehavior` dictates if your async unit (worker/process) will be deleted in case that a task is active on it. * - * - SOFT: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker **won't** be deleted. - * - HARD: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker will be deleted. + * - SOFT: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker **won't** be deleted. + * - HARD: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker will be deleted. * * This option only apply to the newly created workers. * diff --git a/tests/pools/cluster/dynamic.test.js b/tests/pools/cluster/dynamic.test.js index 291629f5..a5250e53 100644 --- a/tests/pools/cluster/dynamic.test.js +++ b/tests/pools/cluster/dynamic.test.js @@ -82,15 +82,15 @@ describe('Dynamic cluster pool test suite', () => { await pool1.destroy() }) - it('Verify scale processes up and down is working when long running task is used:hard', async () => { + it('Verify scale processes up and down is working when long executing task is used:hard', async () => { const longRunningPool = new DynamicClusterPool( min, max, './tests/worker-files/cluster/longRunningWorkerHardBehavior.js', { errorHandler: e => console.error(e), - onlineHandler: () => console.log('long running worker is online'), - exitHandler: () => console.log('long running worker exited') + onlineHandler: () => console.log('long executing worker is online'), + exitHandler: () => console.log('long executing worker exited') } ) expect(longRunningPool.workerNodes.length).toBe(min) @@ -109,15 +109,15 @@ describe('Dynamic cluster pool test suite', () => { await longRunningPool.destroy() }) - it('Verify scale processes up and down is working when long running task is used:soft', async () => { + it('Verify scale processes up and down is working when long executing task is used:soft', async () => { const longRunningPool = new DynamicClusterPool( min, max, './tests/worker-files/cluster/longRunningWorkerSoftBehavior.js', { errorHandler: e => console.error(e), - onlineHandler: () => console.log('long running worker is online'), - exitHandler: () => console.log('long running worker exited') + onlineHandler: () => console.log('long executing worker is online'), + exitHandler: () => console.log('long executing worker exited') } ) expect(longRunningPool.workerNodes.length).toBe(min) @@ -126,7 +126,7 @@ describe('Dynamic cluster pool test suite', () => { } expect(longRunningPool.workerNodes.length).toBe(max) await TestUtils.sleep(1500) - // Here we expect the workerNodes to be at the max size since the task is still running + // Here we expect the workerNodes to be at the max size since the task is still executing expect(longRunningPool.workerNodes.length).toBe(max) // We need to clean up the resources after our test await longRunningPool.destroy() diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 7a67f6c4..809d5350 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -446,7 +446,7 @@ describe('Selection strategies test suite', () => { runTime: true, avgRunTime: false, medRunTime: false, - waitTime: false, + waitTime: true, avgWaitTime: false, medWaitTime: false, elu: false @@ -464,7 +464,7 @@ describe('Selection strategies test suite', () => { runTime: true, avgRunTime: false, medRunTime: false, - waitTime: false, + waitTime: true, avgWaitTime: false, medWaitTime: false, elu: false @@ -501,7 +501,7 @@ describe('Selection strategies test suite', () => { history: expect.any(CircularArray) }, waitTime: { - aggregation: 0, + aggregation: expect.any(Number), average: 0, median: 0, history: expect.any(CircularArray) @@ -515,6 +515,9 @@ describe('Selection strategies test suite', () => { expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThanOrEqual( 0 ) + expect( + workerNode.workerUsage.waitTime.aggregation + ).toBeGreaterThanOrEqual(0) } // We need to clean up the resources after our test await pool.destroy() @@ -549,7 +552,7 @@ describe('Selection strategies test suite', () => { history: expect.any(CircularArray) }, waitTime: { - aggregation: 0, + aggregation: expect.any(Number), average: 0, median: 0, history: expect.any(CircularArray) @@ -561,6 +564,7 @@ describe('Selection strategies test suite', () => { max * maxMultiplier ) expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0) + expect(workerNode.workerUsage.waitTime.aggregation).toBeGreaterThan(0) } // We need to clean up the resources after our test await pool.destroy() diff --git a/tests/pools/thread/dynamic.test.js b/tests/pools/thread/dynamic.test.js index 404f2113..1f2a66f2 100644 --- a/tests/pools/thread/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -82,15 +82,15 @@ describe('Dynamic thread pool test suite', () => { await pool1.destroy() }) - it('Verify scale thread up and down is working when long running task is used:hard', async () => { + it('Verify scale thread up and down is working when long executing task is used:hard', async () => { const longRunningPool = new DynamicThreadPool( min, max, './tests/worker-files/thread/longRunningWorkerHardBehavior.js', { errorHandler: e => console.error(e), - onlineHandler: () => console.log('long running worker is online'), - exitHandler: () => console.log('long running worker exited') + onlineHandler: () => console.log('long executing worker is online'), + exitHandler: () => console.log('long executing worker exited') } ) expect(longRunningPool.workerNodes.length).toBe(min) @@ -109,15 +109,15 @@ describe('Dynamic thread pool test suite', () => { await longRunningPool.destroy() }) - it('Verify scale thread up and down is working when long running task is used:soft', async () => { + it('Verify scale thread up and down is working when long executing task is used:soft', async () => { const longRunningPool = new DynamicThreadPool( min, max, './tests/worker-files/thread/longRunningWorkerSoftBehavior.js', { errorHandler: e => console.error(e), - onlineHandler: () => console.log('long running worker is online'), - exitHandler: () => console.log('long running worker exited') + onlineHandler: () => console.log('long executing worker is online'), + exitHandler: () => console.log('long executing worker exited') } ) expect(longRunningPool.workerNodes.length).toBe(min) @@ -126,7 +126,7 @@ describe('Dynamic thread pool test suite', () => { } expect(longRunningPool.workerNodes.length).toBe(max) await TestUtils.sleep(1500) - // Here we expect the workerNodes to be at the max size since the task is still running + // Here we expect the workerNodes to be at the max size since the task is still executing expect(longRunningPool.workerNodes.length).toBe(max) // We need to clean up the resources after our test await longRunningPool.destroy()