this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
this.getWorkerWorkerChoiceStrategies()
)
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
return opResult
}
this.taskFunctions.get(name)
)
})
- this.deleteTaskFunctionWorkerUsages(name)
+ for (const workerNode of this.workerNodes) {
+ workerNode.deleteTaskFunctionWorkerUsage(name)
+ }
this.taskFunctions.delete(name)
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
this.getWorkerWorkerChoiceStrategies()
)
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
return opResult
}
})
}
- private deleteTaskFunctionWorkerUsages (name: string): void {
- for (const workerNode of this.workerNodes) {
- workerNode.deleteTaskFunctionWorkerUsage(name)
- }
- }
-
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeKey: number,
message: MessageValue<Response>
): void {
- let needWorkerChoiceStrategyUpdate = false
+ let needWorkerChoiceStrategiesUpdate = false
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
workerUsage,
message
)
- needWorkerChoiceStrategyUpdate = true
+ needWorkerChoiceStrategiesUpdate = true
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
taskFunctionWorkerUsage,
message
)
- needWorkerChoiceStrategyUpdate = true
+ needWorkerChoiceStrategiesUpdate = true
}
- if (needWorkerChoiceStrategyUpdate) {
+ if (needWorkerChoiceStrategiesUpdate) {
this.workerChoiceStrategiesContext?.update(workerNodeKey)
}
}
this.handleWorkerReadyResponse(message)
} else if (taskFunctionsProperties != null) {
// Task function properties message received from worker
- const workerInfo = this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(workerId)
- )
+ const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
if (workerInfo != null) {
workerInfo.taskFunctionsProperties = taskFunctionsProperties
+ this.sendStatisticsMessageToWorker(workerNodeKey)
}
} else if (taskId != null) {
// Task execution response received from worker
if (ready == null || !ready) {
throw new Error(`Worker ${workerId} failed to initialize`)
}
- const workerNode =
- this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const workerNode = this.workerNodes[workerNodeKey]
workerNode.info.ready = ready
workerNode.info.taskFunctionsProperties = taskFunctionsProperties
+ this.sendStatisticsMessageToWorker(workerNodeKey)
this.checkAndEmitReadyEvent()
}
waitTime: {
history: new CircularArray()
},
- elu: {
- idle: {
- aggregate: 0,
- maximum: 0,
- minimum: 0,
- history: new CircularArray()
- },
- active: {
- aggregate: 0,
- maximum: 0,
- minimum: 0,
- history: new CircularArray()
- }
- }
+ elu: expect.objectContaining({
+ idle: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
+ active: expect.objectContaining({
+ history: expect.any(CircularArray)
+ })
+ })
})
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed
+ ).toBeGreaterThan(0)
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate ==
+ null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
+ ).toBeGreaterThan(0)
+ }
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
+ ).toBeGreaterThanOrEqual(0)
+ }
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeGreaterThanOrEqual(0)
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeLessThanOrEqual(1)
+ }
}
await dynamicThreadPool.destroy()
})