## [Unreleased]
+### Changed
+
+- Make continuous tasks stealing start at worker node idling.
+
## [3.0.6] - 2023-11-24
### Fixed
- Tasks distribution strategies :white_check_mark:
- Lockless tasks queueing :white_check_mark:
- Queued tasks rescheduling:
- - Task stealing on empty queue :white_check_mark:
+ - Task stealing on idle :white_check_mark:
- Tasks stealing under back pressure :white_check_mark:
- Tasks redistribution on worker error :white_check_mark:
- General guidelines on pool choice :white_check_mark:
- `size` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer.
- `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer.
- - `taskStealing` (optional) - Task stealing enablement on empty queue.
+ - `taskStealing` (optional) - Task stealing enablement on idle.
- `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement under back pressure.
Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true }`
private setTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].addEventListener(
- 'emptyQueue',
- this.handleEmptyQueueEvent as EventListener
+ 'idleWorkerNode',
+ this.handleIdleWorkerNodeEvent as EventListener
)
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].removeEventListener(
- 'emptyQueue',
- this.handleEmptyQueueEvent as EventListener
+ 'idleWorkerNode',
+ this.handleIdleWorkerNodeEvent as EventListener
)
}
}
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
this.workerNodes[workerNodeKey].addEventListener(
- 'emptyQueue',
- this.handleEmptyQueueEvent as EventListener
+ 'idleWorkerNode',
+ this.handleIdleWorkerNodeEvent as EventListener
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
}
}
- private readonly handleEmptyQueueEvent = (
+ private readonly handleIdleWorkerNodeEvent = (
event: CustomEvent<WorkerNodeEventDetail>
): void => {
const { workerId } = event.detail
*/
readonly concurrency?: number
/**
- * Whether to enable task stealing on empty queue.
+ * Whether to enable task stealing on idle.
*
* @defaultValue true
*/
public tasksQueueBackPressureSize: number
private readonly tasksQueue: Deque<Task<Data>>
private onBackPressureStarted: boolean
- private onEmptyQueueCount: number
+ private onIdleWorkerNodeCount: number
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
/**
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
this.tasksQueue = new Deque<Task<Data>>()
this.onBackPressureStarted = false
- this.onEmptyQueueCount = 0
+ this.onIdleWorkerNodeCount = 0
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/** @inheritdoc */
public dequeueTask (): Task<Data> | undefined {
const task = this.tasksQueue.shift()
- if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) {
- this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
+ if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
+ this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
}
return task
}
/** @inheritdoc */
public popTask (): Task<Data> | undefined {
const task = this.tasksQueue.pop()
- if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) {
- this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
+ if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
+ this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
}
return task
}
return this.taskFunctionsUsage.delete(name)
}
- private async startOnEmptyQueue (): Promise<void> {
+ private async startOnIdleWorkerNode (): Promise<void> {
if (
- this.onEmptyQueueCount > 0 &&
+ this.onIdleWorkerNodeCount > 0 &&
(this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
) {
- this.onEmptyQueueCount = 0
+ this.onIdleWorkerNodeCount = 0
return
}
- ++this.onEmptyQueueCount
+ ++this.onIdleWorkerNodeCount
this.dispatchEvent(
- new CustomEvent<WorkerNodeEventDetail>('emptyQueue', {
+ new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
detail: { workerId: this.info.id as number }
})
)
- await sleep(exponentialDelay(this.onEmptyQueueCount))
- await this.startOnEmptyQueue()
+ await sleep(exponentialDelay(this.onIdleWorkerNodeCount))
+ await this.startOnIdleWorkerNode()
+ }
+
+ private isIdle (): boolean {
+ return this.usage.tasks.executing === 0 && this.tasksQueue.size === 0
}
private initWorkerInfo (worker: Worker): WorkerInfo {
threadWorkerNode.tasksQueue.size
)
expect(threadWorkerNode.onBackPressureStarted).toBe(false)
- expect(threadWorkerNode.onEmptyQueueCount).toBe(0)
+ expect(threadWorkerNode.onIdleWorkerNodeCount).toBe(0)
expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
clusterWorkerNode.tasksQueue.size
)
expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
- expect(clusterWorkerNode.onEmptyQueueCount).toBe(0)
+ expect(clusterWorkerNode.onIdleWorkerNodeCount).toBe(0)
expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
})