From 72695f86742c18357f42f000ce6488d545187133 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Tue, 22 Aug 2023 23:36:42 +0200 Subject: [PATCH] feat: add tasks stealing algorithm MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit reference #901, #887 Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 1 + README.md | 3 ++ src/pools/abstract-pool.ts | 65 +++++++++++++++++++++++++++++++------- src/pools/worker-node.ts | 43 ++++++++++++++++++++++++- src/pools/worker.ts | 19 +++++++++++ 5 files changed, 119 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 56591cb7..0fe43abd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add `queueMaxSize` option to tasks queue options. - Add O(1) deque implementation implemented with doubly linked list and use it for tasks queueing. +- Add tasks stealing algorithm when a worker node queue is back pressured if tasks queueing is enabled. ## [2.6.31] - 2023-08-20 diff --git a/README.md b/README.md index e80193ec..eae0023f 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,9 @@ Please consult our [general guidelines](#general-guidelines). - Support for sync and async task functions :white_check_mark: - Tasks distribution strategies :white_check_mark: - Lockless tasks queueing :white_check_mark: +- Queued tasks rescheduling: + - Tasks redistribution on worker error :white_check_mark: + - Tasks stealing under back pressure :white_check_mark: - General guidelines on pool choice :white_check_mark: - Error handling out of the box :white_check_mark: - Widely tested :white_check_mark: diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 1926a4a4..f0119f4e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1142,6 +1142,10 @@ export abstract class AbstractPool< this.sendStartupMessageToWorker(workerNodeKey) // Send the statistics message to worker. this.sendStatisticsMessageToWorker(workerNodeKey) + if (this.opts.enableTasksQueue === true) { + this.workerNodes[workerNodeKey].onBackPressure = + this.tasksStealingOnBackPressure.bind(this) + } } /** @@ -1175,24 +1179,23 @@ export abstract class AbstractPool< let minQueuedTasks = Infinity let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - const workerInfo = this.getWorkerInfo(workerNodeId) as WorkerInfo + if ( + this.workerNodes[workerNodeId].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + executeTask = true + } if ( workerNodeId !== workerNodeKey && - workerInfo.ready && + workerNode.info.ready && workerNode.usage.tasks.queued === 0 ) { - if ( - this.workerNodes[workerNodeId].usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) - ) { - executeTask = true - } targetWorkerNodeKey = workerNodeId break } if ( workerNodeId !== workerNodeKey && - workerInfo.ready && + workerNode.info.ready && workerNode.usage.tasks.queued < minQueuedTasks ) { minQueuedTasks = workerNode.usage.tasks.queued @@ -1202,12 +1205,48 @@ export abstract class AbstractPool< if (executeTask) { this.executeTask( targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task + this.popTask(workerNodeKey) as Task ) } else { this.enqueueTask( targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task + this.popTask(workerNodeKey) as Task + ) + } + } + } + + private tasksStealingOnBackPressure (workerId: number): void { + const sourceWorkerNode = + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + const workerNodes = this.workerNodes + .filter((workerNode) => workerNode.info.id !== workerId) + .sort( + (workerNodeA, workerNodeB) => + workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued + ) + for (const [workerNodeKey, workerNode] of workerNodes.entries()) { + if ( + workerNode.info.ready && + sourceWorkerNode.usage.tasks.queued > 0 && + !workerNode.hasBackPressure() && + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.executeTask( + workerNodeKey, + sourceWorkerNode.popTask() as Task + ) + } else if ( + workerNode.info.ready && + sourceWorkerNode.usage.tasks.queued > 0 && + !workerNode.hasBackPressure() && + workerNode.usage.tasks.executing >= + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.enqueueTask( + workerNodeKey, + sourceWorkerNode.popTask() as Task ) } } @@ -1387,6 +1426,10 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].dequeueTask() } + private popTask (workerNodeKey: number): Task | undefined { + return this.workerNodes[workerNodeKey].popTask() + } + private tasksQueueSize (workerNodeKey: number): number { return this.workerNodes[workerNodeKey].tasksQueueSize() } diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index d7f96e64..b539cabd 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -30,6 +30,8 @@ implements IWorkerNode { public usage: WorkerUsage /** @inheritdoc */ public tasksQueueBackPressureSize: number + /** @inheritdoc */ + public onBackPressure?: (workerId: number) => void private readonly taskFunctionsUsage: Map private readonly tasksQueue: Deque> @@ -90,7 +92,20 @@ implements IWorkerNode { /** @inheritdoc */ public enqueueTask (task: Task): number { - return this.tasksQueue.push(task) + const tasksQueueSize = this.tasksQueue.push(task) + if (this.onBackPressure != null && this.hasBackPressure()) { + this.once(this.onBackPressure)(this.info.id as number) + } + return tasksQueueSize + } + + /** @inheritdoc */ + public unshiftTask (task: Task): number { + const tasksQueueSize = this.tasksQueue.unshift(task) + if (this.onBackPressure != null && this.hasBackPressure()) { + this.once(this.onBackPressure)(this.info.id as number) + } + return tasksQueueSize } /** @inheritdoc */ @@ -98,6 +113,11 @@ implements IWorkerNode { return this.tasksQueue.shift() } + /** @inheritdoc */ + public popTask (): Task | undefined { + return this.tasksQueue.pop() + } + /** @inheritdoc */ public clearTasksQueue (): void { this.tasksQueue.clear() @@ -251,4 +271,25 @@ implements IWorkerNode { return worker.id } } + + /** + * Executes a function once at a time. + */ + + private once ( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + fn: (...args: any[]) => void, + context = this + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ): (...args: any[]) => void { + let called = false + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return function (...args: any[]): void { + if (!called) { + called = true + fn.apply(context, args) + called = false + } + } + } } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index e6dd0fae..6b387a96 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -224,6 +224,12 @@ export interface IWorkerNode { * This is the number of tasks that can be enqueued before the worker node has back pressure. */ tasksQueueBackPressureSize: number + /** + * Callback invoked when worker node tasks queue is back pressured. + * + * @param workerId - The worker id. + */ + onBackPressure?: (workerId: number) => void /** * Tasks queue size. * @@ -237,12 +243,25 @@ export interface IWorkerNode { * @returns The tasks queue size. */ readonly enqueueTask: (task: Task) => number + /** + * Prepends a task to the tasks queue. + * + * @param task - The task to prepend. + * @returns The tasks queue size. + */ + readonly unshiftTask: (task: Task) => number /** * Dequeue task. * * @returns The dequeued task. */ readonly dequeueTask: () => Task | undefined + /** + * Pops a task from the tasks queue. + * + * @returns The popped task. + */ + readonly popTask: () => Task | undefined /** * Clears tasks queue. */ -- 2.34.1