protected internalBusy (): boolean {
return (
this.workerNodes.findIndex(workerNode => {
- return workerNode.tasksUsage?.running === 0
+ return workerNode.tasksUsage.running === 0
}) === -1
)
}
} else {
this.executeTask(workerNodeKey, submittedTask)
}
+ this.workerChoiceStrategyContext.update(workerNodeKey)
this.checkAndEmitEvents()
// eslint-disable-next-line @typescript-eslint/return-await
return res
worker: Worker,
message: MessageValue<Response>
): void {
- const workerTasksUsage = this.getWorkerTasksUsage(worker)
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ const workerTasksUsage = this.workerNodes[workerNodeKey].tasksUsage
--workerTasksUsage.running
++workerTasksUsage.run
if (message.error != null) {
workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
}
}
- this.workerChoiceStrategyContext.update()
}
/**
if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) {
const workerCreated = this.createAndSetupWorker()
this.registerWorkerMessageListener(workerCreated, message => {
+ const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
(message.kill != null &&
- this.getWorkerTasksUsage(workerCreated)?.running === 0)
+ this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
) {
// Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- this.flushTasksQueueByWorker(workerCreated)
+ this.flushTasksQueue(currentWorkerNodeKey)
void (this.destroyWorker(workerCreated) as Promise<void>)
}
})
workerNode.tasksUsage = tasksUsage
}
- /**
- * Gets the given worker its tasks usage in the pool.
- *
- * @param worker - The worker.
- * @throws Error if the worker is not found in the pool worker nodes.
- * @returns The worker tasks usage.
- */
- private getWorkerTasksUsage (worker: Worker): TasksUsage {
- const workerNodeKey = this.getWorkerNodeKey(worker)
- if (workerNodeKey !== -1) {
- return this.workerNodes[workerNodeKey].tasksUsage
- }
- throw new Error('Worker could not be found in the pool worker nodes')
- }
-
/**
* Pushes the given worker in the pool worker nodes.
*
}
}
- private flushTasksQueueByWorker (worker: Worker): void {
- const workerNodeKey = this.getWorkerNodeKey(worker)
- this.flushTasksQueue(workerNodeKey)
- }
-
private flushTasksQueues (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.flushTasksQueue(workerNodeKey)