X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=13bc904a07bf12a02f1958991c53ef54a21889c2;hb=e5536a06df85c554b8832f5fb5195b369258053b;hp=3c690fb819e9c718163e86badcde28786a5dd849;hpb=4a59691c9f48b3a6602510aeece809e2a0fe14c1;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 3c690fb8..13bc904a 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -68,8 +68,6 @@ export abstract class AbstractPool< /** * Worker choice strategy context referencing a worker choice algorithm implementation. - * - * Default to a round robin algorithm. */ protected workerChoiceStrategyContext: WorkerChoiceStrategyContext< Worker, @@ -381,6 +379,11 @@ export abstract class AbstractPool< */ protected abstract get busy (): boolean + /** + * Whether worker nodes are executing at least one task. + * + * @returns Worker nodes busyness boolean status. + */ protected internalBusy (): boolean { return ( this.workerNodes.findIndex(workerNode => { @@ -436,7 +439,7 @@ export abstract class AbstractPool< } /** - * Shutdowns the given worker. + * Terminates the given worker. * * @param worker - A worker within `workerNodes`. */ @@ -486,14 +489,21 @@ export abstract class AbstractPool< ): void { const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage + this.updateTaskStatisticsWorkerUsage(workerUsage, message) + this.updateRunTimeWorkerUsage(workerUsage, message) + this.updateEluWorkerUsage(workerUsage, message) + } + + private updateTaskStatisticsWorkerUsage ( + workerUsage: WorkerUsage, + message: MessageValue + ): void { const workerTaskStatistics = workerUsage.tasks --workerTaskStatistics.executing ++workerTaskStatistics.executed if (message.taskError != null) { ++workerTaskStatistics.failed } - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateEluWorkerUsage(workerUsage, message) } private updateRunTimeWorkerUsage ( @@ -511,7 +521,8 @@ export abstract class AbstractPool< workerUsage.tasks.executed !== 0 ) { workerUsage.runTime.average = - workerUsage.runTime.aggregate / workerUsage.tasks.executed + workerUsage.runTime.aggregate / + (workerUsage.tasks.executed - workerUsage.tasks.failed) } if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime @@ -541,7 +552,8 @@ export abstract class AbstractPool< workerUsage.tasks.executed !== 0 ) { workerUsage.waitTime.average = - workerUsage.waitTime.aggregate / workerUsage.tasks.executed + workerUsage.waitTime.aggregate / + (workerUsage.tasks.executed - workerUsage.tasks.failed) } if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements() @@ -555,25 +567,46 @@ export abstract class AbstractPool< } private updateEluWorkerUsage ( - workerTasksUsage: WorkerUsage, + workerUsage: WorkerUsage, message: MessageValue ): void { - if (this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu) { + if ( + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu + .aggregate + ) { + if (workerUsage.elu != null && message.taskPerformance?.elu != null) { + workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle + workerUsage.elu.active.aggregate += message.taskPerformance.elu.active + workerUsage.elu.utilization = + (workerUsage.elu.utilization + + message.taskPerformance.elu.utilization) / + 2 + } else if (message.taskPerformance?.elu != null) { + workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle + workerUsage.elu.active.aggregate = message.taskPerformance.elu.active + workerUsage.elu.utilization = message.taskPerformance.elu.utilization + } if ( - workerTasksUsage.elu != null && + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu + .average && + workerUsage.tasks.executed !== 0 + ) { + const executedTasks = + workerUsage.tasks.executed - workerUsage.tasks.failed + workerUsage.elu.idle.average = + workerUsage.elu.idle.aggregate / executedTasks + workerUsage.elu.active.average = + workerUsage.elu.active.aggregate / executedTasks + } + if ( + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu + .median && message.taskPerformance?.elu != null ) { - workerTasksUsage.elu = { - idle: workerTasksUsage.elu.idle + message.taskPerformance.elu.idle, - active: - workerTasksUsage.elu.active + message.taskPerformance.elu.active, - utilization: - (workerTasksUsage.elu.utilization + - message.taskPerformance.elu.utilization) / - 2 - } - } else if (message.taskPerformance?.elu != null) { - workerTasksUsage.elu = message.taskPerformance.elu + workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle) + workerUsage.elu.active.history.push(message.taskPerformance.elu.active) + workerUsage.elu.idle.median = median(workerUsage.elu.idle.history) + workerUsage.elu.active.median = median(workerUsage.elu.active.history) } } } @@ -581,33 +614,29 @@ export abstract class AbstractPool< /** * Chooses a worker node for the next task. * - * The default worker choice strategy uses a round robin algorithm to distribute the load. + * The default worker choice strategy uses a round robin algorithm to distribute the tasks. * * @returns The worker node key */ - protected chooseWorkerNode (): number { - let workerNodeKey: number - if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) { - const workerCreated = this.createAndSetupWorker() - this.registerWorkerMessageListener(workerCreated, message => { - const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated) - if ( - isKillBehavior(KillBehaviors.HARD, message.kill) || - (message.kill != null && - this.workerNodes[currentWorkerNodeKey].workerUsage.tasks - .executing === 0) - ) { - // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - this.flushTasksQueue(currentWorkerNodeKey) - // FIXME: wait for tasks to be finished - void (this.destroyWorker(workerCreated) as Promise) - } - }) - workerNodeKey = this.getWorkerNodeKey(workerCreated) - } else { - workerNodeKey = this.workerChoiceStrategyContext.execute() + private chooseWorkerNode (): number { + if (this.shallCreateDynamicWorker()) { + const worker = this.createAndSetupDynamicWorker() + if ( + this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker + ) { + return this.getWorkerNodeKey(worker) + } } - return workerNodeKey + return this.workerChoiceStrategyContext.execute() + } + + /** + * Conditions for dynamic worker creation. + * + * @returns Whether to create a dynamic worker or not. + */ + private shallCreateDynamicWorker (): boolean { + return this.type === PoolTypes.dynamic && !this.full && this.internalBusy() } /** @@ -632,7 +661,9 @@ export abstract class AbstractPool< >(worker: Worker, listener: (message: MessageValue) => void): void /** - * Returns a newly created worker. + * Creates a new worker. + * + * @returns Newly created worker. */ protected abstract createWorker (): Worker @@ -659,8 +690,6 @@ export abstract class AbstractPool< if (this.emitter != null) { this.emitter.emit(PoolEvents.error, error) } - }) - worker.on('error', () => { if (this.opts.restartWorkerOnError === true) { this.createAndSetupWorker() } @@ -680,6 +709,33 @@ export abstract class AbstractPool< return worker } + /** + * Creates a new dynamic worker and sets it up completely in the pool worker nodes. + * + * @returns New, completely set up dynamic worker. + */ + protected createAndSetupDynamicWorker (): Worker { + const worker = this.createAndSetupWorker() + this.registerWorkerMessageListener(worker, message => { + const currentWorkerNodeKey = this.getWorkerNodeKey(worker) + if ( + isKillBehavior(KillBehaviors.HARD, message.kill) || + (message.kill != null && + ((this.opts.enableTasksQueue === false && + this.workerNodes[currentWorkerNodeKey].workerUsage.tasks + .executing === 0) || + (this.opts.enableTasksQueue === true && + this.workerNodes[currentWorkerNodeKey].workerUsage.tasks + .executing === 0 && + this.tasksQueueSize(currentWorkerNodeKey) === 0))) + ) { + // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) + void (this.destroyWorker(worker) as Promise) + } + }) + return worker + } + /** * This function is the listener registered for each worker message. * @@ -711,6 +767,7 @@ export abstract class AbstractPool< this.dequeueTask(workerNodeKey) as Task ) } + this.workerChoiceStrategyContext.update(workerNodeKey) } } } @@ -829,7 +886,7 @@ export abstract class AbstractPool< this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .runTime.aggregate, elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .elu + .elu.aggregate } }) } @@ -849,7 +906,21 @@ export abstract class AbstractPool< median: 0, history: new CircularArray() }, - elu: undefined + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: new CircularArray() + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: new CircularArray() + }, + utilization: 0 + } } }