X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=1aa1a13413f807019352907d2dac7a635ab665d1;hb=7daf8b1ee6ee28ba8c39c63067cf7071f8638177;hp=183344f84b42a2a55411788d1d414fb82383f9dc;hpb=8735b4e51c0cfabc9612d57417834d42042cab4e;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 183344f8..1aa1a134 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -709,7 +709,7 @@ export abstract class AbstractPool< } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() - const workerInfo = this.getWorkerInfo(workerNodeKey) + const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo if ( name != null && Array.isArray(workerInfo.taskFunctions) && @@ -805,10 +805,17 @@ export abstract class AbstractPool< workerNodeKey: number, task: Task ): void { - const workerUsage = this.workerNodes[workerNodeKey].usage - ++workerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(workerUsage, task) - if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + if (this.workerNodes[workerNodeKey]?.usage != null) { + const workerUsage = this.workerNodes[workerNodeKey].usage + ++workerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(workerUsage, task) + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( + task.name as string + ) != null + ) { const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage @@ -828,11 +835,18 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): void { - const workerUsage = this.workerNodes[workerNodeKey].usage - this.updateTaskStatisticsWorkerUsage(workerUsage, message) - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateEluWorkerUsage(workerUsage, message) - if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + if (this.workerNodes[workerNodeKey]?.usage != null) { + const workerUsage = this.workerNodes[workerNodeKey].usage + this.updateTaskStatisticsWorkerUsage(workerUsage, message) + this.updateRunTimeWorkerUsage(workerUsage, message) + this.updateEluWorkerUsage(workerUsage, message) + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( + message.taskPerformance?.name as string + ) != null + ) { const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey ].getTaskFunctionWorkerUsage( @@ -853,6 +867,7 @@ export abstract class AbstractPool< private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean { const workerInfo = this.getWorkerInfo(workerNodeKey) return ( + workerInfo != null && Array.isArray(workerInfo.taskFunctions) && workerInfo.taskFunctions.length > 2 ) @@ -952,7 +967,7 @@ export abstract class AbstractPool< if (this.shallCreateDynamicWorker()) { const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( - this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker + this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage ) { return workerNodeKey } @@ -1002,7 +1017,7 @@ export abstract class AbstractPool< worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) worker.on('error', (error) => { const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) - const workerInfo = this.getWorkerInfo(workerNodeKey) + const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo workerInfo.ready = false this.workerNodes[workerNodeKey].closeChannel() this.emitter?.emit(PoolEvents.error, error) @@ -1056,13 +1071,16 @@ export abstract class AbstractPool< }) } }) - const workerInfo = this.getWorkerInfo(workerNodeKey) + const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo this.sendToWorker(workerNodeKey, { checkActive: true, workerId: workerInfo.id as number }) workerInfo.dynamic = true - if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { + if ( + this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || + this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage + ) { workerInfo.ready = true } this.checkAndEmitDynamicWorkerCreationEvents() @@ -1118,7 +1136,7 @@ export abstract class AbstractPool< elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu.aggregate }, - workerId: this.getWorkerInfo(workerNodeKey).id as number + workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number }) } @@ -1128,7 +1146,7 @@ export abstract class AbstractPool< let minQueuedTasks = Infinity let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - const workerInfo = this.getWorkerInfo(workerNodeId) + const workerInfo = this.getWorkerInfo(workerNodeId) as WorkerInfo if ( workerNodeId !== workerNodeKey && workerInfo.ready && @@ -1182,8 +1200,10 @@ export abstract class AbstractPool< this.handleTaskExecutionResponse(message) } else if (message.taskFunctions != null) { // Task functions message received from worker - this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(message.workerId) + ( + this.getWorkerInfo( + this.getWorkerNodeKeyByWorkerId(message.workerId) + ) as WorkerInfo ).taskFunctions = message.taskFunctions } } @@ -1195,7 +1215,7 @@ export abstract class AbstractPool< } const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) - ) + ) as WorkerInfo workerInfo.ready = message.ready as boolean workerInfo.taskFunctions = message.taskFunctions if (this.emitter != null && this.ready) { @@ -1257,8 +1277,8 @@ export abstract class AbstractPool< * @param workerNodeKey - The worker node key. * @returns The worker information. */ - protected getWorkerInfo (workerNodeKey: number): WorkerInfo { - return this.workerNodes[workerNodeKey].info + protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined { + return this.workerNodes[workerNodeKey]?.info } /**