isKillBehavior,
isPlainObject,
median,
- round
+ round,
+ updateMeasurementStatistics
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
import {
import type {
IWorker,
IWorkerNode,
- MessageHandler,
WorkerInfo,
WorkerType,
WorkerUsage
} from './worker'
import {
+ type MeasurementStatisticsRequirements,
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
* @param worker - The worker.
* @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
*/
- private getWorkerNodeKey (worker: Worker): number {
+ protected getWorkerNodeKey (worker: Worker): number {
return this.workerNodes.findIndex(
workerNode => workerNode.worker === worker
)
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
- .aggregate
- ) {
- const taskRunTime = message.taskPerformance?.runTime ?? 0
- workerUsage.runTime.aggregate =
- (workerUsage.runTime.aggregate ?? 0) + taskRunTime
- workerUsage.runTime.minimum = Math.min(
- taskRunTime,
- workerUsage.runTime?.minimum ?? Infinity
- )
- workerUsage.runTime.maximum = Math.max(
- taskRunTime,
- workerUsage.runTime?.maximum ?? -Infinity
- )
- if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
- .average &&
- workerUsage.tasks.executed !== 0
- ) {
- workerUsage.runTime.average =
- workerUsage.runTime.aggregate / workerUsage.tasks.executed
- }
- if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
- .median &&
- message.taskPerformance?.runTime != null
- ) {
- workerUsage.runTime.history.push(message.taskPerformance.runTime)
- workerUsage.runTime.median = median(workerUsage.runTime.history)
- }
- }
+ updateMeasurementStatistics(
+ workerUsage.runTime,
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
+ message.taskPerformance?.runTime ?? 0,
+ workerUsage.tasks.executed
+ )
}
private updateWaitTimeWorkerUsage (
): void {
const timestamp = performance.now()
const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
- if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
- .aggregate
- ) {
- workerUsage.waitTime.aggregate =
- (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
- workerUsage.waitTime.minimum = Math.min(
- taskWaitTime,
- workerUsage.waitTime?.minimum ?? Infinity
- )
- workerUsage.waitTime.maximum = Math.max(
- taskWaitTime,
- workerUsage.waitTime?.maximum ?? -Infinity
- )
- if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .waitTime.average &&
- workerUsage.tasks.executed !== 0
- ) {
- workerUsage.waitTime.average =
- workerUsage.waitTime.aggregate / workerUsage.tasks.executed
- }
- if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .waitTime.median
- ) {
- workerUsage.waitTime.history.push(taskWaitTime)
- workerUsage.waitTime.median = median(workerUsage.waitTime.history)
- }
- }
+ updateMeasurementStatistics(
+ workerUsage.waitTime,
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
+ taskWaitTime,
+ workerUsage.tasks.executed
+ )
}
private updateEluWorkerUsage (
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (
+ const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
- .aggregate
- ) {
+ updateMeasurementStatistics(
+ workerUsage.elu.active,
+ eluTaskStatisticsRequirements,
+ message.taskPerformance?.elu?.active ?? 0,
+ workerUsage.tasks.executed
+ )
+ updateMeasurementStatistics(
+ workerUsage.elu.idle,
+ eluTaskStatisticsRequirements,
+ message.taskPerformance?.elu?.idle ?? 0,
+ workerUsage.tasks.executed
+ )
+ if (eluTaskStatisticsRequirements.aggregate) {
if (message.taskPerformance?.elu != null) {
- workerUsage.elu.idle.aggregate =
- (workerUsage.elu.idle?.aggregate ?? 0) +
- message.taskPerformance.elu.idle
- workerUsage.elu.active.aggregate =
- (workerUsage.elu.active?.aggregate ?? 0) +
- message.taskPerformance.elu.active
if (workerUsage.elu.utilization != null) {
workerUsage.elu.utilization =
(workerUsage.elu.utilization +
} else {
workerUsage.elu.utilization = message.taskPerformance.elu.utilization
}
- workerUsage.elu.idle.minimum = Math.min(
- message.taskPerformance.elu.idle,
- workerUsage.elu.idle?.minimum ?? Infinity
- )
- workerUsage.elu.idle.maximum = Math.max(
- message.taskPerformance.elu.idle,
- workerUsage.elu.idle?.maximum ?? -Infinity
- )
- workerUsage.elu.active.minimum = Math.min(
- message.taskPerformance.elu.active,
- workerUsage.elu.active?.minimum ?? Infinity
- )
- workerUsage.elu.active.maximum = Math.max(
- message.taskPerformance.elu.active,
- workerUsage.elu.active?.maximum ?? -Infinity
- )
- if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
- .average &&
- workerUsage.tasks.executed !== 0
- ) {
- workerUsage.elu.idle.average =
- workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
- workerUsage.elu.active.average =
- workerUsage.elu.active.aggregate / workerUsage.tasks.executed
- }
- if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
- .median
- ) {
- workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
- workerUsage.elu.active.history.push(
- message.taskPerformance.elu.active
- )
- workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
- workerUsage.elu.active.median = median(workerUsage.elu.active.history)
- }
}
}
}
const workerNodeKey = this.getWorkerNodeKey(worker)
const workerInfo = this.getWorkerInfo(workerNodeKey)
workerInfo.ready = false
+ this.workerNodes[workerNodeKey].closeChannel()
this.emitter?.emit(PoolEvents.error, error)
if (this.opts.restartWorkerOnError === true && !this.starting) {
if (workerInfo.dynamic) {
void (this.destroyWorker(worker) as Promise<void>)
}
})
- const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
+ const workerInfo = this.getWorkerInfoByWorker(worker)
workerInfo.dynamic = true
if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
workerInfo.ready = true
* @param worker - The worker which should register a listener.
* @param listener - The message listener callback.
*/
- private registerWorkerMessageListener<Message extends Data | Response>(
- worker: Worker,
- listener: (message: MessageValue<Message>) => void
- ): void {
- worker.on('message', listener as MessageHandler<Worker>)
- }
+ protected abstract registerWorkerMessageListener<
+ Message extends Data | Response
+ >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
/**
* Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
protected afterWorkerSetup (worker: Worker): void {
// Listen to worker messages.
this.registerWorkerMessageListener(worker, this.workerListener())
- // Send startup message to worker.
- this.sendWorkerStartupMessage(worker)
+ // Send the startup message to worker.
+ this.sendStartupMessageToWorker(worker)
// Setup worker task statistics computation.
this.setWorkerStatistics(worker)
}
- private sendWorkerStartupMessage (worker: Worker): void {
- this.sendToWorker(worker, {
- ready: false,
- workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
- })
- }
+ /**
+ * Sends the startup message to the given worker.
+ *
+ * @param worker - The worker which should receive the startup message.
+ */
+ protected abstract sendStartupMessageToWorker (worker: Worker): void
private redistributeQueuedTasks (workerNodeKey: number): void {
while (this.tasksQueueSize(workerNodeKey) > 0) {
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
- const worker = this.getWorkerById(message.workerId)
- this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
- message.ready as boolean
+ this.getWorkerInfoByWorker(
+ this.getWorkerById(message.workerId) as Worker
+ ).ready = message.ready as boolean
if (this.emitter != null && this.ready) {
this.emitter.emit(PoolEvents.ready, this.info)
}
}
/**
- * Gets the worker information.
+ * Gets the worker information from the given worker node key.
*
* @param workerNodeKey - The worker node key.
+ * @returns The worker information.
*/
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
return this.workerNodes[workerNodeKey].info
}
+ /**
+ * Gets the worker information from the given worker.
+ *
+ * @param worker - The worker.
+ * @returns The worker information.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not found.
+ */
+ protected getWorkerInfoByWorker (worker: Worker): WorkerInfo {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ if (workerNodeKey === -1) {
+ throw new Error('Worker not found')
+ }
+ return this.workerNodes[workerNodeKey].info
+ }
+
/**
* Adds the given worker in the pool worker nodes.
*
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.elu.aggregate
},
- workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
+ workerId: this.getWorkerInfoByWorker(worker).id as number
})
}
}