} from '../utils'
import { Deque } from '../deque'
import {
+ type BackPressureCallback,
+ type EmptyQueueCallback,
type IWorker,
type IWorkerNode,
type WorkerInfo,
/** @inheritdoc */
public readonly info: WorkerInfo
/** @inheritdoc */
- public messageChannel?: MessageChannel
- /** @inheritdoc */
public usage: WorkerUsage
/** @inheritdoc */
+ public messageChannel?: MessageChannel
+ /** @inheritdoc */
public tasksQueueBackPressureSize: number
/** @inheritdoc */
- public onBackPressure?: (workerId: number) => void
+ public onBackPressure?: BackPressureCallback
/** @inheritdoc */
- public onEmptyQueue?: (workerId: number) => void
- private readonly taskFunctionsUsage: Map<string, WorkerUsage>
+ public onEmptyQueue?: EmptyQueueCallback
private readonly tasksQueue: Deque<Task<Data>>
private onEmptyQueueCount: number
+ private readonly taskFunctionsUsage: Map<string, WorkerUsage>
/**
* Constructs a new worker node.
}
this.worker = worker
this.info = this.initWorkerInfo(worker, workerType)
+ this.usage = this.initWorkerUsage()
if (workerType === WorkerTypes.thread) {
this.messageChannel = new MessageChannel()
}
- this.usage = this.initWorkerUsage()
- this.taskFunctionsUsage = new Map<string, WorkerUsage>()
- this.tasksQueue = new Deque<Task<Data>>()
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
+ this.tasksQueue = new Deque<Task<Data>>()
this.onEmptyQueueCount = 0
+ this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/** @inheritdoc */
}
private async startOnEmptyQueue (): Promise<void> {
- if (this.tasksQueue.size > 0) {
+ if (
+ this.onEmptyQueueCount > 0 &&
+ (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
+ ) {
this.onEmptyQueueCount = 0
return
}
- (this.onEmptyQueue as (workerId: number) => void)(this.info.id as number)
+ (this.onEmptyQueue as EmptyQueueCallback)(this.info.id as number)
++this.onEmptyQueueCount
await sleep(exponentialDelay(this.onEmptyQueueCount))
await this.startOnEmptyQueue()