From: Jérôme Benoit Date: Tue, 30 May 2023 08:52:30 +0000 (+0200) Subject: Merge branch 'master' of github.com:poolifier/poolifier into waittime X-Git-Tag: v2.5.0~9^2^2~10 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=1d6aaa0a809637ba26d80288126f5a160eb8ce60;hp=e4543b1428fd6b52f5832ea75f21ac082b52684e;p=poolifier.git Merge branch 'master' of github.com:poolifier/poolifier into waittime --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 4c82ef08..2e769fc9 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -263,6 +263,10 @@ export abstract class AbstractPool< runTimeHistory: new CircularArray(), avgRunTime: 0, medRunTime: 0, + waitTime: 0, + waitTimeHistory: new CircularArray(), + avgWaitTime: 0, + medWaitTime: 0, error: 0 }) } @@ -340,11 +344,13 @@ export abstract class AbstractPool< /** @inheritDoc */ public async execute (data?: Data, name?: string): Promise { + const submissionTimestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const submittedTask: Task = { name, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), + submissionTimestamp, id: crypto.randomUUID() } const res = new Promise((resolve, reject) => { @@ -448,6 +454,23 @@ export abstract class AbstractPool< workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory) } } + if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) { + workerTasksUsage.waitTime += message.waitTime ?? 0 + if ( + this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime && + workerTasksUsage.run !== 0 + ) { + workerTasksUsage.avgWaitTime = + workerTasksUsage.waitTime / workerTasksUsage.run + } + if ( + this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime && + message.waitTime != null + ) { + workerTasksUsage.waitTimeHistory.push(message.waitTime) + workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory) + } + } } /** @@ -611,6 +634,10 @@ export abstract class AbstractPool< runTimeHistory: new CircularArray(), avgRunTime: 0, medRunTime: 0, + waitTime: 0, + waitTimeHistory: new CircularArray(), + avgWaitTime: 0, + medWaitTime: 0, error: 0 }, tasksQueue: new Queue>() diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index d969f0c0..efa9578d 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -27,7 +27,10 @@ export abstract class AbstractWorkerChoiceStrategy< public readonly requiredStatistics: RequiredStatistics = { runTime: false, avgRunTime: false, - medRunTime: false + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false } /** @@ -52,6 +55,14 @@ export abstract class AbstractWorkerChoiceStrategy< this.requiredStatistics.avgRunTime = true this.requiredStatistics.medRunTime = opts.medRunTime as boolean } + if (this.requiredStatistics.avgWaitTime && opts.medWaitTime === true) { + this.requiredStatistics.avgWaitTime = false + this.requiredStatistics.medWaitTime = opts.medWaitTime as boolean + } + if (this.requiredStatistics.medWaitTime && opts.medWaitTime === false) { + this.requiredStatistics.avgWaitTime = true + this.requiredStatistics.medWaitTime = opts.medWaitTime as boolean + } } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index e26f6b8f..56d9dc36 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -27,7 +27,10 @@ export class FairShareWorkerChoiceStrategy< public readonly requiredStatistics: RequiredStatistics = { runTime: true, avgRunTime: true, - medRunTime: false + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false } /** diff --git a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts index 05caa42a..6e413825 100644 --- a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts @@ -26,7 +26,10 @@ export class LeastBusyWorkerChoiceStrategy< public readonly requiredStatistics: RequiredStatistics = { runTime: true, avgRunTime: false, - medRunTime: false + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index 282e306d..c90036fe 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -39,6 +39,12 @@ export interface WorkerChoiceStrategyOptions { * @defaultValue false */ medRunTime?: boolean + /** + * Use tasks median wait time instead of average runtime. + * + * @defaultValue false + */ + medWaitTime?: boolean /** * Worker weights to use for weighted round robin worker selection strategy. * Weight is the tasks maximum average or median runtime in milliseconds. @@ -66,6 +72,18 @@ export interface RequiredStatistics { * Require tasks median runtime. */ medRunTime: boolean + /** + * Require tasks wait time. + */ + waitTime: boolean + /** + * Require tasks average wait time. + */ + avgWaitTime: boolean + /** + * Require tasks median wait time. + */ + medWaitTime: boolean } /** diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index a0553fcb..df45feb3 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -28,7 +28,10 @@ export class WeightedRoundRobinWorkerChoiceStrategy< public readonly requiredStatistics: RequiredStatistics = { runTime: true, avgRunTime: true, - medRunTime: false + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false } /** diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 2db5f630..35d2c0ab 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -45,6 +45,10 @@ export interface Task { * Task input data that will be passed to the worker. */ readonly data?: Data + /** + * Submission timestamp. + */ + readonly submissionTimestamp?: number /** * Message UUID. */ @@ -81,6 +85,22 @@ export interface TasksUsage { * Median tasks runtime. */ medRunTime: number + /** + * Tasks wait time. + */ + waitTime: number + /** + * Tasks wait time history. + */ + waitTimeHistory: CircularArray + /** + * Average tasks wait time. + */ + avgWaitTime: number + /** + * Median tasks wait time. + */ + medWaitTime: number /** * Number of tasks errored. */ diff --git a/src/utility-types.ts b/src/utility-types.ts index 08861ebd..cc04f2cb 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -33,6 +33,10 @@ export interface MessageValue< * Runtime. */ readonly runTime?: number + /** + * Wait time. + */ + readonly waitTime?: number /** * Reference to main worker. */ diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 5d0c9492..176a5893 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -207,12 +207,14 @@ export abstract class AbstractWorker< ): void { try { const startTimestamp = performance.now() + const waitTime = startTimestamp - (message.submissionTimestamp ?? 0) const res = fn(message.data) const runTime = performance.now() - startTimestamp this.sendToMainWorker({ data: res, id: message.id, - runTime + runTime, + waitTime }) } catch (e) { const err = this.handleError(e as Error) @@ -233,13 +235,15 @@ export abstract class AbstractWorker< message: MessageValue ): void { const startTimestamp = performance.now() + const waitTime = startTimestamp - (message.submissionTimestamp ?? 0) fn(message.data) .then(res => { const runTime = performance.now() - startTimestamp this.sendToMainWorker({ data: res, id: message.id, - runTime + runTime, + waitTime }) return null })