type WorkerUsage
} from './worker'
+type EmptyQueueCallback = (workerId: number) => void
+type BackPressureCallback = EmptyQueueCallback
+
/**
* Worker node.
*
/** @inheritdoc */
public readonly info: WorkerInfo
/** @inheritdoc */
- public messageChannel?: MessageChannel
- /** @inheritdoc */
public usage: WorkerUsage
/** @inheritdoc */
+ public messageChannel?: MessageChannel
+ /** @inheritdoc */
public tasksQueueBackPressureSize: number
/** @inheritdoc */
- public onBackPressure?: (workerId: number) => void
+ public onBackPressure?: BackPressureCallback
/** @inheritdoc */
- public onEmptyQueue?: (workerId: number) => void
- private readonly taskFunctionsUsage: Map<string, WorkerUsage>
+ public onEmptyQueue?: EmptyQueueCallback
private readonly tasksQueue: Deque<Task<Data>>
private onEmptyQueueCount: number
+ private readonly taskFunctionsUsage: Map<string, WorkerUsage>
/**
* Constructs a new worker node.
}
this.worker = worker
this.info = this.initWorkerInfo(worker, workerType)
+ this.usage = this.initWorkerUsage()
if (workerType === WorkerTypes.thread) {
this.messageChannel = new MessageChannel()
}
- this.usage = this.initWorkerUsage()
- this.taskFunctionsUsage = new Map<string, WorkerUsage>()
- this.tasksQueue = new Deque<Task<Data>>()
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
+ this.tasksQueue = new Deque<Task<Data>>()
this.onEmptyQueueCount = 0
+ this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/** @inheritdoc */
}
private async startOnEmptyQueue (): Promise<void> {
- if (this.onEmptyQueue != null) {
- if (this.tasksQueue.size > 0) {
- this.onEmptyQueueCount = 0
- return
- }
- this.onEmptyQueue(this.info.id as number)
- ++this.onEmptyQueueCount
- await sleep(exponentialDelay(this.onEmptyQueueCount))
- await this.startOnEmptyQueue()
+ if (
+ this.onEmptyQueueCount > 0 &&
+ (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
+ ) {
+ this.onEmptyQueueCount = 0
+ return
}
+ (this.onEmptyQueue as EmptyQueueCallback)(this.info.id as number)
+ ++this.onEmptyQueueCount
+ await sleep(exponentialDelay(this.onEmptyQueueCount))
+ await this.startOnEmptyQueue()
}
private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {