- Add `queueMaxSize` option to tasks queue options.
- Add O(1) deque implementation implemented with doubly linked list and use it for tasks queueing.
+- Add tasks stealing algorithm when a worker node queue is back pressured if tasks queueing is enabled.
## [2.6.31] - 2023-08-20
- Support for sync and async task functions :white_check_mark:
- Tasks distribution strategies :white_check_mark:
- Lockless tasks queueing :white_check_mark:
+- Queued tasks rescheduling:
+ - Tasks redistribution on worker error :white_check_mark:
+ - Tasks stealing under back pressure :white_check_mark:
- General guidelines on pool choice :white_check_mark:
- Error handling out of the box :white_check_mark:
- Widely tested :white_check_mark:
this.sendStartupMessageToWorker(workerNodeKey)
// Send the statistics message to worker.
this.sendStatisticsMessageToWorker(workerNodeKey)
+ if (this.opts.enableTasksQueue === true) {
+ this.workerNodes[workerNodeKey].onBackPressure =
+ this.tasksStealingOnBackPressure.bind(this)
+ }
}
/**
let minQueuedTasks = Infinity
let executeTask = false
for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
- const workerInfo = this.getWorkerInfo(workerNodeId) as WorkerInfo
+ if (
+ this.workerNodes[workerNodeId].usage.tasks.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ executeTask = true
+ }
if (
workerNodeId !== workerNodeKey &&
- workerInfo.ready &&
+ workerNode.info.ready &&
workerNode.usage.tasks.queued === 0
) {
- if (
- this.workerNodes[workerNodeId].usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
- ) {
- executeTask = true
- }
targetWorkerNodeKey = workerNodeId
break
}
if (
workerNodeId !== workerNodeKey &&
- workerInfo.ready &&
+ workerNode.info.ready &&
workerNode.usage.tasks.queued < minQueuedTasks
) {
minQueuedTasks = workerNode.usage.tasks.queued
if (executeTask) {
this.executeTask(
targetWorkerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
+ this.popTask(workerNodeKey) as Task<Data>
)
} else {
this.enqueueTask(
targetWorkerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
+ this.popTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
+ }
+
+ private tasksStealingOnBackPressure (workerId: number): void {
+ const sourceWorkerNode =
+ this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ const workerNodes = this.workerNodes
+ .filter((workerNode) => workerNode.info.id !== workerId)
+ .sort(
+ (workerNodeA, workerNodeB) =>
+ workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
+ )
+ for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
+ if (
+ workerNode.info.ready &&
+ sourceWorkerNode.usage.tasks.queued > 0 &&
+ !workerNode.hasBackPressure() &&
+ workerNode.usage.tasks.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ this.executeTask(
+ workerNodeKey,
+ sourceWorkerNode.popTask() as Task<Data>
+ )
+ } else if (
+ workerNode.info.ready &&
+ sourceWorkerNode.usage.tasks.queued > 0 &&
+ !workerNode.hasBackPressure() &&
+ workerNode.usage.tasks.executing >=
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ this.enqueueTask(
+ workerNodeKey,
+ sourceWorkerNode.popTask() as Task<Data>
)
}
}
return this.workerNodes[workerNodeKey].dequeueTask()
}
+ private popTask (workerNodeKey: number): Task<Data> | undefined {
+ return this.workerNodes[workerNodeKey].popTask()
+ }
+
private tasksQueueSize (workerNodeKey: number): number {
return this.workerNodes[workerNodeKey].tasksQueueSize()
}
public usage: WorkerUsage
/** @inheritdoc */
public tasksQueueBackPressureSize: number
+ /** @inheritdoc */
+ public onBackPressure?: (workerId: number) => void
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
private readonly tasksQueue: Deque<Task<Data>>
/** @inheritdoc */
public enqueueTask (task: Task<Data>): number {
- return this.tasksQueue.push(task)
+ const tasksQueueSize = this.tasksQueue.push(task)
+ if (this.onBackPressure != null && this.hasBackPressure()) {
+ this.once(this.onBackPressure)(this.info.id as number)
+ }
+ return tasksQueueSize
+ }
+
+ /** @inheritdoc */
+ public unshiftTask (task: Task<Data>): number {
+ const tasksQueueSize = this.tasksQueue.unshift(task)
+ if (this.onBackPressure != null && this.hasBackPressure()) {
+ this.once(this.onBackPressure)(this.info.id as number)
+ }
+ return tasksQueueSize
}
/** @inheritdoc */
return this.tasksQueue.shift()
}
+ /** @inheritdoc */
+ public popTask (): Task<Data> | undefined {
+ return this.tasksQueue.pop()
+ }
+
/** @inheritdoc */
public clearTasksQueue (): void {
this.tasksQueue.clear()
return worker.id
}
}
+
+ /**
+ * Executes a function once at a time.
+ */
+
+ private once (
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ fn: (...args: any[]) => void,
+ context = this
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ ): (...args: any[]) => void {
+ let called = false
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ return function (...args: any[]): void {
+ if (!called) {
+ called = true
+ fn.apply(context, args)
+ called = false
+ }
+ }
+ }
}
* This is the number of tasks that can be enqueued before the worker node has back pressure.
*/
tasksQueueBackPressureSize: number
+ /**
+ * Callback invoked when worker node tasks queue is back pressured.
+ *
+ * @param workerId - The worker id.
+ */
+ onBackPressure?: (workerId: number) => void
/**
* Tasks queue size.
*
* @returns The tasks queue size.
*/
readonly enqueueTask: (task: Task<Data>) => number
+ /**
+ * Prepends a task to the tasks queue.
+ *
+ * @param task - The task to prepend.
+ * @returns The tasks queue size.
+ */
+ readonly unshiftTask: (task: Task<Data>) => number
/**
* Dequeue task.
*
* @returns The dequeued task.
*/
readonly dequeueTask: () => Task<Data> | undefined
+ /**
+ * Pops a task from the tasks queue.
+ *
+ * @returns The popped task.
+ */
+ readonly popTask: () => Task<Data> | undefined
/**
* Clears tasks queue.
*/