message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
- const workerId = this.getWorkerInfo(workerNodeKey).id
+ const workerId = this.getWorkerInfo(workerNodeKey)?.id
if (
message.taskFunctionOperationStatus != null &&
message.workerId === workerId
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
+ workerInfo != null &&
Array.isArray(workerInfo.taskFunctionNames) &&
workerInfo.taskFunctionNames.length > 2
)
) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
- workerNode.terminate().catch(error => {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ workerNode?.terminate().catch(error => {
this.emitter?.emit(PoolEvents.error, error)
})
})
(this.info.stealingWorkerNodes ?? 0) >
Math.floor(this.workerNodes.length / 2)
) {
- if (previousStolenTask != null) {
+ if (workerInfo != null && previousStolenTask != null) {
workerInfo.stealing = false
}
return
}
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
+ workerInfo != null &&
previousStolenTask != null &&
workerNodeTasksUsage.sequentiallyStolen > 0 &&
(workerNodeTasksUsage.executing > 0 ||
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey}' not found in pool`
+ )
+ }
workerInfo.stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.opts.tasksQueueOptions!.size! - sizeOffset
) {
const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey}' not found in pool`
+ )
+ }
workerInfo.stealing = true
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const task = sourceWorkerNode.popTask()!
this.handleTaskExecutionResponse(message)
} else if (taskFunctionNames != null) {
// Task function names message received from worker
- this.getWorkerInfo(
+ const workerInfo = this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(workerId)
- ).taskFunctionNames = taskFunctionNames
+ )
+ if (workerInfo != null) {
+ workerInfo.taskFunctionNames = taskFunctionNames
+ }
}
}
this.afterTaskExecutionHook(workerNodeKey, message)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.promiseResponseMap.delete(taskId!)
- workerNode.emit('taskFinished', taskId)
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ workerNode?.emit('taskFinished', taskId)
if (this.opts.enableTasksQueue === true && !this.destroying) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
* @param workerNodeKey - The worker node key.
* @returns The worker information.
*/
- protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
- const workerInfo = this.workerNodes[workerNodeKey]?.info
- // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- if (workerInfo == null) {
- throw new Error(`Worker node with key '${workerNodeKey}' not found`)
- }
- return workerInfo
+ protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
+ return this.workerNodes[workerNodeKey]?.info
}
/**
}
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
- this.getWorkerInfo(workerNodeKey).ready = false
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo != null) {
+ workerInfo.ready = false
+ }
}
private hasBackPressure (): boolean {