X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=058b43b8a37104b29d1d15978613c42735072547;hb=07783b850f6ca370b70c7638ac3df07877aaf201;hp=f082b8d144993b28621cc58e2941551c8da4a264;hpb=5df69fabd77b3ec4137a6382e2d84791bf0fe85d;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index f082b8d1..058b43b8 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -29,6 +29,7 @@ import type { WorkerUsage } from './worker' import { + Measurements, WorkerChoiceStrategies, type WorkerChoiceStrategy, type WorkerChoiceStrategyOptions @@ -68,8 +69,6 @@ export abstract class AbstractPool< /** * Worker choice strategy context referencing a worker choice algorithm implementation. - * - * Default to a round robin algorithm. */ protected workerChoiceStrategyContext: WorkerChoiceStrategyContext< Worker, @@ -201,6 +200,16 @@ export abstract class AbstractPool< 'Invalid worker choice strategy options: must have a weight for each worker node' ) } + if ( + workerChoiceStrategyOptions.measurement != null && + !Object.values(Measurements).includes( + workerChoiceStrategyOptions.measurement + ) + ) { + throw new Error( + `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'` + ) + } } private checkValidTasksQueueOptions ( @@ -209,11 +218,20 @@ export abstract class AbstractPool< if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { throw new TypeError('Invalid tasks queue options: must be a plain object') } - if ((tasksQueueOptions?.concurrency as number) <= 0) { + if ( + tasksQueueOptions?.concurrency != null && + !Number.isSafeInteger(tasksQueueOptions.concurrency) + ) { + throw new TypeError( + 'Invalid worker tasks concurrency: must be an integer' + ) + } + if ( + tasksQueueOptions?.concurrency != null && + tasksQueueOptions.concurrency <= 0 + ) { throw new Error( - `Invalid worker tasks concurrency '${ - tasksQueueOptions.concurrency as number - }'` + `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'` ) } } @@ -381,6 +399,11 @@ export abstract class AbstractPool< */ protected abstract get busy (): boolean + /** + * Whether worker nodes are executing at least one task. + * + * @returns Worker nodes busyness boolean status. + */ protected internalBusy (): boolean { return ( this.workerNodes.findIndex(workerNode => { @@ -418,7 +441,6 @@ export abstract class AbstractPool< } else { this.executeTask(workerNodeKey, submittedTask) } - this.workerChoiceStrategyContext.update(workerNodeKey) this.checkAndEmitEvents() // eslint-disable-next-line @typescript-eslint/return-await return res @@ -436,7 +458,7 @@ export abstract class AbstractPool< } /** - * Shutdowns the given worker. + * Terminates the given worker. * * @param worker - A worker within `workerNodes`. */ @@ -486,14 +508,21 @@ export abstract class AbstractPool< ): void { const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage + this.updateTaskStatisticsWorkerUsage(workerUsage, message) + this.updateRunTimeWorkerUsage(workerUsage, message) + this.updateEluWorkerUsage(workerUsage, message) + } + + private updateTaskStatisticsWorkerUsage ( + workerUsage: WorkerUsage, + message: MessageValue + ): void { const workerTaskStatistics = workerUsage.tasks --workerTaskStatistics.executing ++workerTaskStatistics.executed if (message.taskError != null) { ++workerTaskStatistics.failed } - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateEluWorkerUsage(workerUsage, message) } private updateRunTimeWorkerUsage ( @@ -511,7 +540,8 @@ export abstract class AbstractPool< workerUsage.tasks.executed !== 0 ) { workerUsage.runTime.average = - workerUsage.runTime.aggregate / workerUsage.tasks.executed + workerUsage.runTime.aggregate / + (workerUsage.tasks.executed - workerUsage.tasks.failed) } if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime @@ -541,7 +571,8 @@ export abstract class AbstractPool< workerUsage.tasks.executed !== 0 ) { workerUsage.waitTime.average = - workerUsage.waitTime.aggregate / workerUsage.tasks.executed + workerUsage.waitTime.aggregate / + (workerUsage.tasks.executed - workerUsage.tasks.failed) } if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements() @@ -563,10 +594,8 @@ export abstract class AbstractPool< .aggregate ) { if (workerUsage.elu != null && message.taskPerformance?.elu != null) { - workerUsage.elu.idle.aggregate = - workerUsage.elu.idle.aggregate + message.taskPerformance.elu.idle - workerUsage.elu.active.aggregate = - workerUsage.elu.active.aggregate + message.taskPerformance.elu.active + workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle + workerUsage.elu.active.aggregate += message.taskPerformance.elu.active workerUsage.elu.utilization = (workerUsage.elu.utilization + message.taskPerformance.elu.utilization) / @@ -581,10 +610,12 @@ export abstract class AbstractPool< .average && workerUsage.tasks.executed !== 0 ) { + const executedTasks = + workerUsage.tasks.executed - workerUsage.tasks.failed workerUsage.elu.idle.average = - workerUsage.elu.idle.aggregate / workerUsage.tasks.executed + workerUsage.elu.idle.aggregate / executedTasks workerUsage.elu.active.average = - workerUsage.elu.active.aggregate / workerUsage.tasks.executed + workerUsage.elu.active.aggregate / executedTasks } if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu @@ -602,33 +633,29 @@ export abstract class AbstractPool< /** * Chooses a worker node for the next task. * - * The default worker choice strategy uses a round robin algorithm to distribute the load. + * The default worker choice strategy uses a round robin algorithm to distribute the tasks. * * @returns The worker node key */ - protected chooseWorkerNode (): number { - let workerNodeKey: number - if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) { - const workerCreated = this.createAndSetupWorker() - this.registerWorkerMessageListener(workerCreated, message => { - const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated) - if ( - isKillBehavior(KillBehaviors.HARD, message.kill) || - (message.kill != null && - this.workerNodes[currentWorkerNodeKey].workerUsage.tasks - .executing === 0) - ) { - // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - this.flushTasksQueue(currentWorkerNodeKey) - // FIXME: wait for tasks to be finished - void (this.destroyWorker(workerCreated) as Promise) - } - }) - workerNodeKey = this.getWorkerNodeKey(workerCreated) - } else { - workerNodeKey = this.workerChoiceStrategyContext.execute() + private chooseWorkerNode (): number { + if (this.shallCreateDynamicWorker()) { + const worker = this.createAndSetupDynamicWorker() + if ( + this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker + ) { + return this.getWorkerNodeKey(worker) + } } - return workerNodeKey + return this.workerChoiceStrategyContext.execute() + } + + /** + * Conditions for dynamic worker creation. + * + * @returns Whether to create a dynamic worker or not. + */ + private shallCreateDynamicWorker (): boolean { + return this.type === PoolTypes.dynamic && !this.full && this.internalBusy() } /** @@ -653,7 +680,9 @@ export abstract class AbstractPool< >(worker: Worker, listener: (message: MessageValue) => void): void /** - * Returns a newly created worker. + * Creates a new worker. + * + * @returns Newly created worker. */ protected abstract createWorker (): Worker @@ -680,8 +709,6 @@ export abstract class AbstractPool< if (this.emitter != null) { this.emitter.emit(PoolEvents.error, error) } - }) - worker.on('error', () => { if (this.opts.restartWorkerOnError === true) { this.createAndSetupWorker() } @@ -701,6 +728,33 @@ export abstract class AbstractPool< return worker } + /** + * Creates a new dynamic worker and sets it up completely in the pool worker nodes. + * + * @returns New, completely set up dynamic worker. + */ + protected createAndSetupDynamicWorker (): Worker { + const worker = this.createAndSetupWorker() + this.registerWorkerMessageListener(worker, message => { + const workerNodeKey = this.getWorkerNodeKey(worker) + if ( + isKillBehavior(KillBehaviors.HARD, message.kill) || + (message.kill != null && + ((this.opts.enableTasksQueue === false && + this.workerNodes[workerNodeKey].workerUsage.tasks.executing === + 0) || + (this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].workerUsage.tasks.executing === + 0 && + this.tasksQueueSize(workerNodeKey) === 0))) + ) { + // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) + void (this.destroyWorker(worker) as Promise) + } + }) + return worker + } + /** * This function is the listener registered for each worker message. * @@ -713,10 +767,10 @@ export abstract class AbstractPool< const promiseResponse = this.promiseResponseMap.get(message.id) if (promiseResponse != null) { if (message.taskError != null) { - promiseResponse.reject(message.taskError.message) if (this.emitter != null) { this.emitter.emit(PoolEvents.taskError, message.taskError) } + promiseResponse.reject(message.taskError.message) } else { promiseResponse.resolve(message.data as Response) } @@ -732,6 +786,7 @@ export abstract class AbstractPool< this.dequeueTask(workerNodeKey) as Task ) } + this.workerChoiceStrategyContext.update(workerNodeKey) } } }