From: Jérôme Benoit Date: Tue, 19 Aug 2025 12:13:33 +0000 (+0200) Subject: fix: worker index identification at tasks stealing under back pressure X-Git-Tag: v5.1.1~4 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=ae736ce03e1f63459c4740e09ab274fb8af5cd59;p=poolifier.git fix: worker index identification at tasks stealing under back pressure Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 84ef7c6a1..65ad5e3a4 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -90,6 +90,8 @@ export abstract class AbstractPool< /** @inheritDoc */ public get info (): PoolInfo { + const taskStatisticsRequirements = + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() return { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion defaultStrategy: this.opts.workerChoiceStrategy!, @@ -101,10 +103,8 @@ export abstract class AbstractPool< type: this.type, version, worker: this.worker, - ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() - .runTime.aggregate === true && - this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() - .waitTime.aggregate && { + ...(taskStatisticsRequirements?.runTime.aggregate === true && + taskStatisticsRequirements.waitTime.aggregate && { utilization: round(this.utilization), }), busyWorkerNodes: this.workerNodes.reduce( @@ -172,8 +172,7 @@ export abstract class AbstractPool< 0 ), }), - ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() - .runTime.aggregate === true && { + ...(taskStatisticsRequirements?.runTime.aggregate === true && { runTime: { maximum: round( max( @@ -191,8 +190,7 @@ export abstract class AbstractPool< ) ) ), - ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() - .runTime.average && { + ...(taskStatisticsRequirements.runTime.average && { average: round( average( this.workerNodes.reduce( @@ -205,8 +203,7 @@ export abstract class AbstractPool< ) ), }), - ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() - .runTime.median && { + ...(taskStatisticsRequirements.runTime.median && { median: round( median( this.workerNodes.reduce( @@ -221,8 +218,7 @@ export abstract class AbstractPool< }), }, }), - ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() - .waitTime.aggregate === true && { + ...(taskStatisticsRequirements?.waitTime.aggregate === true && { waitTime: { maximum: round( max( @@ -240,8 +236,7 @@ export abstract class AbstractPool< ) ) ), - ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() - .waitTime.average && { + ...(taskStatisticsRequirements.waitTime.average && { average: round( average( this.workerNodes.reduce( @@ -254,8 +249,7 @@ export abstract class AbstractPool< ) ), }), - ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() - .waitTime.median && { + ...(taskStatisticsRequirements.waitTime.median && { median: round( median( this.workerNodes.reduce( @@ -270,8 +264,7 @@ export abstract class AbstractPool< }), }, }), - ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() - .elu.aggregate === true && { + ...(taskStatisticsRequirements?.elu.aggregate === true && { elu: { active: { maximum: round( @@ -292,8 +285,7 @@ export abstract class AbstractPool< ) ) ), - ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() - .elu.average && { + ...(taskStatisticsRequirements.elu.average && { average: round( average( this.workerNodes.reduce( @@ -306,8 +298,7 @@ export abstract class AbstractPool< ) ), }), - ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() - .elu.median && { + ...(taskStatisticsRequirements.elu.median && { median: round( median( this.workerNodes.reduce( @@ -340,8 +331,7 @@ export abstract class AbstractPool< ) ) ), - ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() - .elu.average && { + ...(taskStatisticsRequirements.elu.average && { average: round( average( this.workerNodes.reduce( @@ -354,8 +344,7 @@ export abstract class AbstractPool< ) ), }), - ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() - .elu.median && { + ...(taskStatisticsRequirements.elu.median && { median: round( median( this.workerNodes.reduce( @@ -518,6 +507,9 @@ export abstract class AbstractPool< const poolTimeCapacity = (performance.now() - this.startTimestamp) * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) + if (!Number.isFinite(poolTimeCapacity) || poolTimeCapacity <= 0) { + return 0 + } const totalTasksRunTime = this.workerNodes.reduce( (accumulator, workerNode) => accumulator + (workerNode.usage.runTime.aggregate ?? 0), @@ -1239,6 +1231,7 @@ export abstract class AbstractPool< * @returns Worker nodes back pressure boolean status. */ protected internalBackPressure (): boolean { + if (this.workerNodes.length === 0) return false return ( this.workerNodes.reduce( (accumulator, _, workerNodeKey) => @@ -1255,6 +1248,7 @@ export abstract class AbstractPool< * @returns Worker nodes busyness boolean status. */ protected internalBusy (): boolean { + if (this.workerNodes.length === 0) return false return ( this.workerNodes.reduce( (accumulator, _, workerNodeKey) => @@ -1881,7 +1875,7 @@ export abstract class AbstractPool< (workerNodeA, workerNodeB) => workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued ) - for (const [workerNodeKey, workerNode] of workerNodes.entries()) { + for (const workerNode of workerNodes) { if (sourceWorkerNode.usage.tasks.queued === 0) { break } @@ -1892,6 +1886,7 @@ export abstract class AbstractPool< // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.opts.tasksQueueOptions!.size! - sizeOffset ) { + const workerNodeKey = this.workerNodes.indexOf(workerNode) workerNode.info.backPressureStealing = true this.stealTask(sourceWorkerNode, workerNodeKey) workerNode.info.backPressureStealing = false @@ -1977,10 +1972,9 @@ export abstract class AbstractPool< * @param workerNode - The worker node. */ private initWorkerNodeUsage (workerNode: IWorkerNode): void { - if ( + const taskStatisticsRequirements = this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() - .runTime.aggregate === true - ) { + if (taskStatisticsRequirements?.runTime.aggregate === true) { workerNode.usage.runTime.aggregate = min( ...this.workerNodes.map( workerNode => @@ -1988,10 +1982,7 @@ export abstract class AbstractPool< ) ) } - if ( - this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() - .waitTime.aggregate === true - ) { + if (taskStatisticsRequirements?.waitTime.aggregate === true) { workerNode.usage.waitTime.aggregate = min( ...this.workerNodes.map( workerNode => @@ -1999,10 +1990,7 @@ export abstract class AbstractPool< ) ) } - if ( - this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu - .aggregate === true - ) { + if (taskStatisticsRequirements?.elu.aggregate === true) { workerNode.usage.elu.active.aggregate = min( ...this.workerNodes.map( workerNode => @@ -2144,15 +2132,22 @@ export abstract class AbstractPool< while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) { const destinationWorkerNodeKey = this.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return sourceWorkerNodeKey !== workerNodeKey && - workerNode.info.ready && - workerNode.usage.tasks.queued < - workerNodes[minWorkerNodeKey].usage.tasks.queued + if (workerNodeKey === sourceWorkerNodeKey || !workerNode.info.ready) { + return minWorkerNodeKey + } + if (minWorkerNodeKey === -1) { + return workerNodeKey + } + return workerNode.usage.tasks.queued < + workerNodes[minWorkerNodeKey].usage.tasks.queued ? workerNodeKey : minWorkerNodeKey }, - 0 + -1 ) + if (destinationWorkerNodeKey === -1) { + break + } this.handleTask( destinationWorkerNodeKey, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -2250,14 +2245,12 @@ export abstract class AbstractPool< * @param workerNodeKey - The worker node key. */ private sendStatisticsMessageToWorker (workerNodeKey: number): void { + const taskStatisticsRequirements = + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() this.sendToWorker(workerNodeKey, { statistics: { - elu: - this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() - .elu.aggregate ?? false, - runTime: - this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() - .runTime.aggregate ?? false, + elu: taskStatisticsRequirements?.elu.aggregate ?? false, + runTime: taskStatisticsRequirements?.runTime.aggregate ?? false, }, }) } @@ -2315,12 +2308,12 @@ export abstract class AbstractPool< message: MessageValue ): Promise { const targetWorkerNodeKeys = [...this.workerNodes.keys()] + const responsesReceived: MessageValue[] = [] const taskFunctionOperationsListener = ( message: MessageValue, resolve: (value: boolean | PromiseLike) => void, reject: (reason?: unknown) => void ): void => { - const responsesReceived: MessageValue[] = [] this.checkMessageWorkerId(message) if ( message.taskFunctionOperationStatus != null && @@ -2467,8 +2460,12 @@ export abstract class AbstractPool< } destinationWorkerNode.info.stealing = true sourceWorkerNode.info.stolen = true - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const stolenTask = sourceWorkerNode.dequeueLastPrioritizedTask()! + const stolenTask = sourceWorkerNode.dequeueLastPrioritizedTask() + if (stolenTask == null) { + sourceWorkerNode.info.stolen = false + destinationWorkerNode.info.stealing = false + return + } sourceWorkerNode.info.stolen = false destinationWorkerNode.info.stealing = false this.handleTask(destinationWorkerNodeKey, stolenTask) @@ -2554,6 +2551,9 @@ export abstract class AbstractPool< private readonly workerNodeStealTask = ( workerNodeKey: number ): Task | undefined => { + const workerNode = this.workerNodes[workerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (workerNode == null) return const workerNodes = this.workerNodes .slice() .sort( @@ -2561,8 +2561,8 @@ export abstract class AbstractPool< workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued ) const sourceWorkerNode = workerNodes.find( - (sourceWorkerNode, sourceWorkerNodeKey) => - sourceWorkerNodeKey !== workerNodeKey && + sourceWorkerNode => + sourceWorkerNode !== workerNode && sourceWorkerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 67c870892..3ea7aadec 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -198,10 +198,14 @@ export abstract class AbstractWorker< DEFAULT_TASK_NAME, this.taskFunctions.get(DEFAULT_TASK_NAME) ), - buildTaskFunctionProperties( - defaultTaskFunctionName, - this.taskFunctions.get(defaultTaskFunctionName) - ), + ...(defaultTaskFunctionName !== DEFAULT_TASK_NAME + ? [ + buildTaskFunctionProperties( + defaultTaskFunctionName, + this.taskFunctions.get(defaultTaskFunctionName) + ), + ] + : []), ...taskFunctionsProperties, ] } @@ -700,6 +704,7 @@ export abstract class AbstractWorker< this.checkActive.bind(this), (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2 ) + this.activeInterval.unref() } /**