X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=6081902720554c2bcdb809d37f04d5f93ed89da7;hb=79b197bb222403411621c41775d9d640f4150633;hp=a11a574147cf074dc938d1e0d47b3797e6e15ad2;hpb=ddc35f92c712f38d9cfd79fd558cf80c0e031219;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index a11a5741..60819027 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -405,6 +405,13 @@ export abstract class AbstractPool< ...(this.opts.enableTasksQueue === true && { backPressure: this.hasBackPressure() }), + ...(this.opts.enableTasksQueue === true && { + stolenTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.usage.tasks.stolen, + 0 + ) + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -427,21 +434,26 @@ export abstract class AbstractPool< ) ) ), - average: round( - average( - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator.concat(workerNode.usage.runTime.history), - [] + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .runTime.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.runTime.history), + [] + ) ) ) - ), + }), ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .runTime.median && { median: round( median( - this.workerNodes.map( - (workerNode) => workerNode.usage.runTime?.median ?? 0 + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.runTime.history), + [] ) ) ) @@ -465,21 +477,26 @@ export abstract class AbstractPool< ) ) ), - average: round( - average( - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator.concat(workerNode.usage.waitTime.history), - [] + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .waitTime.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.waitTime.history), + [] + ) ) ) - ), + }), ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .waitTime.median && { median: round( median( - this.workerNodes.map( - (workerNode) => workerNode.usage.waitTime?.median ?? 0 + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.waitTime.history), + [] ) ) ) @@ -775,6 +792,7 @@ export abstract class AbstractPool< if ( this.opts.enableTasksQueue === false || (this.opts.enableTasksQueue === true && + this.tasksQueueSize(workerNodeKey) === 0 && this.workerNodes[workerNodeKey].usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number)) ) { @@ -826,7 +844,7 @@ export abstract class AbstractPool< * @virtual */ protected setupHook (): void { - // Intentionally empty + /** Intentionally empty */ } /** @@ -1189,15 +1207,8 @@ export abstract class AbstractPool< while (this.tasksQueueSize(workerNodeKey) > 0) { let destinationWorkerNodeKey!: number let minQueuedTasks = Infinity - let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { if (workerNode.info.ready && workerNodeId !== workerNodeKey) { - if ( - workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) - ) { - executeTask = true - } if (workerNode.usage.tasks.queued === 0) { destinationWorkerNodeKey = workerNodeId break @@ -1209,12 +1220,16 @@ export abstract class AbstractPool< } } if (destinationWorkerNodeKey != null) { + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] const task = { ...(this.dequeueTask(workerNodeKey) as Task), - workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo) - .id as number + workerId: destinationWorkerNode.info.id as number } - if (executeTask) { + if ( + this.tasksQueueSize(destinationWorkerNodeKey) === 0 && + destinationWorkerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { this.executeTask(destinationWorkerNodeKey, task) } else { this.enqueueTask(destinationWorkerNodeKey, task) @@ -1246,13 +1261,29 @@ export abstract class AbstractPool< workerId: destinationWorkerNode.info.id as number } if ( + this.tasksQueueSize(destinationWorkerNodeKey) === 0 && destinationWorkerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + (this.opts.tasksQueueOptions?.concurrency as number) ) { this.executeTask(destinationWorkerNodeKey, task) } else { this.enqueueTask(destinationWorkerNodeKey, task) } + if (destinationWorkerNode?.usage != null) { + ++destinationWorkerNode.usage.tasks.stolen + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey) && + destinationWorkerNode.getTaskFunctionWorkerUsage( + task.name as string + ) != null + ) { + const taskFunctionWorkerUsage = + destinationWorkerNode.getTaskFunctionWorkerUsage( + task.name as string + ) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.stolen + } break } } @@ -1280,13 +1311,26 @@ export abstract class AbstractPool< workerId: workerNode.info.id as number } if ( + this.tasksQueueSize(workerNodeKey) === 0 && workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + (this.opts.tasksQueueOptions?.concurrency as number) ) { this.executeTask(workerNodeKey, task) } else { this.enqueueTask(workerNodeKey, task) } + if (workerNode?.usage != null) { + ++workerNode.usage.tasks.stolen + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + workerNode.getTaskFunctionWorkerUsage(task.name as string) != null + ) { + const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( + task.name as string + ) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.stolen + } } } }