X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Fabstract-pool.ts;h=b86a3b13a6b538efb4024c0ebe8e49df22e4a1a8;hb=afa30fd0dd5bbc5d305cc30be663966a449efc08;hp=065b228ada73e6d21f6fb8238bddf403cc04ac83;hpb=8d20e449d72975f6add9177d1097d5a204d14f71;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 065b228a..b86a3b13 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -95,12 +95,6 @@ export abstract class AbstractPool< this.enqueueTask = this.enqueueTask.bind(this) this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) - this.setupHook() - - for (let i = 1; i <= this.numberOfWorkers; i++) { - this.createAndSetupWorker() - } - if (this.opts.enableEvents === true) { this.emitter = new PoolEmitter() } @@ -113,6 +107,12 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategy, this.opts.workerChoiceStrategyOptions ) + + this.setupHook() + + for (let i = 1; i <= this.numberOfWorkers; i++) { + this.createAndSetupWorker() + } } private checkFilePath (filePath: string): void { @@ -288,6 +288,12 @@ export abstract class AbstractPool< ): void { this.checkValidWorkerChoiceStrategy(workerChoiceStrategy) this.opts.workerChoiceStrategy = workerChoiceStrategy + this.workerChoiceStrategyContext.setWorkerChoiceStrategy( + this.opts.workerChoiceStrategy + ) + if (workerChoiceStrategyOptions != null) { + this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) + } for (const workerNode of this.workerNodes) { this.setWorkerNodeTasksUsage(workerNode, { ran: 0, @@ -303,12 +309,7 @@ export abstract class AbstractPool< error: 0, elu: undefined }) - } - this.workerChoiceStrategyContext.setWorkerChoiceStrategy( - this.opts.workerChoiceStrategy - ) - if (workerChoiceStrategyOptions != null) { - this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) + this.setWorkerStatistics(workerNode.worker) } } @@ -341,7 +342,7 @@ export abstract class AbstractPool< this.checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) - } else { + } else if (this.opts.tasksQueueOptions != null) { delete this.opts.tasksQueueOptions } } @@ -380,13 +381,13 @@ export abstract class AbstractPool< /** @inheritDoc */ public async execute (data?: Data, name?: string): Promise { - const submissionTimestamp = performance.now() + const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const submittedTask: Task = { name, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), - submissionTimestamp, + timestamp, id: crypto.randomUUID() } const res = new Promise((resolve, reject) => { @@ -483,17 +484,17 @@ export abstract class AbstractPool< workerTasksUsage: TasksUsage, message: MessageValue ): void { - if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) { + if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) { workerTasksUsage.runTime += message.runTime ?? 0 if ( - this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime && + this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime && workerTasksUsage.ran !== 0 ) { workerTasksUsage.avgRunTime = workerTasksUsage.runTime / workerTasksUsage.ran } if ( - this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime && + this.workerChoiceStrategyContext.getTaskStatistics().medRunTime && message.runTime != null ) { workerTasksUsage.runTimeHistory.push(message.runTime) @@ -506,17 +507,17 @@ export abstract class AbstractPool< workerTasksUsage: TasksUsage, message: MessageValue ): void { - if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) { + if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) { workerTasksUsage.waitTime += message.waitTime ?? 0 if ( - this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime && + this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime && workerTasksUsage.ran !== 0 ) { workerTasksUsage.avgWaitTime = workerTasksUsage.waitTime / workerTasksUsage.ran } if ( - this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime && + this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime && message.waitTime != null ) { workerTasksUsage.waitTimeHistory.push(message.waitTime) @@ -529,9 +530,8 @@ export abstract class AbstractPool< workerTasksUsage: TasksUsage, message: MessageValue ): void { - if (this.workerChoiceStrategyContext.getRequiredStatistics().elu) { + if (this.workerChoiceStrategyContext.getTaskStatistics().elu) { if (workerTasksUsage.elu != null && message.elu != null) { - // TODO: cumulative or delta? workerTasksUsage.elu = { idle: workerTasksUsage.elu.idle + message.elu.idle, active: workerTasksUsage.elu.active + message.elu.active, @@ -625,11 +625,11 @@ export abstract class AbstractPool< this.emitter.emit(PoolEvents.error, error) } }) - if (this.opts.restartWorkerOnError === true) { - worker.on('error', () => { + worker.on('error', () => { + if (this.opts.restartWorkerOnError === true) { this.createAndSetupWorker() - }) - } + } + }) worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) worker.once('exit', () => { @@ -638,6 +638,8 @@ export abstract class AbstractPool< this.pushWorkerNode(worker) + this.setWorkerStatistics(worker) + this.afterWorkerSetup(worker) return worker @@ -800,4 +802,14 @@ export abstract class AbstractPool< this.flushTasksQueue(workerNodeKey) } } + + private setWorkerStatistics (worker: Worker): void { + this.sendToWorker(worker, { + statistics: { + runTime: this.workerChoiceStrategyContext.getTaskStatistics().runTime, + waitTime: this.workerChoiceStrategyContext.getTaskStatistics().waitTime, + elu: this.workerChoiceStrategyContext.getTaskStatistics().elu + } + }) + } }