From dd95187664018669192560e65e3b996a8143b699 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 23 Aug 2023 23:21:11 +0200 Subject: [PATCH] fix: fix queued tasks rescheduling MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 8 ++++ README.md | 3 +- src/pools/abstract-pool.ts | 86 ++++++++++++++++++++++++++------------ src/pools/worker-node.ts | 27 ++++++------ src/pools/worker.ts | 6 +++ 5 files changed, 89 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 89b19186..18942cb6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,10 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fix queued tasks rescheduling. + ### Changed - Rename tasks queue options `queueMaxSize` to `size`. + + ## [2.6.32] - 2023-08-23 ### Fixed diff --git a/README.md b/README.md index eae0023f..08de7a85 100644 --- a/README.md +++ b/README.md @@ -47,8 +47,9 @@ Please consult our [general guidelines](#general-guidelines). - Tasks distribution strategies :white_check_mark: - Lockless tasks queueing :white_check_mark: - Queued tasks rescheduling: - - Tasks redistribution on worker error :white_check_mark: + - Tasks stealing under back pressure :white_check_mark: + - Tasks redistribution on worker error :white_check_mark: - General guidelines on pool choice :white_check_mark: - Error handling out of the box :white_check_mark: - Widely tested :white_check_mark: diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 3a1a8780..ee291ff9 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1156,6 +1156,8 @@ export abstract class AbstractPool< // Send the statistics message to worker. this.sendStatisticsMessageToWorker(workerNodeKey) if (this.opts.enableTasksQueue === true) { + // this.workerNodes[workerNodeKey].onEmptyQueue = + // this.taskStealingOnEmptyQueue.bind(this) this.workerNodes[workerNodeKey].onBackPressure = this.tasksStealingOnBackPressure.bind(this) } @@ -1188,10 +1190,11 @@ export abstract class AbstractPool< private redistributeQueuedTasks (workerNodeKey: number): void { const workerNodes = this.workerNodes.filter( - (_, workerNodeId) => workerNodeId !== workerNodeKey + (workerNode, workerNodeId) => + workerNode.info.ready && workerNodeId !== workerNodeKey ) while (this.tasksQueueSize(workerNodeKey) > 0) { - let targetWorkerNodeKey: number = workerNodeKey + let destinationWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity let executeTask = false for (const [workerNodeId, workerNode] of workerNodes.entries()) { @@ -1201,28 +1204,58 @@ export abstract class AbstractPool< ) { executeTask = true } - if (workerNode.info.ready && workerNode.usage.tasks.queued === 0) { - targetWorkerNodeKey = workerNodeId + if (workerNode.usage.tasks.queued === 0) { + destinationWorkerNodeKey = workerNodeId break } - if ( - workerNode.info.ready && - workerNode.usage.tasks.queued < minQueuedTasks - ) { + if (workerNode.usage.tasks.queued < minQueuedTasks) { minQueuedTasks = workerNode.usage.tasks.queued - targetWorkerNodeKey = workerNodeId + destinationWorkerNodeKey = workerNodeId } } + const task = { + ...(this.dequeueTask(workerNodeKey) as Task), + workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo) + .id as number + } if (executeTask) { - this.executeTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + this.executeTask(destinationWorkerNodeKey, task) } else { - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + this.enqueueTask(destinationWorkerNodeKey, task) + } + } + } + + private taskStealingOnEmptyQueue (workerId: number): void { + const workerNodes = this.workerNodes + .filter( + (workerNode) => workerNode.info.ready && workerNode.info.id !== workerId + ) + .sort( + (workerNodeA, workerNodeB) => + workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued + ) + const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const destinationWorkerNode = workerNodes[destinationWorkerNodeKey] + for (const sourceWorkerNode of workerNodes) { + if (sourceWorkerNode.usage.tasks.queued > 0) { + if ( + destinationWorkerNode?.usage?.tasks?.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: destinationWorkerNode.info.id as number + } + this.executeTask(destinationWorkerNodeKey, task) + } else { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: destinationWorkerNode.info.id as number + } + this.enqueueTask(destinationWorkerNodeKey, task) + } + break } } } @@ -1231,30 +1264,29 @@ export abstract class AbstractPool< const sourceWorkerNode = this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] const workerNodes = this.workerNodes - .filter((workerNode) => workerNode.info.id !== workerId) + .filter( + (workerNode) => workerNode.info.ready && workerNode.info.id !== workerId + ) .sort( (workerNodeA, workerNodeB) => workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued ) for (const [workerNodeKey, workerNode] of workerNodes.entries()) { if ( - workerNode.info.ready && sourceWorkerNode.usage.tasks.queued > 0 && !workerNode.hasBackPressure() ) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: workerNode.info.id as number + } if ( workerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) ) { - this.executeTask( - workerNodeKey, - sourceWorkerNode.popTask() as Task - ) + this.executeTask(workerNodeKey, task) } else { - this.enqueueTask( - workerNodeKey, - sourceWorkerNode.popTask() as Task - ) + this.enqueueTask(workerNodeKey, task) } } } diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index d8044032..116fb844 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -32,6 +32,8 @@ implements IWorkerNode { public tasksQueueBackPressureSize: number /** @inheritdoc */ public onBackPressure?: (workerId: number) => void + /** @inheritdoc */ + public onEmptyQueue?: (workerId: number) => void private readonly taskFunctionsUsage: Map private readonly tasksQueue: Deque> @@ -81,15 +83,6 @@ implements IWorkerNode { return this.tasksQueue.size } - /** - * Tasks queue maximum size. - * - * @returns The tasks queue maximum size. - */ - private tasksQueueMaxSize (): number { - return this.tasksQueue.maxSize - } - /** @inheritdoc */ public enqueueTask (task: Task): number { const tasksQueueSize = this.tasksQueue.push(task) @@ -110,12 +103,20 @@ implements IWorkerNode { /** @inheritdoc */ public dequeueTask (): Task | undefined { - return this.tasksQueue.shift() + const task = this.tasksQueue.shift() + if (this.onEmptyQueue != null && this.tasksQueue.size === 0) { + once(this.onEmptyQueue, this)(this.info.id as number) + } + return task } /** @inheritdoc */ public popTask (): Task | undefined { - return this.tasksQueue.pop() + const task = this.tasksQueue.pop() + if (this.onEmptyQueue != null && this.tasksQueue.size === 0) { + once(this.onEmptyQueue, this)(this.info.id as number) + } + return task } /** @inheritdoc */ @@ -180,10 +181,10 @@ implements IWorkerNode { private initWorkerUsage (): WorkerUsage { const getTasksQueueSize = (): number => { - return this.tasksQueueSize() + return this.tasksQueue.size } const getTasksQueueMaxSize = (): number => { - return this.tasksQueueMaxSize() + return this.tasksQueue.maxSize } return { tasks: { diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 6b387a96..14c8dbe1 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -230,6 +230,12 @@ export interface IWorkerNode { * @param workerId - The worker id. */ onBackPressure?: (workerId: number) => void + /** + * Callback invoked when worker node tasks queue is empty. + * + * @param workerId - The worker id. + */ + onEmptyQueue?: (workerId: number) => void /** * Tasks queue size. * -- 2.34.1