*
* When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
*/
- protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
- new Map<string, PromiseResponseWrapper<Response>>()
+ protected promiseResponseMap: Map<
+ `${string}-${string}-${string}-${string}-${string}`,
+ PromiseResponseWrapper<Response>
+ > = new Map<
+ `${string}-${string}-${string}-${string}-${string}`,
+ PromiseResponseWrapper<Response>
+ >()
/**
* Worker choice strategies context referencing worker choice algorithms implementation.
/**
* The start timestamp of the pool.
*/
- private readonly startTimestamp
+ private startTimestamp?: number
/**
* Constructs a new poolifier pool.
this.enqueueTask = this.enqueueTask.bind(this)
if (this.opts.enableEvents === true) {
- this.initializeEventEmitter()
+ this.initEventEmitter()
}
this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
Worker,
if (this.opts.startWorkers === true) {
this.start()
}
-
- this.startTimestamp = performance.now()
}
private checkPoolType (): void {
}
}
- private initializeEventEmitter (): void {
+ private initEventEmitter (): void {
this.emitter = new EventEmitterAsyncResource({
name: `poolifier:${this.type}-${this.worker}-pool`
})
* @returns The pool utilization.
*/
private get utilization (): number {
+ if (this.startTimestamp == null) {
+ return 0
+ }
const poolTimeCapacity =
(performance.now() - this.startTimestamp) *
(this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
this.getWorkerWorkerChoiceStrategies()
)
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
return opResult
}
this.taskFunctions.get(name)
)
})
- this.deleteTaskFunctionWorkerUsages(name)
+ for (const workerNode of this.workerNodes) {
+ workerNode.deleteTaskFunctionWorkerUsage(name)
+ }
this.taskFunctions.delete(name)
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
this.getWorkerWorkerChoiceStrategies()
)
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
return opResult
}
})
}
- private deleteTaskFunctionWorkerUsages (name: string): void {
- for (const workerNode of this.workerNodes) {
- workerNode.deleteTaskFunctionWorkerUsage(name)
- }
- }
-
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
/**
* Starts the minimum number of workers.
*/
- private startMinimumNumberOfWorkers (): void {
+ private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void {
this.startingMinimumNumberOfWorkers = true
while (
this.workerNodes.reduce(
0
) < this.minimumNumberOfWorkers
) {
- this.createAndSetupWorkerNode()
+ const workerNodeKey = this.createAndSetupWorkerNode()
+ initWorkerNodeUsage &&
+ this.initWorkerNodeUsage(this.workerNodes[workerNodeKey])
}
this.startingMinimumNumberOfWorkers = false
}
}
this.starting = true
this.startMinimumNumberOfWorkers()
+ this.startTimestamp = performance.now()
this.starting = false
this.started = true
}
this.emitter?.emit(PoolEvents.destroy, this.info)
this.emitter?.emitDestroy()
this.readyEventEmitted = false
+ delete this.startTimestamp
this.destroying = false
this.started = false
}
workerNodeKey: number,
message: MessageValue<Response>
): void {
- let needWorkerChoiceStrategyUpdate = false
+ let needWorkerChoiceStrategiesUpdate = false
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
workerUsage,
message
)
- needWorkerChoiceStrategyUpdate = true
+ needWorkerChoiceStrategiesUpdate = true
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
taskFunctionWorkerUsage,
message
)
- needWorkerChoiceStrategyUpdate = true
+ needWorkerChoiceStrategiesUpdate = true
}
- if (needWorkerChoiceStrategyUpdate) {
+ if (needWorkerChoiceStrategiesUpdate) {
this.workerChoiceStrategiesContext?.update(workerNodeKey)
}
}
}
/**
- * Chooses a worker node for the next task.
+ * Chooses a worker node for the next task given the worker choice strategy.
*
* @param workerChoiceStrategy - The worker choice strategy.
* @returns The chosen worker node key
transferList?: readonly TransferListItem[]
): void
+ /**
+ * Initializes the worker node usage with sensible default values gathered during runtime.
+ *
+ * @param workerNode - The worker node.
+ */
+ private initWorkerNodeUsage (workerNode: IWorkerNode<Worker, Data>): void {
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .runTime.aggregate === true
+ ) {
+ workerNode.usage.runTime.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.runTime.aggregate ?? Infinity
+ )
+ )
+ }
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .waitTime.aggregate === true
+ ) {
+ workerNode.usage.waitTime.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime.aggregate ?? Infinity
+ )
+ )
+ }
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
+ .aggregate === true
+ ) {
+ workerNode.usage.elu.active.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.elu.active.aggregate ?? Infinity
+ )
+ )
+ }
+ }
+
/**
* Creates a new, completely set up worker node.
*
if (workerNode.info.dynamic) {
this.createAndSetupDynamicWorkerNode()
} else if (!this.startingMinimumNumberOfWorkers) {
- this.startMinimumNumberOfWorkers()
+ this.startMinimumNumberOfWorkers(true)
}
}
if (
!this.startingMinimumNumberOfWorkers &&
!this.destroying
) {
- this.startMinimumNumberOfWorkers()
+ this.startMinimumNumberOfWorkers(true)
}
})
const workerNodeKey = this.addWorkerNode(workerNode)
) {
workerNode.info.ready = true
}
+ this.initWorkerNodeUsage(workerNode)
this.checkAndEmitDynamicWorkerCreationEvents()
return workerNodeKey
}
}
}
- private redistributeQueuedTasks (workerNodeKey: number): void {
- if (workerNodeKey === -1 || this.cannotStealTask()) {
+ private redistributeQueuedTasks (sourceWorkerNodeKey: number): void {
+ if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) {
return
}
- while (this.tasksQueueSize(workerNodeKey) > 0) {
+ while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) {
const destinationWorkerNodeKey = this.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
- return workerNode.info.ready &&
+ return sourceWorkerNodeKey !== workerNodeKey &&
+ workerNode.info.ready &&
workerNode.usage.tasks.queued <
workerNodes[minWorkerNodeKey].usage.tasks.queued
? workerNodeKey
this.handleTask(
destinationWorkerNodeKey,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.dequeueTask(workerNodeKey)!
+ this.dequeueTask(sourceWorkerNodeKey)!
)
}
}
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage =
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- workerNode.getTaskFunctionWorkerUsage(taskName)!
- ++taskFunctionWorkerUsage.tasks.stolen
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ++workerNode.getTaskFunctionWorkerUsage(taskName)!.tasks.stolen
}
}
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
- workerNodeKey: number
+ workerNodeKey: number,
+ taskName: string,
+ previousTaskName?: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
++workerNode.usage.tasks.sequentiallyStolen
}
- }
-
- private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
- workerNodeKey: number,
- taskName: string
- ): void {
- const workerNode = this.workerNodes[workerNodeKey]
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
const taskFunctionWorkerUsage =
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
workerNode.getTaskFunctionWorkerUsage(taskName)!
- ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+ if (
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen === 0 ||
+ (previousTaskName != null &&
+ previousTaskName === taskName &&
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0)
+ ) {
+ ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+ } else if (taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) {
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+ }
}
}
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
- workerNodeKey: number
+ workerNodeKey: number,
+ taskName: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
workerNode.usage.tasks.sequentiallyStolen = 0
}
- }
-
- private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
- workerNodeKey: number,
- taskName: string
- ): void {
- const workerNode = this.workerNodes[workerNodeKey]
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage =
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- workerNode.getTaskFunctionWorkerUsage(taskName)!
- taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ )!.tasks.sequentiallyStolen = 0
}
}
) {
if (workerInfo != null && previousStolenTask != null) {
workerInfo.stealing = false
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ previousStolenTask.name!
+ )
}
return
}
if (
workerInfo != null &&
previousStolenTask != null &&
- workerNodeTasksUsage.sequentiallyStolen > 0 &&
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
workerInfo.stealing = false
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- for (const taskFunctionProperties of workerInfo.taskFunctionsProperties!) {
- this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
- workerNodeKey,
- taskFunctionProperties.name
- )
- }
- this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ previousStolenTask.name!
+ )
return
}
if (workerInfo == null) {
}
workerInfo.stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
- if (
- this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
- stolenTask != null
- ) {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const taskFunctionTasksWorkerUsage = this.workerNodes[
- workerNodeKey
+ if (stolenTask != null) {
+ this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
- if (
- taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
- (previousStolenTask != null &&
- previousStolenTask.name === stolenTask.name &&
- taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
- ) {
- this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
- workerNodeKey,
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- stolenTask.name!
- )
- } else {
- this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
- workerNodeKey,
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- stolenTask.name!
- )
- }
+ stolenTask.name!,
+ previousStolenTask?.name
+ )
}
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
)
if (sourceWorkerNode != null) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.dequeueTask(1)!
+ const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
this.handleTask(workerNodeKey, task)
- this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
return task
): void => {
if (
this.cannotStealTask() ||
+ this.hasBackPressure() ||
(this.info.stealingWorkerNodes ?? 0) >
Math.floor(this.workerNodes.length / 2)
) {
}
workerInfo.stealing = true
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.dequeueTask(1)!
+ const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
this.handleTask(workerNodeKey, task)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
this.handleWorkerReadyResponse(message)
} else if (taskFunctionsProperties != null) {
// Task function properties message received from worker
- const workerInfo = this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(workerId)
- )
+ const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
if (workerInfo != null) {
workerInfo.taskFunctionsProperties = taskFunctionsProperties
+ this.sendStatisticsMessageToWorker(workerNodeKey)
}
} else if (taskId != null) {
// Task execution response received from worker
if (ready == null || !ready) {
throw new Error(`Worker ${workerId} failed to initialize`)
}
- const workerNode =
- this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const workerNode = this.workerNodes[workerNodeKey]
workerNode.info.ready = ready
workerNode.info.taskFunctionsProperties = taskFunctionsProperties
+ this.sendStatisticsMessageToWorker(workerNodeKey)
this.checkAndEmitReadyEvent()
}
return tasksQueueSize
}
- private dequeueTask (
- workerNodeKey: number,
- bucket?: number
- ): Task<Data> | undefined {
- return this.workerNodes[workerNodeKey].dequeueTask(bucket)
+ private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
+ return this.workerNodes[workerNodeKey].dequeueTask()
}
private tasksQueueSize (workerNodeKey: number): number {