type TasksQueueOptions,
type WorkerType
} from './pool'
-import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
+import type {
+ IWorker,
+ Task,
+ TaskStatistics,
+ WorkerNode,
+ WorkerUsage
+} from './worker'
import {
+ Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
type WorkerChoiceStrategyOptions
/**
* Worker choice strategy context referencing a worker choice algorithm implementation.
- *
- * Default to a round robin algorithm.
*/
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
'Invalid worker choice strategy options: must have a weight for each worker node'
)
}
+ if (
+ workerChoiceStrategyOptions.measurement != null &&
+ !Object.values(Measurements).includes(
+ workerChoiceStrategyOptions.measurement
+ )
+ ) {
+ throw new Error(
+ `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
+ )
+ }
}
private checkValidTasksQueueOptions (
if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
throw new TypeError('Invalid tasks queue options: must be a plain object')
}
- if ((tasksQueueOptions?.concurrency as number) <= 0) {
+ if (
+ tasksQueueOptions?.concurrency != null &&
+ !Number.isSafeInteger(tasksQueueOptions.concurrency)
+ ) {
+ throw new TypeError(
+ 'Invalid worker tasks concurrency: must be an integer'
+ )
+ }
+ if (
+ tasksQueueOptions?.concurrency != null &&
+ tasksQueueOptions.concurrency <= 0
+ ) {
throw new Error(
- `Invalid worker tasks concurrency '${
- tasksQueueOptions.concurrency as number
- }'`
+ `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
)
}
}
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
- workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator,
+ workerNode.workerUsage.tasks.executing === 0
+ ? accumulator + 1
+ : accumulator,
0
),
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
- workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator,
+ workerNode.workerUsage.tasks.executing > 0
+ ? accumulator + 1
+ : accumulator,
+ 0
+ ),
+ executedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.workerUsage.tasks.executed,
0
),
- runningTasks: this.workerNodes.reduce(
+ executingTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + workerNode.tasksUsage.running,
+ accumulator + workerNode.workerUsage.tasks.executing,
0
),
queuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + workerNode.tasksQueue.maxSize,
0
+ ),
+ failedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.workerUsage.tasks.failed,
+ 0
)
}
}
this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
}
for (const workerNode of this.workerNodes) {
- this.setWorkerNodeTasksUsage(workerNode, {
- ran: 0,
- running: 0,
- runTime: 0,
- runTimeHistory: new CircularArray(),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: new CircularArray(),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
- elu: undefined
- })
+ this.setWorkerNodeTasksUsage(
+ workerNode,
+ this.getWorkerUsage(workerNode.worker)
+ )
this.setWorkerStatistics(workerNode.worker)
}
}
*/
protected abstract get busy (): boolean
+ /**
+ * Whether worker nodes are executing at least one task.
+ *
+ * @returns Worker nodes busyness boolean status.
+ */
protected internalBusy (): boolean {
return (
this.workerNodes.findIndex(workerNode => {
- return workerNode.tasksUsage.running === 0
+ return workerNode.workerUsage.tasks.executing === 0
}) === -1
)
}
if (
this.opts.enableTasksQueue === true &&
(this.busy ||
- this.workerNodes[workerNodeKey].tasksUsage.running >=
+ this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
((this.opts.tasksQueueOptions as TasksQueueOptions)
.concurrency as number))
) {
} else {
this.executeTask(workerNodeKey, submittedTask)
}
- this.workerChoiceStrategyContext.update(workerNodeKey)
this.checkAndEmitEvents()
// eslint-disable-next-line @typescript-eslint/return-await
return res
}
/**
- * Shutdowns the given worker.
+ * Terminates the given worker.
*
* @param worker - A worker within `workerNodes`.
*/
* Can be overridden.
*
* @param workerNodeKey - The worker node key.
+ * @param task - The task to execute.
*/
- protected beforeTaskExecutionHook (workerNodeKey: number): void {
- ++this.workerNodes[workerNodeKey].tasksUsage.running
+ protected beforeTaskExecutionHook (
+ workerNodeKey: number,
+ task: Task<Data>
+ ): void {
+ const workerUsage = this.workerNodes[workerNodeKey].workerUsage
+ ++workerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(workerUsage, task)
}
/**
worker: Worker,
message: MessageValue<Response>
): void {
- const workerTasksUsage =
- this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
- --workerTasksUsage.running
- ++workerTasksUsage.ran
- if (message.error != null) {
- ++workerTasksUsage.error
+ const workerUsage =
+ this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
+ this.updateTaskStatisticsWorkerUsage(workerUsage, message)
+ this.updateRunTimeWorkerUsage(workerUsage, message)
+ this.updateEluWorkerUsage(workerUsage, message)
+ }
+
+ private updateTaskStatisticsWorkerUsage (
+ workerUsage: WorkerUsage,
+ message: MessageValue<Response>
+ ): void {
+ const workerTaskStatistics = workerUsage.tasks
+ --workerTaskStatistics.executing
+ ++workerTaskStatistics.executed
+ if (message.taskError != null) {
+ ++workerTaskStatistics.failed
}
- this.updateRunTimeTasksUsage(workerTasksUsage, message)
- this.updateWaitTimeTasksUsage(workerTasksUsage, message)
- this.updateEluTasksUsage(workerTasksUsage, message)
}
- private updateRunTimeTasksUsage (
- workerTasksUsage: TasksUsage,
+ private updateRunTimeWorkerUsage (
+ workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) {
- workerTasksUsage.runTime += message.runTime ?? 0
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ .aggregate
+ ) {
+ workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
if (
- this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime &&
- workerTasksUsage.ran !== 0
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ .average &&
+ workerUsage.tasks.executed !== 0
) {
- workerTasksUsage.avgRunTime =
- workerTasksUsage.runTime / workerTasksUsage.ran
+ workerUsage.runTime.average =
+ workerUsage.runTime.aggregate /
+ (workerUsage.tasks.executed - workerUsage.tasks.failed)
}
if (
- this.workerChoiceStrategyContext.getTaskStatistics().medRunTime &&
- message.runTime != null
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ .median &&
+ message.taskPerformance?.runTime != null
) {
- workerTasksUsage.runTimeHistory.push(message.runTime)
- workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
+ workerUsage.runTime.history.push(message.taskPerformance.runTime)
+ workerUsage.runTime.median = median(workerUsage.runTime.history)
}
}
}
- private updateWaitTimeTasksUsage (
- workerTasksUsage: TasksUsage,
- message: MessageValue<Response>
+ private updateWaitTimeWorkerUsage (
+ workerUsage: WorkerUsage,
+ task: Task<Data>
): void {
- if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) {
- workerTasksUsage.waitTime += message.waitTime ?? 0
+ const timestamp = performance.now()
+ const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
+ .aggregate
+ ) {
+ workerUsage.waitTime.aggregate += taskWaitTime ?? 0
if (
- this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime &&
- workerTasksUsage.ran !== 0
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.average &&
+ workerUsage.tasks.executed !== 0
) {
- workerTasksUsage.avgWaitTime =
- workerTasksUsage.waitTime / workerTasksUsage.ran
+ workerUsage.waitTime.average =
+ workerUsage.waitTime.aggregate /
+ (workerUsage.tasks.executed - workerUsage.tasks.failed)
}
if (
- this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime &&
- message.waitTime != null
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.median &&
+ taskWaitTime != null
) {
- workerTasksUsage.waitTimeHistory.push(message.waitTime)
- workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
+ workerUsage.waitTime.history.push(taskWaitTime)
+ workerUsage.waitTime.median = median(workerUsage.waitTime.history)
}
}
}
- private updateEluTasksUsage (
- workerTasksUsage: TasksUsage,
+ private updateEluWorkerUsage (
+ workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (this.workerChoiceStrategyContext.getTaskStatistics().elu) {
- if (workerTasksUsage.elu != null && message.elu != null) {
- workerTasksUsage.elu = {
- idle: workerTasksUsage.elu.idle + message.elu.idle,
- active: workerTasksUsage.elu.active + message.elu.active,
- utilization:
- (workerTasksUsage.elu.utilization + message.elu.utilization) / 2
- }
- } else if (message.elu != null) {
- workerTasksUsage.elu = message.elu
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .aggregate
+ ) {
+ if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
+ workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
+ workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
+ workerUsage.elu.utilization =
+ (workerUsage.elu.utilization +
+ message.taskPerformance.elu.utilization) /
+ 2
+ } else if (message.taskPerformance?.elu != null) {
+ workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
+ workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
+ workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+ }
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .average &&
+ workerUsage.tasks.executed !== 0
+ ) {
+ const executedTasks =
+ workerUsage.tasks.executed - workerUsage.tasks.failed
+ workerUsage.elu.idle.average =
+ workerUsage.elu.idle.aggregate / executedTasks
+ workerUsage.elu.active.average =
+ workerUsage.elu.active.aggregate / executedTasks
+ }
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .median &&
+ message.taskPerformance?.elu != null
+ ) {
+ workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
+ workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
+ workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
+ workerUsage.elu.active.median = median(workerUsage.elu.active.history)
}
}
}
/**
* Chooses a worker node for the next task.
*
- * The default worker choice strategy uses a round robin algorithm to distribute the load.
+ * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
*
* @returns The worker node key
*/
- protected chooseWorkerNode (): number {
- let workerNodeKey: number
- if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
- const workerCreated = this.createAndSetupWorker()
- this.registerWorkerMessageListener(workerCreated, message => {
- const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
- if (
- isKillBehavior(KillBehaviors.HARD, message.kill) ||
- (message.kill != null &&
- this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
- ) {
- // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- this.flushTasksQueue(currentWorkerNodeKey)
- // FIXME: wait for tasks to be finished
- void (this.destroyWorker(workerCreated) as Promise<void>)
- }
- })
- workerNodeKey = this.getWorkerNodeKey(workerCreated)
- } else {
- workerNodeKey = this.workerChoiceStrategyContext.execute()
+ private chooseWorkerNode (): number {
+ if (this.shallCreateDynamicWorker()) {
+ const worker = this.createAndSetupDynamicWorker()
+ if (
+ this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
+ ) {
+ return this.getWorkerNodeKey(worker)
+ }
}
- return workerNodeKey
+ return this.workerChoiceStrategyContext.execute()
+ }
+
+ /**
+ * Conditions for dynamic worker creation.
+ *
+ * @returns Whether to create a dynamic worker or not.
+ */
+ private shallCreateDynamicWorker (): boolean {
+ return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
}
/**
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void
/**
- * Returns a newly created worker.
+ * Creates a new worker.
+ *
+ * @returns Newly created worker.
*/
protected abstract createWorker (): Worker
if (this.emitter != null) {
this.emitter.emit(PoolEvents.error, error)
}
- })
- worker.on('error', () => {
if (this.opts.restartWorkerOnError === true) {
this.createAndSetupWorker()
}
return worker
}
+ /**
+ * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
+ *
+ * @returns New, completely set up dynamic worker.
+ */
+ protected createAndSetupDynamicWorker (): Worker {
+ const worker = this.createAndSetupWorker()
+ this.registerWorkerMessageListener(worker, message => {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ if (
+ isKillBehavior(KillBehaviors.HARD, message.kill) ||
+ (message.kill != null &&
+ ((this.opts.enableTasksQueue === false &&
+ this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
+ 0) ||
+ (this.opts.enableTasksQueue === true &&
+ this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
+ 0 &&
+ this.tasksQueueSize(workerNodeKey) === 0)))
+ ) {
+ // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
+ void (this.destroyWorker(worker) as Promise<void>)
+ }
+ })
+ return worker
+ }
+
/**
* This function is the listener registered for each worker message.
*
// Task execution response received
const promiseResponse = this.promiseResponseMap.get(message.id)
if (promiseResponse != null) {
- if (message.error != null) {
- promiseResponse.reject(message.error)
+ if (message.taskError != null) {
if (this.emitter != null) {
- this.emitter.emit(PoolEvents.taskError, {
- error: message.error,
- errorData: message.errorData
- })
+ this.emitter.emit(PoolEvents.taskError, message.taskError)
}
+ promiseResponse.reject(message.taskError.message)
} else {
promiseResponse.resolve(message.data as Response)
}
this.dequeueTask(workerNodeKey) as Task<Data>
)
}
+ this.workerChoiceStrategyContext.update(workerNodeKey)
}
}
}
* Sets the given worker node its tasks usage in the pool.
*
* @param workerNode - The worker node.
- * @param tasksUsage - The worker node tasks usage.
+ * @param workerUsage - The worker usage.
*/
private setWorkerNodeTasksUsage (
workerNode: WorkerNode<Worker, Data>,
- tasksUsage: TasksUsage
+ workerUsage: WorkerUsage
): void {
- workerNode.tasksUsage = tasksUsage
+ workerNode.workerUsage = workerUsage
}
/**
private pushWorkerNode (worker: Worker): number {
return this.workerNodes.push({
worker,
- tasksUsage: {
- ran: 0,
- running: 0,
- runTime: 0,
- runTimeHistory: new CircularArray(),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: new CircularArray(),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
- elu: undefined
- },
+ workerUsage: this.getWorkerUsage(worker),
tasksQueue: new Queue<Task<Data>>()
})
}
- /**
- * Sets the given worker in the pool worker nodes.
- *
- * @param workerNodeKey - The worker node key.
- * @param worker - The worker.
- * @param tasksUsage - The worker tasks usage.
- * @param tasksQueue - The worker task queue.
- */
- private setWorkerNode (
- workerNodeKey: number,
- worker: Worker,
- tasksUsage: TasksUsage,
- tasksQueue: Queue<Task<Data>>
- ): void {
- this.workerNodes[workerNodeKey] = {
- worker,
- tasksUsage,
- tasksQueue
- }
- }
+ // /**
+ // * Sets the given worker in the pool worker nodes.
+ // *
+ // * @param workerNodeKey - The worker node key.
+ // * @param worker - The worker.
+ // * @param workerUsage - The worker usage.
+ // * @param tasksQueue - The worker task queue.
+ // */
+ // private setWorkerNode (
+ // workerNodeKey: number,
+ // worker: Worker,
+ // workerUsage: WorkerUsage,
+ // tasksQueue: Queue<Task<Data>>
+ // ): void {
+ // this.workerNodes[workerNodeKey] = {
+ // worker,
+ // workerUsage,
+ // tasksQueue
+ // }
+ // }
/**
* Removes the given worker from the pool worker nodes.
}
private executeTask (workerNodeKey: number, task: Task<Data>): void {
- this.beforeTaskExecutionHook(workerNodeKey)
+ this.beforeTaskExecutionHook(workerNodeKey, task)
this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
}
private setWorkerStatistics (worker: Worker): void {
this.sendToWorker(worker, {
statistics: {
- runTime: this.workerChoiceStrategyContext.getTaskStatistics().runTime,
- waitTime: this.workerChoiceStrategyContext.getTaskStatistics().waitTime,
- elu: this.workerChoiceStrategyContext.getTaskStatistics().elu
+ runTime:
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.aggregate,
+ elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .elu.aggregate
}
})
}
+
+ private getWorkerUsage (worker: Worker): WorkerUsage {
+ return {
+ tasks: this.getTaskStatistics(worker),
+ runTime: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ waitTime: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ utilization: 0
+ }
+ }
+ }
+
+ private getTaskStatistics (worker: Worker): TaskStatistics {
+ const queueSize =
+ this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size
+ return {
+ executed: 0,
+ executing: 0,
+ get queued (): number {
+ return queueSize ?? 0
+ },
+ failed: 0
+ }
+ }
}