this.enqueueTask = this.enqueueTask.bind(this)
if (this.opts.enableEvents === true) {
- this.initializeEventEmitter()
+ this.initEventEmitter()
}
this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
Worker,
}
}
- private initializeEventEmitter (): void {
+ private initEventEmitter (): void {
this.emitter = new EventEmitterAsyncResource({
name: `poolifier:${this.type}-${this.worker}-pool`
})
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
this.getWorkerWorkerChoiceStrategies()
)
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
return opResult
}
this.taskFunctions.get(name)
)
})
- this.deleteTaskFunctionWorkerUsages(name)
+ for (const workerNode of this.workerNodes) {
+ workerNode.deleteTaskFunctionWorkerUsage(name)
+ }
this.taskFunctions.delete(name)
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
this.getWorkerWorkerChoiceStrategies()
)
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
return opResult
}
})
}
- private deleteTaskFunctionWorkerUsages (name: string): void {
- for (const workerNode of this.workerNodes) {
- workerNode.deleteTaskFunctionWorkerUsage(name)
- }
- }
-
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
/**
* Starts the minimum number of workers.
*/
- private startMinimumNumberOfWorkers (): void {
+ private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void {
this.startingMinimumNumberOfWorkers = true
while (
this.workerNodes.reduce(
0
) < this.minimumNumberOfWorkers
) {
- this.createAndSetupWorkerNode()
+ const workerNodeKey = this.createAndSetupWorkerNode()
+ initWorkerNodeUsage &&
+ this.initWorkerNodeUsage(this.workerNodes[workerNodeKey])
}
this.startingMinimumNumberOfWorkers = false
}
workerNodeKey: number,
message: MessageValue<Response>
): void {
- let needWorkerChoiceStrategyUpdate = false
+ let needWorkerChoiceStrategiesUpdate = false
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
workerUsage,
message
)
- needWorkerChoiceStrategyUpdate = true
+ needWorkerChoiceStrategiesUpdate = true
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
taskFunctionWorkerUsage,
message
)
- needWorkerChoiceStrategyUpdate = true
+ needWorkerChoiceStrategiesUpdate = true
}
- if (needWorkerChoiceStrategyUpdate) {
+ if (needWorkerChoiceStrategiesUpdate) {
this.workerChoiceStrategiesContext?.update(workerNodeKey)
}
}
transferList?: readonly TransferListItem[]
): void
+ /**
+ * Initializes the worker node usage with sensible default values gathered during runtime.
+ *
+ * @param workerNode - The worker node.
+ */
+ private initWorkerNodeUsage (workerNode: IWorkerNode<Worker, Data>): void {
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .runTime.aggregate === true
+ ) {
+ workerNode.usage.runTime.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.runTime.aggregate ?? Infinity
+ )
+ )
+ }
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .waitTime.aggregate === true
+ ) {
+ workerNode.usage.waitTime.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime.aggregate ?? Infinity
+ )
+ )
+ }
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
+ .aggregate === true
+ ) {
+ workerNode.usage.elu.active.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.elu.active.aggregate ?? Infinity
+ )
+ )
+ }
+ }
+
/**
* Creates a new, completely set up worker node.
*
if (workerNode.info.dynamic) {
this.createAndSetupDynamicWorkerNode()
} else if (!this.startingMinimumNumberOfWorkers) {
- this.startMinimumNumberOfWorkers()
+ this.startMinimumNumberOfWorkers(true)
}
}
if (
!this.startingMinimumNumberOfWorkers &&
!this.destroying
) {
- this.startMinimumNumberOfWorkers()
+ this.startMinimumNumberOfWorkers(true)
}
})
const workerNodeKey = this.addWorkerNode(workerNode)
) {
workerNode.info.ready = true
}
+ this.initWorkerNodeUsage(workerNode)
this.checkAndEmitDynamicWorkerCreationEvents()
return workerNodeKey
}
this.handleWorkerReadyResponse(message)
} else if (taskFunctionsProperties != null) {
// Task function properties message received from worker
- const workerInfo = this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(workerId)
- )
+ const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
if (workerInfo != null) {
workerInfo.taskFunctionsProperties = taskFunctionsProperties
+ this.sendStatisticsMessageToWorker(workerNodeKey)
}
} else if (taskId != null) {
// Task execution response received from worker
if (ready == null || !ready) {
throw new Error(`Worker ${workerId} failed to initialize`)
}
- const workerNode =
- this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const workerNode = this.workerNodes[workerNodeKey]
workerNode.info.ready = ready
workerNode.info.taskFunctionsProperties = taskFunctionsProperties
+ this.sendStatisticsMessageToWorker(workerNodeKey)
this.checkAndEmitReadyEvent()
}