/**
* Worker choice strategy context referencing a worker choice algorithm implementation.
- *
- * Default to a round robin algorithm.
*/
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
*/
protected abstract get busy (): boolean
+ /**
+ * Whether worker nodes are executing at least one task.
+ *
+ * @returns Worker nodes busyness boolean status.
+ */
protected internalBusy (): boolean {
return (
this.workerNodes.findIndex(workerNode => {
}
/**
- * Shutdowns the given worker.
+ * Terminates the given worker.
*
* @param worker - A worker within `workerNodes`.
*/
): void {
const workerUsage =
this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
+ this.updateTaskStatisticsWorkerUsage(workerUsage, message)
+ this.updateRunTimeWorkerUsage(workerUsage, message)
+ this.updateEluWorkerUsage(workerUsage, message)
+ }
+
+ private updateTaskStatisticsWorkerUsage (
+ workerUsage: WorkerUsage,
+ message: MessageValue<Response>
+ ): void {
const workerTaskStatistics = workerUsage.tasks
--workerTaskStatistics.executing
++workerTaskStatistics.executed
if (message.taskError != null) {
++workerTaskStatistics.failed
}
- this.updateRunTimeWorkerUsage(workerUsage, message)
- this.updateEluWorkerUsage(workerUsage, message)
}
private updateRunTimeWorkerUsage (
workerUsage.tasks.executed !== 0
) {
workerUsage.runTime.average =
- workerUsage.runTime.aggregate / workerUsage.tasks.executed
+ workerUsage.runTime.aggregate /
+ (workerUsage.tasks.executed - workerUsage.tasks.failed)
}
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
workerUsage.tasks.executed !== 0
) {
workerUsage.waitTime.average =
- workerUsage.waitTime.aggregate / workerUsage.tasks.executed
+ workerUsage.waitTime.aggregate /
+ (workerUsage.tasks.executed - workerUsage.tasks.failed)
}
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
}
private updateEluWorkerUsage (
- workerTasksUsage: WorkerUsage,
+ workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu) {
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .aggregate
+ ) {
+ if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
+ workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
+ workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
+ workerUsage.elu.utilization =
+ (workerUsage.elu.utilization +
+ message.taskPerformance.elu.utilization) /
+ 2
+ } else if (message.taskPerformance?.elu != null) {
+ workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
+ workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
+ workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+ }
if (
- workerTasksUsage.elu != null &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .average &&
+ workerUsage.tasks.executed !== 0
+ ) {
+ const executedTasks =
+ workerUsage.tasks.executed - workerUsage.tasks.failed
+ workerUsage.elu.idle.average =
+ workerUsage.elu.idle.aggregate / executedTasks
+ workerUsage.elu.active.average =
+ workerUsage.elu.active.aggregate / executedTasks
+ }
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .median &&
message.taskPerformance?.elu != null
) {
- workerTasksUsage.elu = {
- idle: workerTasksUsage.elu.idle + message.taskPerformance.elu.idle,
- active:
- workerTasksUsage.elu.active + message.taskPerformance.elu.active,
- utilization:
- (workerTasksUsage.elu.utilization +
- message.taskPerformance.elu.utilization) /
- 2
- }
- } else if (message.taskPerformance?.elu != null) {
- workerTasksUsage.elu = message.taskPerformance.elu
+ workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
+ workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
+ workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
+ workerUsage.elu.active.median = median(workerUsage.elu.active.history)
}
}
}
/**
* Chooses a worker node for the next task.
*
- * The default worker choice strategy uses a round robin algorithm to distribute the load.
+ * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
*
* @returns The worker node key
*/
- protected chooseWorkerNode (): number {
- let workerNodeKey: number
- if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
- const workerCreated = this.createAndSetupWorker()
- this.registerWorkerMessageListener(workerCreated, message => {
- const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
- if (
- isKillBehavior(KillBehaviors.HARD, message.kill) ||
- (message.kill != null &&
- this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
- .executing === 0)
- ) {
- // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- this.flushTasksQueue(currentWorkerNodeKey)
- // FIXME: wait for tasks to be finished
- void (this.destroyWorker(workerCreated) as Promise<void>)
- }
- })
- workerNodeKey = this.getWorkerNodeKey(workerCreated)
- } else {
- workerNodeKey = this.workerChoiceStrategyContext.execute()
+ private chooseWorkerNode (): number {
+ if (this.shallCreateDynamicWorker()) {
+ const worker = this.createAndSetupDynamicWorker()
+ if (
+ this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
+ ) {
+ return this.getWorkerNodeKey(worker)
+ }
}
- return workerNodeKey
+ return this.workerChoiceStrategyContext.execute()
+ }
+
+ /**
+ * Conditions for dynamic worker creation.
+ *
+ * @returns Whether to create a dynamic worker or not.
+ */
+ private shallCreateDynamicWorker (): boolean {
+ return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
}
/**
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void
/**
- * Returns a newly created worker.
+ * Creates a new worker.
+ *
+ * @returns Newly created worker.
*/
protected abstract createWorker (): Worker
if (this.emitter != null) {
this.emitter.emit(PoolEvents.error, error)
}
- })
- worker.on('error', () => {
if (this.opts.restartWorkerOnError === true) {
this.createAndSetupWorker()
}
return worker
}
+ /**
+ * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
+ *
+ * @returns New, completely set up dynamic worker.
+ */
+ protected createAndSetupDynamicWorker (): Worker {
+ const worker = this.createAndSetupWorker()
+ this.registerWorkerMessageListener(worker, message => {
+ const currentWorkerNodeKey = this.getWorkerNodeKey(worker)
+ if (
+ isKillBehavior(KillBehaviors.HARD, message.kill) ||
+ (message.kill != null &&
+ ((this.opts.enableTasksQueue === false &&
+ this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
+ .executing === 0) ||
+ (this.opts.enableTasksQueue === true &&
+ this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
+ .executing === 0 &&
+ this.tasksQueueSize(currentWorkerNodeKey) === 0)))
+ ) {
+ // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
+ void (this.destroyWorker(worker) as Promise<void>)
+ }
+ })
+ return worker
+ }
+
/**
* This function is the listener registered for each worker message.
*
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.runTime.aggregate,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .elu
+ .elu.aggregate
}
})
}
median: 0,
history: new CircularArray()
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ utilization: 0
+ }
}
}