Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
- const workerId = this.getWorkerInfo(workerNodeKey).id
+ const workerId = this.getWorkerInfo(workerNodeKey)?.id
if (
message.taskFunctionOperationStatus != null &&
message.workerId === workerId
if (
message.taskFunctionOperationStatus != null &&
message.workerId === workerId
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
Array.isArray(workerInfo.taskFunctionNames) &&
workerInfo.taskFunctionNames.length > 2
)
Array.isArray(workerInfo.taskFunctionNames) &&
workerInfo.taskFunctionNames.length > 2
)
(this.info.stealingWorkerNodes ?? 0) >
Math.floor(this.workerNodes.length / 2)
) {
(this.info.stealingWorkerNodes ?? 0) >
Math.floor(this.workerNodes.length / 2)
) {
- if (previousStolenTask != null) {
+ if (workerInfo != null && previousStolenTask != null) {
workerInfo.stealing = false
}
return
}
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
workerInfo.stealing = false
}
return
}
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
previousStolenTask != null &&
workerNodeTasksUsage.sequentiallyStolen > 0 &&
(workerNodeTasksUsage.executing > 0 ||
previousStolenTask != null &&
workerNodeTasksUsage.sequentiallyStolen > 0 &&
(workerNodeTasksUsage.executing > 0 ||
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey}' not found in pool`
+ )
+ }
workerInfo.stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
workerInfo.stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.opts.tasksQueueOptions!.size! - sizeOffset
) {
const workerInfo = this.getWorkerInfo(workerNodeKey)
this.opts.tasksQueueOptions!.size! - sizeOffset
) {
const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey}' not found in pool`
+ )
+ }
workerInfo.stealing = true
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const task = sourceWorkerNode.popTask()!
workerInfo.stealing = true
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const task = sourceWorkerNode.popTask()!
this.handleTaskExecutionResponse(message)
} else if (taskFunctionNames != null) {
// Task function names message received from worker
this.handleTaskExecutionResponse(message)
} else if (taskFunctionNames != null) {
// Task function names message received from worker
+ const workerInfo = this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(workerId)
this.getWorkerNodeKeyByWorkerId(workerId)
- ).taskFunctionNames = taskFunctionNames
+ )
+ if (workerInfo != null) {
+ workerInfo.taskFunctionNames = taskFunctionNames
+ }
* @param workerNodeKey - The worker node key.
* @returns The worker information.
*/
* @param workerNodeKey - The worker node key.
* @returns The worker information.
*/
- protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
- const workerInfo = this.workerNodes[workerNodeKey]?.info
- // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- if (workerInfo == null) {
- throw new Error(`Worker node with key '${workerNodeKey}' not found`)
- }
- return workerInfo
+ protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
+ return this.workerNodes[workerNodeKey]?.info
}
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
}
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
- this.getWorkerInfo(workerNodeKey).ready = false
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo != null) {
+ workerInfo.ready = false
+ }
}
private hasBackPressure (): boolean {
}
private hasBackPressure (): boolean {
): void {
this.workerNodes[workerNodeKey].worker.send({
...message,
): void {
this.workerNodes[workerNodeKey].worker.send({
...message,
- workerId: this.getWorkerInfo(workerNodeKey).id
+ workerId: this.getWorkerInfo(workerNodeKey)?.id
transferList?: TransferListItem[]
): void {
this.workerNodes[workerNodeKey].messageChannel?.port1.postMessage(
transferList?: TransferListItem[]
): void {
this.workerNodes[workerNodeKey].messageChannel?.port1.postMessage(
- { ...message, workerId: this.getWorkerInfo(workerNodeKey).id },
+ { ...message, workerId: this.getWorkerInfo(workerNodeKey)?.id },
workerNode.worker.postMessage(
{
ready: false,
workerNode.worker.postMessage(
{
ready: false,
- workerId: this.getWorkerInfo(workerNodeKey).id,
+ workerId: this.getWorkerInfo(workerNodeKey)?.id,