ready: this.ready,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
strategy: this.opts.workerChoiceStrategy!,
+ strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
* The pool readiness boolean status.
*/
private get ready (): boolean {
+ if (this.empty) {
+ return false
+ }
return (
this.workerNodes.reduce(
(accumulator, workerNode) =>
)
}
+ /**
+ * The pool emptiness boolean status.
+ */
+ protected get empty (): boolean {
+ return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0
+ }
+
/**
* The approximate pool utilization.
*
message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
- const workerId = this.getWorkerInfo(workerNodeKey).id
+ const workerId = this.getWorkerInfo(workerNodeKey)?.id
if (
message.taskFunctionOperationStatus != null &&
message.workerId === workerId
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
+ workerInfo != null &&
Array.isArray(workerInfo.taskFunctionNames) &&
workerInfo.taskFunctionNames.length > 2
)
) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
- workerNode.terminate().catch(error => {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ workerNode?.terminate().catch(error => {
this.emitter?.emit(PoolEvents.error, error)
})
})
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
- const workerUsage = this.workerNodes[localWorkerNodeKey].usage
+ const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
// Kill message received from worker
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
(this.info.stealingWorkerNodes ?? 0) >
Math.floor(this.workerNodes.length / 2)
) {
- if (previousStolenTask != null) {
+ if (workerInfo != null && previousStolenTask != null) {
workerInfo.stealing = false
}
return
}
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
+ workerInfo != null &&
previousStolenTask != null &&
workerNodeTasksUsage.sequentiallyStolen > 0 &&
(workerNodeTasksUsage.executing > 0 ||
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey}' not found in pool`
+ )
+ }
workerInfo.stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
return undefined
})
- .catch(EMPTY_FUNCTION)
+ .catch(error => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
}
private readonly workerNodeStealTask = (
this.opts.tasksQueueOptions!.size! - sizeOffset
) {
const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey}' not found in pool`
+ )
+ }
workerInfo.stealing = true
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const task = sourceWorkerNode.popTask()!
this.handleTaskExecutionResponse(message)
} else if (taskFunctionNames != null) {
// Task function names message received from worker
- this.getWorkerInfo(
+ const workerInfo = this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(workerId)
- ).taskFunctionNames = taskFunctionNames
+ )
+ if (workerInfo != null) {
+ workerInfo.taskFunctionNames = taskFunctionNames
+ }
+ }
+ }
+
+ private checkAndEmitReadyEvent (): void {
+ if (!this.readyEventEmitted && this.ready) {
+ this.emitter?.emit(PoolEvents.ready, this.info)
+ this.readyEventEmitted = true
}
}
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
workerNode.info.ready = ready
workerNode.info.taskFunctionNames = taskFunctionNames
- if (!this.readyEventEmitted && this.ready) {
- this.emitter?.emit(PoolEvents.ready, this.info)
- this.readyEventEmitted = true
- }
+ this.checkAndEmitReadyEvent()
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
this.afterTaskExecutionHook(workerNodeKey, message)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.promiseResponseMap.delete(taskId!)
- workerNode.emit('taskFinished', taskId)
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ workerNode?.emit('taskFinished', taskId)
if (this.opts.enableTasksQueue === true && !this.destroying) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
workerNodeTasksUsage.sequentiallyStolen === 0
) {
workerNode.emit('idle', {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- workerId: workerId!,
+ workerId,
workerNodeKey
})
}
* @param workerNodeKey - The worker node key.
* @returns The worker information.
*/
- protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
- const workerInfo = this.workerNodes[workerNodeKey]?.info
- // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- if (workerInfo == null) {
- throw new Error(`Worker node with key '${workerNodeKey}' not found`)
- }
- return workerInfo
+ protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
+ return this.workerNodes[workerNodeKey]?.info
}
/**
return workerNodeKey
}
+ private checkAndEmitEmptyEvent (): void {
+ if (this.empty) {
+ this.emitter?.emit(PoolEvents.empty, this.info)
+ this.readyEventEmitted = false
+ }
+ }
+
/**
* Removes the worker node from the pool worker nodes.
*
this.workerNodes.splice(workerNodeKey, 1)
this.workerChoiceStrategyContext?.remove(workerNodeKey)
}
+ this.checkAndEmitEmptyEvent()
}
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
- this.getWorkerInfo(workerNodeKey).ready = false
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo != null) {
+ workerInfo.ready = false
+ }
}
private hasBackPressure (): boolean {