X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=a9f6edd73099454681644b56693b8978cec9f81e;hb=7f0e133432b540753bcdd06160d1363e9fe65437;hp=b6f3a5d08bd838426828ecd282e73f175b7e274a;hpb=39618ede0e08d380c1ac82005bcc2ab1f3c227b6;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index b6f3a5d0..a9f6edd7 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -91,7 +91,7 @@ export abstract class AbstractPool< /** * Worker choice strategy context referencing a worker choice algorithm implementation. */ - protected workerChoiceStrategyContext: WorkerChoiceStrategyContext< + protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext< Worker, Data, Response @@ -189,7 +189,9 @@ export abstract class AbstractPool< } } - private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void { + private checkMinimumNumberOfWorkers ( + minimumNumberOfWorkers: number | undefined + ): void { if (minimumNumberOfWorkers == null) { throw new Error( 'Cannot instantiate a pool without specifying the number of workers' @@ -210,13 +212,11 @@ export abstract class AbstractPool< private checkPoolOptions (opts: PoolOptions): void { if (isPlainObject(opts)) { this.opts.startWorkers = opts.startWorkers ?? true - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy!) + checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy) this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategyOptions( - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - opts.workerChoiceStrategyOptions! + opts.workerChoiceStrategyOptions ) if (opts.workerChoiceStrategyOptions != null) { this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions @@ -225,11 +225,9 @@ export abstract class AbstractPool< this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false if (this.opts.enableTasksQueue) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - checkValidTasksQueueOptions(opts.tasksQueueOptions!) + checkValidTasksQueueOptions(opts.tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions( - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - opts.tasksQueueOptions! + opts.tasksQueueOptions ) } } else { @@ -238,7 +236,7 @@ export abstract class AbstractPool< } private checkValidWorkerChoiceStrategyOptions ( - workerChoiceStrategyOptions: WorkerChoiceStrategyOptions + workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined ): void { if ( workerChoiceStrategyOptions != null && @@ -288,9 +286,11 @@ export abstract class AbstractPool< minSize: this.minimumNumberOfWorkers, maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers, ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - .runTime.aggregate && - this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - .waitTime.aggregate && { utilization: round(this.utilization) }), + ?.runTime.aggregate === true && + this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .waitTime.aggregate && { + utilization: round(this.utilization) + }), workerNodes: this.workerNodes.length, idleWorkerNodes: this.workerNodes.reduce( (accumulator, workerNode) => @@ -331,7 +331,7 @@ export abstract class AbstractPool< ...(this.opts.enableTasksQueue === true && { maxQueuedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), + accumulator + (workerNode.usage.tasks.maxQueued ?? 0), 0 ) }), @@ -351,23 +351,23 @@ export abstract class AbstractPool< 0 ), ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - .runTime.aggregate && { + ?.runTime.aggregate === true && { runTime: { minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.minimum ?? Infinity + workerNode => workerNode.usage.runTime.minimum ?? Infinity ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.maximum ?? -Infinity + workerNode => workerNode.usage.runTime.maximum ?? -Infinity ) ) ), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .runTime.average && { average: round( average( @@ -379,7 +379,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .runTime.median && { median: round( median( @@ -394,23 +394,23 @@ export abstract class AbstractPool< } }), ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - .waitTime.aggregate && { + ?.waitTime.aggregate === true && { waitTime: { minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.minimum ?? Infinity + workerNode => workerNode.usage.waitTime.minimum ?? Infinity ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity + workerNode => workerNode.usage.waitTime.maximum ?? -Infinity ) ) ), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .waitTime.average && { average: round( average( @@ -422,7 +422,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .waitTime.median && { median: round( median( @@ -465,12 +465,12 @@ export abstract class AbstractPool< (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) const totalTasksRunTime = this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + (workerNode.usage.runTime?.aggregate ?? 0), + accumulator + (workerNode.usage.runTime.aggregate ?? 0), 0 ) const totalTasksWaitTime = this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + (workerNode.usage.waitTime?.aggregate ?? 0), + accumulator + (workerNode.usage.waitTime.aggregate ?? 0), 0 ) return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity @@ -523,7 +523,7 @@ export abstract class AbstractPool< ): void { checkValidWorkerChoiceStrategy(workerChoiceStrategy) this.opts.workerChoiceStrategy = workerChoiceStrategy - this.workerChoiceStrategyContext.setWorkerChoiceStrategy( + this.workerChoiceStrategyContext?.setWorkerChoiceStrategy( this.opts.workerChoiceStrategy ) if (workerChoiceStrategyOptions != null) { @@ -537,13 +537,13 @@ export abstract class AbstractPool< /** @inheritDoc */ public setWorkerChoiceStrategyOptions ( - workerChoiceStrategyOptions: WorkerChoiceStrategyOptions + workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined ): void { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) if (workerChoiceStrategyOptions != null) { this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions } - this.workerChoiceStrategyContext.setOptions( + this.workerChoiceStrategyContext?.setOptions( this.opts.workerChoiceStrategyOptions ) } @@ -559,12 +559,13 @@ export abstract class AbstractPool< this.flushTasksQueues() } this.opts.enableTasksQueue = enable - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.setTasksQueueOptions(tasksQueueOptions!) + this.setTasksQueueOptions(tasksQueueOptions) } /** @inheritDoc */ - public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void { + public setTasksQueueOptions ( + tasksQueueOptions: TasksQueueOptions | undefined + ): void { if (this.opts.enableTasksQueue === true) { checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = @@ -589,7 +590,7 @@ export abstract class AbstractPool< } private buildTasksQueueOptions ( - tasksQueueOptions: TasksQueueOptions + tasksQueueOptions: TasksQueueOptions | undefined ): TasksQueueOptions { return { ...getDefaultTasksQueueOptions( @@ -607,18 +608,15 @@ export abstract class AbstractPool< private setTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].on( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent - ) + this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent) } } private unsetTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].off( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent + 'idle', + this.handleWorkerNodeIdleEvent ) } } @@ -627,7 +625,7 @@ export abstract class AbstractPool< for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -636,7 +634,7 @@ export abstract class AbstractPool< for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].off( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -705,24 +703,17 @@ export abstract class AbstractPool< message: MessageValue ): void => { this.checkMessageWorkerId(message) - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const workerId = this.getWorkerInfo(workerNodeKey).id! + const workerId = this.getWorkerInfo(workerNodeKey)?.id if ( message.taskFunctionOperationStatus != null && message.workerId === workerId ) { if (message.taskFunctionOperationStatus) { resolve(true) - } else if (!message.taskFunctionOperationStatus) { + } else { reject( new Error( - `Task function operation '${ - message.taskFunctionOperation as string - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - }' failed on worker ${message.workerId} with error: '${ - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - message.workerError!.message - }'` + `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'` ) ) } @@ -770,11 +761,8 @@ export abstract class AbstractPool< new Error( `Task function operation '${ message.taskFunctionOperation as string - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - }' failed on worker ${errorResponse! - .workerId!} with error: '${ - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - errorResponse!.workerError!.message + }' failed on worker ${errorResponse?.workerId} with error: '${ + errorResponse?.workerError?.message }'` ) ) @@ -1001,7 +989,8 @@ export abstract class AbstractPool< private async sendKillMessageToWorker (workerNodeKey: number): Promise { await new Promise((resolve, reject) => { - if (this.workerNodes?.[workerNodeKey] == null) { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (this.workerNodes[workerNodeKey] == null) { resolve() return } @@ -1012,8 +1001,7 @@ export abstract class AbstractPool< } else if (message.kill === 'failure') { reject( new Error( - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - `Kill message handling failed on worker ${message.workerId!}` + `Kill message handling failed on worker ${message.workerId}` ) ) } @@ -1072,6 +1060,7 @@ export abstract class AbstractPool< workerNodeKey: number, task: Task ): void { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing @@ -1113,6 +1102,7 @@ export abstract class AbstractPool< message: MessageValue ): void { let needWorkerChoiceStrategyUpdate = false + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage updateTaskStatisticsWorkerUsage(workerUsage, message) @@ -1154,7 +1144,7 @@ export abstract class AbstractPool< needWorkerChoiceStrategyUpdate = true } if (needWorkerChoiceStrategyUpdate) { - this.workerChoiceStrategyContext.update(workerNodeKey) + this.workerChoiceStrategyContext?.update(workerNodeKey) } } @@ -1184,12 +1174,14 @@ export abstract class AbstractPool< if (this.shallCreateDynamicWorker()) { const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( - this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage + this.workerChoiceStrategyContext?.getStrategyPolicy() + .dynamicWorkerUsage === true ) { return workerNodeKey } } - return this.workerChoiceStrategyContext.execute() + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + return this.workerChoiceStrategyContext!.execute() } /** @@ -1252,6 +1244,7 @@ export abstract class AbstractPool< ) { this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition workerNode?.terminate().catch(error => { this.emitter?.emit(PoolEvents.error, error) }) @@ -1298,7 +1291,6 @@ export abstract class AbstractPool< }) } }) - const workerInfo = this.getWorkerInfo(workerNodeKey) this.sendToWorker(workerNodeKey, { checkActive: true }) @@ -1313,12 +1305,15 @@ export abstract class AbstractPool< }) } } - workerInfo.dynamic = true + const workerNode = this.workerNodes[workerNodeKey] + workerNode.info.dynamic = true if ( - this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || - this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage + this.workerChoiceStrategyContext?.getStrategyPolicy() + .dynamicWorkerReady === true || + this.workerChoiceStrategyContext?.getStrategyPolicy() + .dynamicWorkerUsage === true ) { - workerInfo.ready = true + workerNode.info.ready = true } this.checkAndEmitDynamicWorkerCreationEvents() return workerNodeKey @@ -1382,14 +1377,14 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { if (this.opts.tasksQueueOptions?.taskStealing === true) { this.workerNodes[workerNodeKey].on( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent + 'idle', + this.handleWorkerNodeIdleEvent ) } if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -1411,10 +1406,11 @@ export abstract class AbstractPool< this.sendToWorker(workerNodeKey, { statistics: { runTime: - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .runTime.aggregate, - elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .elu.aggregate + this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ?.runTime.aggregate ?? false, + elu: + this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu + .aggregate ?? false } }) } @@ -1459,6 +1455,7 @@ export abstract class AbstractPool< taskName: string ): void { const workerNode = this.workerNodes[workerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { ++workerNode.usage.tasks.stolen } @@ -1477,6 +1474,7 @@ export abstract class AbstractPool< workerNodeKey: number ): void { const workerNode = this.workerNodes[workerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { ++workerNode.usage.tasks.sequentiallyStolen } @@ -1502,6 +1500,7 @@ export abstract class AbstractPool< workerNodeKey: number ): void { const workerNode = this.workerNodes[workerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { workerNode.usage.tasks.sequentiallyStolen = 0 } @@ -1523,34 +1522,36 @@ export abstract class AbstractPool< } } - private readonly handleIdleWorkerNodeEvent = ( + private readonly handleWorkerNodeIdleEvent = ( eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { const { workerNodeKey } = eventDetail if (workerNodeKey == null) { throw new Error( - 'WorkerNode event detail workerNodeKey property must be defined' + "WorkerNode event detail 'workerNodeKey' property must be defined" ) } + const workerInfo = this.getWorkerInfo(workerNodeKey) if ( this.cannotStealTask() || - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2) + (this.info.stealingWorkerNodes ?? 0) > + Math.floor(this.workerNodes.length / 2) ) { - if (previousStolenTask != null) { - this.getWorkerInfo(workerNodeKey).stealing = false + if (workerInfo != null && previousStolenTask != null) { + workerInfo.stealing = false } return } const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( + workerInfo != null && previousStolenTask != null && workerNodeTasksUsage.sequentiallyStolen > 0 && (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) ) { - this.getWorkerInfo(workerNodeKey).stealing = false + workerInfo.stealing = false // eslint-disable-next-line @typescript-eslint/no-non-null-assertion for (const taskName of this.workerNodes[workerNodeKey].info .taskFunctionNames!) { @@ -1562,7 +1563,12 @@ export abstract class AbstractPool< this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) return } - this.getWorkerInfo(workerNodeKey).stealing = true + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey}' not found in pool` + ) + } + workerInfo.stealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1594,7 +1600,7 @@ export abstract class AbstractPool< } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { - this.handleIdleWorkerNodeEvent(eventDetail, stolenTask) + this.handleWorkerNodeIdleEvent(eventDetail, stolenTask) return undefined }) .catch(EMPTY_FUNCTION) @@ -1627,13 +1633,13 @@ export abstract class AbstractPool< } } - private readonly handleBackPressureEvent = ( + private readonly handleWorkerNodeBackPressureEvent = ( eventDetail: WorkerNodeEventDetail ): void => { if ( this.cannotStealTask() || - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2) + (this.info.stealingWorkerNodes ?? 0) > + Math.floor(this.workerNodes.length / 2) ) { return } @@ -1661,13 +1667,19 @@ export abstract class AbstractPool< // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.opts.tasksQueueOptions!.size! - sizeOffset ) { - this.getWorkerInfo(workerNodeKey).stealing = true + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey}' not found in pool` + ) + } + workerInfo.stealing = true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const task = sourceWorkerNode.popTask()! this.handleTask(workerNodeKey, task) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) - this.getWorkerInfo(workerNodeKey).stealing = false + workerInfo.stealing = false } } } @@ -1688,26 +1700,27 @@ export abstract class AbstractPool< this.handleTaskExecutionResponse(message) } else if (taskFunctionNames != null) { // Task function names message received from worker - this.getWorkerInfo( + const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(workerId) - ).taskFunctionNames = taskFunctionNames + ) + if (workerInfo != null) { + workerInfo.taskFunctionNames = taskFunctionNames + } } } private handleWorkerReadyResponse (message: MessageValue): void { const { workerId, ready, taskFunctionNames } = message - if (ready === false) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - throw new Error(`Worker ${workerId!} failed to initialize`) + if (ready == null || !ready) { + throw new Error(`Worker ${workerId} failed to initialize`) } - const workerInfo = this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(workerId) - ) - workerInfo.ready = ready as boolean - workerInfo.taskFunctionNames = taskFunctionNames + const workerNode = + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + workerNode.info.ready = ready + workerNode.info.taskFunctionNames = taskFunctionNames if (!this.readyEventEmitted && this.ready) { - this.readyEventEmitted = true this.emitter?.emit(PoolEvents.ready, this.info) + this.readyEventEmitted = true } } @@ -1736,6 +1749,7 @@ export abstract class AbstractPool< this.afterTaskExecutionHook(workerNodeKey, message) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.promiseResponseMap.delete(taskId!) + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition workerNode?.emit('taskFinished', taskId) if (this.opts.enableTasksQueue === true && !this.destroying) { const workerNodeTasksUsage = workerNode.usage.tasks @@ -1753,9 +1767,8 @@ export abstract class AbstractPool< this.tasksQueueSize(workerNodeKey) === 0 && workerNodeTasksUsage.sequentiallyStolen === 0 ) { - workerNode.emit('idleWorkerNode', { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerId: workerId!, + workerNode.emit('idle', { + workerId, workerNodeKey }) } @@ -1786,7 +1799,7 @@ export abstract class AbstractPool< * @param workerNodeKey - The worker node key. * @returns The worker information. */ - protected getWorkerInfo (workerNodeKey: number): WorkerInfo { + protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined { return this.workerNodes[workerNodeKey]?.info } @@ -1841,20 +1854,15 @@ export abstract class AbstractPool< const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) - this.workerChoiceStrategyContext.remove(workerNodeKey) + this.workerChoiceStrategyContext?.remove(workerNodeKey) } } protected flagWorkerNodeAsNotReady (workerNodeKey: number): void { - this.getWorkerInfo(workerNodeKey).ready = false - } - - /** @inheritDoc */ - public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { - return ( - this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].hasBackPressure() - ) + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo != null) { + workerInfo.ready = false + } } private hasBackPressure (): boolean {