accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
0
),
- queuedTasks: this.workerNodes.reduce(
- (accumulator, workerNode) =>
- accumulator + workerNode.usage.tasks.queued,
- 0
- ),
- stealingWorkerNodes: this.workerNodes.reduce(
- (accumulator, _, workerNodeKey) =>
- this.isWorkerNodeStealing(workerNodeKey)
- ? accumulator + 1
- : accumulator,
- 0
- ),
+ queuedTasks: this.getQueuedTasks(),
+ stealingWorkerNodes: this.getStealingWorkerNodes(),
stolenTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + workerNode.usage.tasks.stolen,
})
)
if (this.emitter != null) {
- this.emitter.emit(PoolEvents.destroy, this.info)
+ this.emitter.listenerCount(PoolEvents.destroy) > 0 &&
+ this.emitter.emit(PoolEvents.destroy, this.info)
this.emitter.emitDestroy()
this.readyEventEmitted = false
}
!this.started ||
this.destroying ||
this.workerNodes.length <= 1 ||
- this.info.queuedTasks === 0
+ this.getQueuedTasks() === 0
)
}
private checkAndEmitReadyEvent (): void {
if (this.emitter != null && !this.readyEventEmitted && this.ready) {
- this.emitter.emit(PoolEvents.ready, this.info)
+ this.emitter.listenerCount(PoolEvents.ready) > 0 &&
+ this.emitter.emit(PoolEvents.ready, this.info)
this.readyEventEmitted = true
}
}
this.backPressureEventEmitted &&
!this.backPressure
) {
- this.emitter.emit(PoolEvents.backPressureEnd, this.info)
+ this.emitter.listenerCount(PoolEvents.backPressureEnd) > 0 &&
+ this.emitter.emit(PoolEvents.backPressureEnd, this.info)
this.backPressureEventEmitted = false
}
}
private checkAndEmitTaskExecutionEvents (): void {
if (this.emitter != null && !this.busyEventEmitted && this.busy) {
- this.emitter.emit(PoolEvents.busy, this.info)
+ this.emitter.listenerCount(PoolEvents.busy) > 0 &&
+ this.emitter.emit(PoolEvents.busy, this.info)
this.busyEventEmitted = true
}
}
private checkAndEmitTaskExecutionFinishedEvents (): void {
if (this.emitter != null && this.busyEventEmitted && !this.busy) {
- this.emitter.emit(PoolEvents.busyEnd, this.info)
+ this.emitter.listenerCount(PoolEvents.busyEnd) > 0 &&
+ this.emitter.emit(PoolEvents.busyEnd, this.info)
this.busyEventEmitted = false
}
}
!this.backPressureEventEmitted &&
this.backPressure
) {
- this.emitter.emit(PoolEvents.backPressure, this.info)
+ this.emitter.listenerCount(PoolEvents.backPressure) > 0 &&
+ this.emitter.emit(PoolEvents.backPressure, this.info)
this.backPressureEventEmitted = true
}
}
: new Error(`Task '${taskName}' id '${taskId}' aborted`)
}
+ private getQueuedTasks (): number {
+ return this.workerNodes.reduce((accumulator, workerNode) => {
+ return accumulator + workerNode.usage.tasks.queued
+ }, 0)
+ }
+
+ private getStealingWorkerNodes (): number {
+ return this.workerNodes.reduce(
+ (accumulator, _, workerNodeKey) =>
+ this.isWorkerNodeStealing(workerNodeKey)
+ ? accumulator + 1
+ : accumulator,
+ 0
+ )
+ }
+
/**
* Gets task function worker choice strategy, if any.
* @param name - The task function name.
private readonly isStealingRatioReached = (): boolean => {
return (
this.opts.tasksQueueOptions?.tasksStealingRatio === 0 ||
- (this.info.stealingWorkerNodes ?? 0) >
+ this.getStealingWorkerNodes() >
Math.ceil(
this.workerNodes.length *
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
message: MessageValue<Data>
): Promise<boolean> {
const targetWorkerNodeKeys = [...this.workerNodes.keys()]
+ if (targetWorkerNodeKeys.length === 0) {
+ return true
+ }
const responsesReceived: MessageValue<Response>[] = []
const taskFunctionOperationsListener = (
message: MessageValue<Response>,
protected override checkAndEmitDynamicWorkerCreationEvents (): void {
if (this.emitter != null) {
if (!this.fullEventEmitted && this.full) {
- this.emitter.emit(PoolEvents.full, this.info)
+ this.emitter.listenerCount(PoolEvents.full) > 0 &&
+ this.emitter.emit(PoolEvents.full, this.info)
this.fullEventEmitted = true
}
if (this.emptyEventEmitted && !this.empty) {
protected override checkAndEmitDynamicWorkerDestructionEvents (): void {
if (this.emitter != null) {
if (this.fullEventEmitted && !this.full) {
- this.emitter.emit(PoolEvents.fullEnd, this.info)
+ this.emitter.listenerCount(PoolEvents.fullEnd) > 0 &&
+ this.emitter.emit(PoolEvents.fullEnd, this.info)
this.fullEventEmitted = false
}
if (!this.emptyEventEmitted && this.empty) {
- this.emitter.emit(PoolEvents.empty, this.info)
+ this.emitter.listenerCount(PoolEvents.empty) > 0 &&
+ this.emitter.emit(PoolEvents.empty, this.info)
this.emptyEventEmitted = true
}
}
protected override checkAndEmitDynamicWorkerCreationEvents (): void {
if (this.emitter != null) {
if (!this.fullEventEmitted && this.full) {
- this.emitter.emit(PoolEvents.full, this.info)
+ this.emitter.listenerCount(PoolEvents.full) > 0 &&
+ this.emitter.emit(PoolEvents.full, this.info)
this.fullEventEmitted = true
}
if (this.emptyEventEmitted && !this.empty) {
protected override checkAndEmitDynamicWorkerDestructionEvents (): void {
if (this.emitter != null) {
if (this.fullEventEmitted && !this.full) {
- this.emitter.emit(PoolEvents.fullEnd, this.info)
+ this.emitter.listenerCount(PoolEvents.fullEnd) > 0 &&
+ this.emitter.emit(PoolEvents.fullEnd, this.info)
this.fullEventEmitted = false
}
if (!this.emptyEventEmitted && this.empty) {
- this.emitter.emit(PoolEvents.empty, this.info)
+ this.emitter.listenerCount(PoolEvents.empty) > 0 &&
+ this.emitter.emit(PoolEvents.empty, this.info)
this.emptyEventEmitted = true
}
}