X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=65ace6d907a20ca5a88d820d638c1aaa1a6f8bf1;hb=cdaecaee1c7fa5c412daf29f2db41470506793ac;hp=4170b8b61cd6f6e1f0de861ca6c94ee6278cb58f;hpb=c63a35a04c190989be80f9218d97e0aca739475e;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 4170b8b6..65ace6d9 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -703,7 +703,7 @@ export abstract class AbstractPool< message: MessageValue ): void => { this.checkMessageWorkerId(message) - const workerId = this.getWorkerInfo(workerNodeKey).id + const workerId = this.getWorkerInfo(workerNodeKey)?.id if ( message.taskFunctionOperationStatus != null && message.workerId === workerId @@ -1061,7 +1061,7 @@ export abstract class AbstractPool< task: Task ): void { // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (this.workerNodes[workerNodeKey].usage != null) { + if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing updateWaitTimeWorkerUsage( @@ -1103,7 +1103,7 @@ export abstract class AbstractPool< ): void { let needWorkerChoiceStrategyUpdate = false // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (this.workerNodes[workerNodeKey].usage != null) { + if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage updateTaskStatisticsWorkerUsage(workerUsage, message) updateRunTimeWorkerUsage( @@ -1157,6 +1157,7 @@ export abstract class AbstractPool< private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean { const workerInfo = this.getWorkerInfo(workerNodeKey) return ( + workerInfo != null && Array.isArray(workerInfo.taskFunctionNames) && workerInfo.taskFunctionNames.length > 2 ) @@ -1180,7 +1181,7 @@ export abstract class AbstractPool< } } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - return this.workerChoiceStrategyContext!.execute()! + return this.workerChoiceStrategyContext!.execute() } /** @@ -1454,7 +1455,7 @@ export abstract class AbstractPool< ): void { const workerNode = this.workerNodes[workerNodeKey] // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (workerNode.usage != null) { + if (workerNode?.usage != null) { ++workerNode.usage.tasks.stolen } if ( @@ -1473,7 +1474,7 @@ export abstract class AbstractPool< ): void { const workerNode = this.workerNodes[workerNodeKey] // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (workerNode.usage != null) { + if (workerNode?.usage != null) { ++workerNode.usage.tasks.sequentiallyStolen } } @@ -1499,7 +1500,7 @@ export abstract class AbstractPool< ): void { const workerNode = this.workerNodes[workerNodeKey] // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (workerNode.usage != null) { + if (workerNode?.usage != null) { workerNode.usage.tasks.sequentiallyStolen = 0 } } @@ -1536,13 +1537,14 @@ export abstract class AbstractPool< (this.info.stealingWorkerNodes ?? 0) > Math.floor(this.workerNodes.length / 2) ) { - if (previousStolenTask != null) { + if (workerInfo != null && previousStolenTask != null) { workerInfo.stealing = false } return } const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( + workerInfo != null && previousStolenTask != null && workerNodeTasksUsage.sequentiallyStolen > 0 && (workerNodeTasksUsage.executing > 0 || @@ -1560,6 +1562,11 @@ export abstract class AbstractPool< this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) return } + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey}' not found in pool` + ) + } workerInfo.stealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) if ( @@ -1660,6 +1667,11 @@ export abstract class AbstractPool< this.opts.tasksQueueOptions!.size! - sizeOffset ) { const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey}' not found in pool` + ) + } workerInfo.stealing = true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const task = sourceWorkerNode.popTask()! @@ -1687,9 +1699,12 @@ export abstract class AbstractPool< this.handleTaskExecutionResponse(message) } else if (taskFunctionNames != null) { // Task function names message received from worker - this.getWorkerInfo( + const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(workerId) - ).taskFunctionNames = taskFunctionNames + ) + if (workerInfo != null) { + workerInfo.taskFunctionNames = taskFunctionNames + } } } @@ -1733,7 +1748,8 @@ export abstract class AbstractPool< this.afterTaskExecutionHook(workerNodeKey, message) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.promiseResponseMap.delete(taskId!) - workerNode.emit('taskFinished', taskId) + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode?.emit('taskFinished', taskId) if (this.opts.enableTasksQueue === true && !this.destroying) { const workerNodeTasksUsage = workerNode.usage.tasks if ( @@ -1783,13 +1799,8 @@ export abstract class AbstractPool< * @param workerNodeKey - The worker node key. * @returns The worker information. */ - protected getWorkerInfo (workerNodeKey: number): WorkerInfo { - const workerInfo = this.workerNodes[workerNodeKey]?.info - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (workerInfo == null) { - throw new Error(`Worker node with key '${workerNodeKey}' not found`) - } - return workerInfo + protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined { + return this.workerNodes[workerNodeKey]?.info } /** @@ -1848,7 +1859,10 @@ export abstract class AbstractPool< } protected flagWorkerNodeAsNotReady (workerNodeKey: number): void { - this.getWorkerInfo(workerNodeKey).ready = false + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo != null) { + workerInfo.ready = false + } } private hasBackPressure (): boolean {