private setTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].on(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
- )
+ this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].off(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
+ 'idle',
+ this.handleWorkerNodeIdleEvent
)
}
}
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].off(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
})
}
})
- const workerInfo = this.getWorkerInfo(workerNodeKey)
this.sendToWorker(workerNodeKey, {
checkActive: true
})
})
}
}
- workerInfo.dynamic = true
+ const workerNode = this.workerNodes[workerNodeKey]
+ workerNode.info.dynamic = true
if (
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
) {
- workerInfo.ready = true
+ workerNode.info.ready = true
}
this.checkAndEmitDynamicWorkerCreationEvents()
return workerNodeKey
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
this.workerNodes[workerNodeKey].on(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
+ 'idle',
+ this.handleWorkerNodeIdleEvent
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
}
}
- private readonly handleIdleWorkerNodeEvent = (
+ private readonly handleWorkerNodeIdleEvent = (
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
}
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
- this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
+ this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
return undefined
})
.catch(EMPTY_FUNCTION)
}
}
- private readonly handleBackPressureEvent = (
+ private readonly handleWorkerNodeBackPressureEvent = (
eventDetail: WorkerNodeEventDetail
): void => {
if (
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
- if (ready === false) {
+ if (ready == null || !ready) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
throw new Error(`Worker ${workerId!} failed to initialize`)
}
- const workerInfo = this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(workerId)
- )
- workerInfo.ready = ready as boolean
- workerInfo.taskFunctionNames = taskFunctionNames
+ const workerNode =
+ this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ workerNode.info.ready = ready
+ workerNode.info.taskFunctionNames = taskFunctionNames
if (!this.readyEventEmitted && this.ready) {
this.readyEventEmitted = true
this.emitter?.emit(PoolEvents.ready, this.info)
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
- workerNode.emit('idleWorkerNode', {
+ workerNode.emit('idle', {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
workerId: workerId!,
workerNodeKey