X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=062df07791af8666fd94282eac229aef466c7a1b;hb=6fbda8efcfb5ddcc7a8baa0354ffa0d1328bc91f;hp=7b0d541d8bcafaaaab1150fa01067c66acea30c2;hpb=238be25a2a03d2f32e94a70bc4a0ef06c3ae6be9;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 7b0d541d..062df077 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -12,6 +12,7 @@ import { DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, + average, isKillBehavior, isPlainObject, median, @@ -426,24 +427,26 @@ export abstract class AbstractPool< ) ) ), - average: round( - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.runTime?.aggregate ?? 0), - 0 - ) / - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.executed ?? 0), - 0 + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .runTime.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.runTime.history), + [] + ) ) - ), + ) + }), ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .runTime.median && { median: round( median( - this.workerNodes.map( - (workerNode) => workerNode.usage.runTime?.median ?? 0 + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.runTime.history), + [] ) ) ) @@ -467,24 +470,26 @@ export abstract class AbstractPool< ) ) ), - average: round( - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.waitTime?.aggregate ?? 0), - 0 - ) / - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.executed ?? 0), - 0 + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .waitTime.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.waitTime.history), + [] + ) ) - ), + ) + }), ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .waitTime.median && { median: round( median( - this.workerNodes.map( - (workerNode) => workerNode.usage.waitTime?.median ?? 0 + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.waitTime.history), + [] ) ) ) @@ -780,6 +785,7 @@ export abstract class AbstractPool< if ( this.opts.enableTasksQueue === false || (this.opts.enableTasksQueue === true && + this.tasksQueueSize(workerNodeKey) === 0 && this.workerNodes[workerNodeKey].usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number)) ) { @@ -831,7 +837,7 @@ export abstract class AbstractPool< * @virtual */ protected setupHook (): void { - // Intentionally empty + /** Intentionally empty */ } /** @@ -940,11 +946,13 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { + if (message.taskError != null) { + return + } updateMeasurementStatistics( workerUsage.runTime, this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, - message.taskPerformance?.runTime ?? 0, - workerUsage.tasks.executed + message.taskPerformance?.runTime ?? 0 ) } @@ -957,8 +965,7 @@ export abstract class AbstractPool< updateMeasurementStatistics( workerUsage.waitTime, this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, - taskWaitTime, - workerUsage.tasks.executed + taskWaitTime ) } @@ -966,19 +973,20 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { + if (message.taskError != null) { + return + } const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu updateMeasurementStatistics( workerUsage.elu.active, eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.active ?? 0, - workerUsage.tasks.executed + message.taskPerformance?.elu?.active ?? 0 ) updateMeasurementStatistics( workerUsage.elu.idle, eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.idle ?? 0, - workerUsage.tasks.executed + message.taskPerformance?.elu?.idle ?? 0 ) if (eluTaskStatisticsRequirements.aggregate) { if (message.taskPerformance?.elu != null) { @@ -1190,17 +1198,10 @@ export abstract class AbstractPool< private redistributeQueuedTasks (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { - let destinationWorkerNodeKey: number = workerNodeKey + let destinationWorkerNodeKey!: number let minQueuedTasks = Infinity - let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { if (workerNode.info.ready && workerNodeId !== workerNodeKey) { - if ( - workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) - ) { - executeTask = true - } if (workerNode.usage.tasks.queued === 0) { destinationWorkerNodeKey = workerNodeId break @@ -1211,15 +1212,21 @@ export abstract class AbstractPool< } } } - const task = { - ...(this.dequeueTask(workerNodeKey) as Task), - workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo) - .id as number - } - if (executeTask) { - this.executeTask(destinationWorkerNodeKey, task) - } else { - this.enqueueTask(destinationWorkerNodeKey, task) + if (destinationWorkerNodeKey != null) { + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] + const task = { + ...(this.dequeueTask(workerNodeKey) as Task), + workerId: destinationWorkerNode.info.id as number + } + if ( + this.tasksQueueSize(destinationWorkerNodeKey) === 0 && + destinationWorkerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.executeTask(destinationWorkerNodeKey, task) + } else { + this.enqueueTask(destinationWorkerNodeKey, task) + } } } } @@ -1234,6 +1241,9 @@ export abstract class AbstractPool< workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued ) for (const sourceWorkerNode of workerNodes) { + if (sourceWorkerNode.usage.tasks.queued === 0) { + break + } if ( sourceWorkerNode.info.ready && sourceWorkerNode.info.id !== workerId && @@ -1244,8 +1254,9 @@ export abstract class AbstractPool< workerId: destinationWorkerNode.info.id as number } if ( + this.tasksQueueSize(destinationWorkerNodeKey) === 0 && destinationWorkerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + (this.opts.tasksQueueOptions?.concurrency as number) ) { this.executeTask(destinationWorkerNodeKey, task) } else { @@ -1267,18 +1278,20 @@ export abstract class AbstractPool< ) for (const [workerNodeKey, workerNode] of workerNodes.entries()) { if ( + sourceWorkerNode.usage.tasks.queued > 0 && workerNode.info.ready && workerNode.info.id !== workerId && - sourceWorkerNode.usage.tasks.queued > 0 && - !workerNode.hasBackPressure() + workerNode.usage.tasks.queued < + (this.opts.tasksQueueOptions?.size as number) - 1 ) { const task = { ...(sourceWorkerNode.popTask() as Task), workerId: workerNode.info.id as number } if ( + this.tasksQueueSize(workerNodeKey) === 0 && workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + (this.opts.tasksQueueOptions?.concurrency as number) ) { this.executeTask(workerNodeKey, task) } else {