WorkerUsage
} from './worker'
import {
- type MeasurementStatisticsRequirements,
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
checkFilePath,
checkValidTasksQueueOptions,
checkValidWorkerChoiceStrategy,
- updateMeasurementStatistics,
+ getDefaultTasksQueueOptions,
+ updateEluWorkerUsage,
+ updateRunTimeWorkerUsage,
+ updateTaskStatisticsWorkerUsage,
+ updateWaitTimeWorkerUsage,
waitWorkerNodeEvents
} from './utils'
}
}
- /**
- * Gets the given worker its worker node key.
- *
- * @param worker - The worker.
- * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
- */
- private getWorkerNodeKeyByWorker (worker: Worker): number {
- return this.workerNodes.findIndex(
- workerNode => workerNode.worker === worker
- )
- }
-
/**
* Gets the worker node key given its worker id.
*
tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
return {
- ...{
- size: Math.pow(this.maxSize, 2),
- concurrency: 1,
- taskStealing: true,
- tasksStealingOnBackPressure: true
- },
+ ...getDefaultTasksQueueOptions(this.maxSize),
...tasksQueueOptions
}
}
workerNodeKey: number
): Promise<void> {
await new Promise<void>((resolve, reject) => {
+ if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) {
+ reject(new Error(`Invalid worker node key '${workerNodeKey}'`))
+ return
+ }
const killMessageListener = (message: MessageValue<Response>): void => {
this.checkMessageWorkerId(message)
if (message.kill === 'success') {
this.flagWorkerNodeAsNotReady(workerNodeKey)
const flushedTasks = this.flushTasksQueue(workerNodeKey)
const workerNode = this.workerNodes[workerNodeKey]
- await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks)
+ await waitWorkerNodeEvents(
+ workerNode,
+ 'taskFinished',
+ flushedTasks,
+ this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
+ getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout
+ )
await this.sendKillMessageToWorker(workerNodeKey)
await workerNode.terminate()
}
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
- this.updateWaitTimeWorkerUsage(workerUsage, task)
+ updateWaitTimeWorkerUsage(
+ this.workerChoiceStrategyContext,
+ workerUsage,
+ task
+ )
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNodeKey
].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
++taskFunctionWorkerUsage.tasks.executing
- this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
+ updateWaitTimeWorkerUsage(
+ this.workerChoiceStrategyContext,
+ taskFunctionWorkerUsage,
+ task
+ )
}
}
workerNodeKey: number,
message: MessageValue<Response>
): void {
+ let needWorkerChoiceStrategyUpdate = false
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
- this.updateTaskStatisticsWorkerUsage(workerUsage, message)
- this.updateRunTimeWorkerUsage(workerUsage, message)
- this.updateEluWorkerUsage(workerUsage, message)
+ updateTaskStatisticsWorkerUsage(workerUsage, message)
+ updateRunTimeWorkerUsage(
+ this.workerChoiceStrategyContext,
+ workerUsage,
+ message
+ )
+ updateEluWorkerUsage(
+ this.workerChoiceStrategyContext,
+ workerUsage,
+ message
+ )
+ needWorkerChoiceStrategyUpdate = true
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
].getTaskFunctionWorkerUsage(
message.taskPerformance?.name as string
) as WorkerUsage
- this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
- this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
- this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
+ updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
+ updateRunTimeWorkerUsage(
+ this.workerChoiceStrategyContext,
+ taskFunctionWorkerUsage,
+ message
+ )
+ updateEluWorkerUsage(
+ this.workerChoiceStrategyContext,
+ taskFunctionWorkerUsage,
+ message
+ )
+ needWorkerChoiceStrategyUpdate = true
+ }
+ if (needWorkerChoiceStrategyUpdate) {
+ this.workerChoiceStrategyContext.update(workerNodeKey)
}
}
)
}
- private updateTaskStatisticsWorkerUsage (
- workerUsage: WorkerUsage,
- message: MessageValue<Response>
- ): void {
- const workerTaskStatistics = workerUsage.tasks
- if (
- workerTaskStatistics.executing != null &&
- workerTaskStatistics.executing > 0
- ) {
- --workerTaskStatistics.executing
- }
- if (message.workerError == null) {
- ++workerTaskStatistics.executed
- } else {
- ++workerTaskStatistics.failed
- }
- }
-
- private updateRunTimeWorkerUsage (
- workerUsage: WorkerUsage,
- message: MessageValue<Response>
- ): void {
- if (message.workerError != null) {
- return
- }
- updateMeasurementStatistics(
- workerUsage.runTime,
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
- message.taskPerformance?.runTime ?? 0
- )
- }
-
- private updateWaitTimeWorkerUsage (
- workerUsage: WorkerUsage,
- task: Task<Data>
- ): void {
- const timestamp = performance.now()
- const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
- updateMeasurementStatistics(
- workerUsage.waitTime,
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
- taskWaitTime
- )
- }
-
- private updateEluWorkerUsage (
- workerUsage: WorkerUsage,
- message: MessageValue<Response>
- ): void {
- if (message.workerError != null) {
- return
- }
- const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
- updateMeasurementStatistics(
- workerUsage.elu.active,
- eluTaskStatisticsRequirements,
- message.taskPerformance?.elu?.active ?? 0
- )
- updateMeasurementStatistics(
- workerUsage.elu.idle,
- eluTaskStatisticsRequirements,
- message.taskPerformance?.elu?.idle ?? 0
- )
- if (eluTaskStatisticsRequirements.aggregate) {
- if (message.taskPerformance?.elu != null) {
- if (workerUsage.elu.utilization != null) {
- workerUsage.elu.utilization =
- (workerUsage.elu.utilization +
- message.taskPerformance.elu.utilization) /
- 2
- } else {
- workerUsage.elu.utilization = message.taskPerformance.elu.utilization
- }
- }
- }
- }
-
/**
* Chooses a worker node for the next task.
*
if (this.started && this.opts.enableTasksQueue === true) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
- workerNode.terminate().catch(error => {
+ workerNode?.terminate().catch(error => {
this.emitter?.emit(PoolEvents.error, error)
})
})
}
private redistributeQueuedTasks (workerNodeKey: number): void {
+ if (workerNodeKey === -1) {
+ return
+ }
if (this.workerNodes.length <= 1) {
return
}
}
asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
- this.workerChoiceStrategyContext.update(workerNodeKey)
this.promiseResponseMap.delete(taskId as string)
- workerNode.emit('taskFinished', taskId)
- if (this.opts.enableTasksQueue === true) {
+ workerNode?.emit('taskFinished', taskId)
+ if (this.opts.enableTasksQueue === true && !this.destroying) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
env: this.opts.env,
workerOptions: this.opts.workerOptions,
tasksQueueBackPressureSize:
- this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
+ this.opts.tasksQueueOptions?.size ??
+ getDefaultTasksQueueOptions(this.maxSize).size
}
)
// Flag the worker node as ready at pool startup.