X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=1341c9afdd8b867faea351389a6f2792b21e7bc6;hb=e8b3a5abad7d3ae4fa836ab0b76d54c1efd146e9;hp=51f77901383181f04725ca1bb84ec1c60f241855;hpb=a4e07f7216246b772e13783937a97c87105b8fc3;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 51f77901..1341c9af 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -21,7 +21,13 @@ import { type TasksQueueOptions, type WorkerType } from './pool' -import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker' +import type { + IWorker, + Task, + TaskStatistics, + WorkerNode, + WorkerUsage +} from './worker' import { WorkerChoiceStrategies, type WorkerChoiceStrategy, @@ -62,8 +68,6 @@ export abstract class AbstractPool< /** * Worker choice strategy context referencing a worker choice algorithm implementation. - * - * Default to a round robin algorithm. */ protected workerChoiceStrategyContext: WorkerChoiceStrategyContext< Worker, @@ -309,30 +313,10 @@ export abstract class AbstractPool< this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) } for (const workerNode of this.workerNodes) { - this.setWorkerNodeTasksUsage(workerNode, { - tasks: { - executing: 0, - executed: 0, - queued: - this.opts.enableTasksQueue === true - ? workerNode.tasksQueue.size - : 0, - failed: 0 - }, - runTime: { - aggregation: 0, - average: 0, - median: 0, - history: new CircularArray() - }, - waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: new CircularArray() - }, - elu: undefined - }) + this.setWorkerNodeTasksUsage( + workerNode, + this.getWorkerUsage(workerNode.worker) + ) this.setWorkerStatistics(workerNode.worker) } } @@ -395,6 +379,11 @@ export abstract class AbstractPool< */ protected abstract get busy (): boolean + /** + * Whether worker nodes are executing at least one task. + * + * @returns Worker nodes busyness boolean status. + */ protected internalBusy (): boolean { return ( this.workerNodes.findIndex(workerNode => { @@ -450,7 +439,7 @@ export abstract class AbstractPool< } /** - * Shutdowns the given worker. + * Terminates the given worker. * * @param worker - A worker within `workerNodes`. */ @@ -476,13 +465,15 @@ export abstract class AbstractPool< * Can be overridden. * * @param workerNodeKey - The worker node key. + * @param task - The task to execute. */ - protected beforeTaskExecutionHook (workerNodeKey: number): void { - ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing - if (this.opts.enableTasksQueue === true) { - this.workerNodes[workerNodeKey].workerUsage.tasks.queued = - this.tasksQueueSize(workerNodeKey) - } + protected beforeTaskExecutionHook ( + workerNodeKey: number, + task: Task + ): void { + const workerUsage = this.workerNodes[workerNodeKey].workerUsage + ++workerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(workerUsage, task) } /** @@ -498,33 +489,44 @@ export abstract class AbstractPool< ): void { const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage + this.updateTaskStatisticsWorkerUsage(workerUsage, message) + this.updateRunTimeWorkerUsage(workerUsage, message) + this.updateEluWorkerUsage(workerUsage, message) + } + + private updateTaskStatisticsWorkerUsage ( + workerUsage: WorkerUsage, + message: MessageValue + ): void { const workerTaskStatistics = workerUsage.tasks --workerTaskStatistics.executing ++workerTaskStatistics.executed if (message.taskError != null) { ++workerTaskStatistics.failed } - - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateWaitTimeWorkerUsage(workerUsage, message) - this.updateEluWorkerUsage(workerUsage, message) } private updateRunTimeWorkerUsage ( workerUsage: WorkerUsage, message: MessageValue ): void { - if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) { - workerUsage.runTime.aggregation += message.taskPerformance?.runTime ?? 0 + if ( + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime + .aggregate + ) { + workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0 if ( - this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime && + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime + .average && workerUsage.tasks.executed !== 0 ) { workerUsage.runTime.average = - workerUsage.runTime.aggregation / workerUsage.tasks.executed + workerUsage.runTime.aggregate / + (workerUsage.tasks.executed - workerUsage.tasks.failed) } if ( - this.workerChoiceStrategyContext.getTaskStatistics().medRunTime && + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime + .median && message.taskPerformance?.runTime != null ) { workerUsage.runTime.history.push(message.taskPerformance.runTime) @@ -535,47 +537,76 @@ export abstract class AbstractPool< private updateWaitTimeWorkerUsage ( workerUsage: WorkerUsage, - message: MessageValue + task: Task ): void { - if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) { - workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0 + const timestamp = performance.now() + const taskWaitTime = timestamp - (task.timestamp ?? timestamp) + if ( + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime + .aggregate + ) { + workerUsage.waitTime.aggregate += taskWaitTime ?? 0 if ( - this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime && + this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .waitTime.average && workerUsage.tasks.executed !== 0 ) { workerUsage.waitTime.average = - workerUsage.waitTime.aggregation / workerUsage.tasks.executed + workerUsage.waitTime.aggregate / + (workerUsage.tasks.executed - workerUsage.tasks.failed) } if ( - this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime && - message.taskPerformance?.waitTime != null + this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .waitTime.median && + taskWaitTime != null ) { - workerUsage.waitTime.history.push(message.taskPerformance.waitTime) + workerUsage.waitTime.history.push(taskWaitTime) workerUsage.waitTime.median = median(workerUsage.waitTime.history) } } } private updateEluWorkerUsage ( - workerTasksUsage: WorkerUsage, + workerUsage: WorkerUsage, message: MessageValue ): void { - if (this.workerChoiceStrategyContext.getTaskStatistics().elu) { + if ( + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu + .aggregate + ) { + if (workerUsage.elu != null && message.taskPerformance?.elu != null) { + workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle + workerUsage.elu.active.aggregate += message.taskPerformance.elu.active + workerUsage.elu.utilization = + (workerUsage.elu.utilization + + message.taskPerformance.elu.utilization) / + 2 + } else if (message.taskPerformance?.elu != null) { + workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle + workerUsage.elu.active.aggregate = message.taskPerformance.elu.active + workerUsage.elu.utilization = message.taskPerformance.elu.utilization + } + if ( + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu + .average && + workerUsage.tasks.executed !== 0 + ) { + const executedTasks = + workerUsage.tasks.executed - workerUsage.tasks.failed + workerUsage.elu.idle.average = + workerUsage.elu.idle.aggregate / executedTasks + workerUsage.elu.active.average = + workerUsage.elu.active.aggregate / executedTasks + } if ( - workerTasksUsage.elu != null && + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu + .median && message.taskPerformance?.elu != null ) { - workerTasksUsage.elu = { - idle: workerTasksUsage.elu.idle + message.taskPerformance.elu.idle, - active: - workerTasksUsage.elu.active + message.taskPerformance.elu.active, - utilization: - (workerTasksUsage.elu.utilization + - message.taskPerformance.elu.utilization) / - 2 - } - } else if (message.taskPerformance?.elu != null) { - workerTasksUsage.elu = message.taskPerformance.elu + workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle) + workerUsage.elu.active.history.push(message.taskPerformance.elu.active) + workerUsage.elu.idle.median = median(workerUsage.elu.idle.history) + workerUsage.elu.active.median = median(workerUsage.elu.active.history) } } } @@ -583,33 +614,29 @@ export abstract class AbstractPool< /** * Chooses a worker node for the next task. * - * The default worker choice strategy uses a round robin algorithm to distribute the load. + * The default worker choice strategy uses a round robin algorithm to distribute the tasks. * * @returns The worker node key */ - protected chooseWorkerNode (): number { - let workerNodeKey: number - if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) { - const workerCreated = this.createAndSetupWorker() - this.registerWorkerMessageListener(workerCreated, message => { - const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated) - if ( - isKillBehavior(KillBehaviors.HARD, message.kill) || - (message.kill != null && - this.workerNodes[currentWorkerNodeKey].workerUsage.tasks - .executing === 0) - ) { - // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - this.flushTasksQueue(currentWorkerNodeKey) - // FIXME: wait for tasks to be finished - void (this.destroyWorker(workerCreated) as Promise) - } - }) - workerNodeKey = this.getWorkerNodeKey(workerCreated) - } else { - workerNodeKey = this.workerChoiceStrategyContext.execute() + private chooseWorkerNode (): number { + if (this.shallCreateDynamicWorker()) { + const worker = this.createAndSetupDynamicWorker() + if ( + this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker + ) { + return this.getWorkerNodeKey(worker) + } } - return workerNodeKey + return this.workerChoiceStrategyContext.execute() + } + + /** + * Conditions for dynamic worker creation. + * + * @returns Whether to create a dynamic worker or not. + */ + private shallCreateDynamicWorker (): boolean { + return this.type === PoolTypes.dynamic && !this.full && this.internalBusy() } /** @@ -634,7 +661,9 @@ export abstract class AbstractPool< >(worker: Worker, listener: (message: MessageValue) => void): void /** - * Returns a newly created worker. + * Creates a new worker. + * + * @returns Newly created worker. */ protected abstract createWorker (): Worker @@ -661,8 +690,6 @@ export abstract class AbstractPool< if (this.emitter != null) { this.emitter.emit(PoolEvents.error, error) } - }) - worker.on('error', () => { if (this.opts.restartWorkerOnError === true) { this.createAndSetupWorker() } @@ -682,6 +709,33 @@ export abstract class AbstractPool< return worker } + /** + * Creates a new dynamic worker and sets it up completely in the pool worker nodes. + * + * @returns New, completely set up dynamic worker. + */ + protected createAndSetupDynamicWorker (): Worker { + const worker = this.createAndSetupWorker() + this.registerWorkerMessageListener(worker, message => { + const workerNodeKey = this.getWorkerNodeKey(worker) + if ( + isKillBehavior(KillBehaviors.HARD, message.kill) || + (message.kill != null && + ((this.opts.enableTasksQueue === false && + this.workerNodes[workerNodeKey].workerUsage.tasks.executing === + 0) || + (this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].workerUsage.tasks.executing === + 0 && + this.tasksQueueSize(workerNodeKey) === 0))) + ) { + // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) + void (this.destroyWorker(worker) as Promise) + } + }) + return worker + } + /** * This function is the listener registered for each worker message. * @@ -713,6 +767,7 @@ export abstract class AbstractPool< this.dequeueTask(workerNodeKey) as Task ) } + this.workerChoiceStrategyContext.update(workerNodeKey) } } } @@ -751,52 +806,31 @@ export abstract class AbstractPool< private pushWorkerNode (worker: Worker): number { return this.workerNodes.push({ worker, - workerUsage: { - tasks: { - executed: 0, - executing: 0, - queued: 0, - failed: 0 - }, - runTime: { - aggregation: 0, - average: 0, - median: 0, - history: new CircularArray() - }, - - waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: new CircularArray() - }, - elu: undefined - }, + workerUsage: this.getWorkerUsage(worker), tasksQueue: new Queue>() }) } - /** - * Sets the given worker in the pool worker nodes. - * - * @param workerNodeKey - The worker node key. - * @param worker - The worker. - * @param workerUsage - The worker usage. - * @param tasksQueue - The worker task queue. - */ - private setWorkerNode ( - workerNodeKey: number, - worker: Worker, - workerUsage: WorkerUsage, - tasksQueue: Queue> - ): void { - this.workerNodes[workerNodeKey] = { - worker, - workerUsage, - tasksQueue - } - } + // /** + // * Sets the given worker in the pool worker nodes. + // * + // * @param workerNodeKey - The worker node key. + // * @param worker - The worker. + // * @param workerUsage - The worker usage. + // * @param tasksQueue - The worker task queue. + // */ + // private setWorkerNode ( + // workerNodeKey: number, + // worker: Worker, + // workerUsage: WorkerUsage, + // tasksQueue: Queue> + // ): void { + // this.workerNodes[workerNodeKey] = { + // worker, + // workerUsage, + // tasksQueue + // } + // } /** * Removes the given worker from the pool worker nodes. @@ -812,7 +846,7 @@ export abstract class AbstractPool< } private executeTask (workerNodeKey: number, task: Task): void { - this.beforeTaskExecutionHook(workerNodeKey) + this.beforeTaskExecutionHook(workerNodeKey, task) this.sendToWorker(this.workerNodes[workerNodeKey].worker, task) } @@ -848,10 +882,58 @@ export abstract class AbstractPool< private setWorkerStatistics (worker: Worker): void { this.sendToWorker(worker, { statistics: { - runTime: this.workerChoiceStrategyContext.getTaskStatistics().runTime, - waitTime: this.workerChoiceStrategyContext.getTaskStatistics().waitTime, - elu: this.workerChoiceStrategyContext.getTaskStatistics().elu + runTime: + this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .runTime.aggregate, + elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .elu.aggregate } }) } + + private getWorkerUsage (worker: Worker): WorkerUsage { + return { + tasks: this.getTaskStatistics(worker), + runTime: { + aggregate: 0, + average: 0, + median: 0, + history: new CircularArray() + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: new CircularArray() + }, + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: new CircularArray() + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: new CircularArray() + }, + utilization: 0 + } + } + } + + private getTaskStatistics (worker: Worker): TaskStatistics { + const queueSize = + this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size + return { + executed: 0, + executing: 0, + get queued (): number { + return queueSize ?? 0 + }, + failed: 0 + } + } }