From 0d4e88b32dcc9af05423c40e049fb2693012b6d8 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 8 May 2024 11:43:09 +0200 Subject: [PATCH] perf: optimize task(s) stealing MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 ++++ src/pools/abstract-pool.ts | 11 ++++------- src/pools/worker-node.ts | 6 ++++++ src/pools/worker.ts | 6 ++++++ src/priority-queue.ts | 12 ++++++++---- tests/priority-queue.test.mjs | 30 ++++++++++++++++++++++++++++++ 6 files changed, 58 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa5c772d..ab0615e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Optimize task(s) stealing by dequeuing task(s) from the last prioritized bucket. + ## [4.0.2] - 2024-05-06 ### Fixed diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index ad7adce3..065d3996 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1817,7 +1817,7 @@ export abstract class AbstractPool< ) if (sourceWorkerNode != null) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.dequeueTask(1)! + const task = sourceWorkerNode.dequeueLastBucketTask()! this.handleTask(workerNodeKey, task) this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -1868,7 +1868,7 @@ export abstract class AbstractPool< } workerInfo.stealing = true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.dequeueTask(1)! + const task = sourceWorkerNode.dequeueLastBucketTask()! this.handleTask(workerNodeKey, task) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) @@ -2105,11 +2105,8 @@ export abstract class AbstractPool< return tasksQueueSize } - private dequeueTask ( - workerNodeKey: number, - bucket?: number - ): Task | undefined { - return this.workerNodes[workerNodeKey].dequeueTask(bucket) + private dequeueTask (workerNodeKey: number): Task | undefined { + return this.workerNodes[workerNodeKey].dequeueTask() } private tasksQueueSize (workerNodeKey: number): number { diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 64400d9e..5d28802c 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -95,6 +95,12 @@ export class WorkerNode return this.tasksQueue.dequeue(bucket) } + /** @inheritdoc */ + public dequeueLastBucketTask (): Task | undefined { + // Start from the last empty or partially filled bucket + return this.tasksQueue.dequeue(this.tasksQueue.buckets + 1) + } + /** @inheritdoc */ public clearTasksQueue (): void { this.tasksQueue.clear() diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 35d9f755..1a9973ba 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -325,6 +325,12 @@ export interface IWorkerNode * @returns The dequeued task. */ readonly dequeueTask: (bucket?: number) => Task | undefined + /** + * Dequeue last bucket task. + * + * @returns The dequeued task. + */ + readonly dequeueLastBucketTask: () => Task | undefined /** * Clears tasks queue. */ diff --git a/src/priority-queue.ts b/src/priority-queue.ts index 64b5a924..1c0d8c92 100644 --- a/src/priority-queue.ts +++ b/src/priority-queue.ts @@ -26,6 +26,13 @@ export class PriorityQueue { /** The maximum size of the priority queue. */ public maxSize!: number + /** + * The number of filled prioritized buckets. + */ + public get buckets (): number { + return this.k === Infinity ? 1 : Math.trunc(this.nodeArray.length / this.k) + } + /** * Constructs a priority queue. * @@ -51,10 +58,7 @@ export class PriorityQueue { */ public enqueue (data: T, priority?: number): number { priority = priority ?? 0 - const startIndex = - this.k === Infinity || this.nodeArray.length / this.k < 1 - ? 0 - : Math.trunc(this.nodeArray.length / this.k) * this.k + const startIndex = this.k === Infinity ? 0 : this.buckets * this.k let inserted = false for (let index = startIndex; index < this.nodeArray.length; index++) { if (this.nodeArray[index].priority > priority) { diff --git a/tests/priority-queue.test.mjs b/tests/priority-queue.test.mjs index 58931859..624d577a 100644 --- a/tests/priority-queue.test.mjs +++ b/tests/priority-queue.test.mjs @@ -15,11 +15,13 @@ describe('Priority queue test suite', () => { ) let priorityQueue = new PriorityQueue() expect(priorityQueue.k).toBe(Infinity) + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(0) expect(priorityQueue.maxSize).toBe(0) expect(priorityQueue.nodeArray).toStrictEqual([]) priorityQueue = new PriorityQueue(2) expect(priorityQueue.k).toBe(2) + expect(priorityQueue.buckets).toBe(0) expect(priorityQueue.size).toBe(0) expect(priorityQueue.maxSize).toBe(0) expect(priorityQueue.nodeArray).toStrictEqual([]) @@ -28,11 +30,13 @@ describe('Priority queue test suite', () => { it('Verify default k enqueue() behavior', () => { const priorityQueue = new PriorityQueue() let rtSize = priorityQueue.enqueue(1) + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(1) expect(priorityQueue.maxSize).toBe(1) expect(rtSize).toBe(priorityQueue.size) expect(priorityQueue.nodeArray).toStrictEqual([{ data: 1, priority: 0 }]) rtSize = priorityQueue.enqueue(2) + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(2) expect(priorityQueue.maxSize).toBe(2) expect(rtSize).toBe(priorityQueue.size) @@ -41,6 +45,7 @@ describe('Priority queue test suite', () => { { data: 2, priority: 0 } ]) rtSize = priorityQueue.enqueue(3) + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(3) expect(priorityQueue.maxSize).toBe(3) expect(rtSize).toBe(priorityQueue.size) @@ -50,6 +55,7 @@ describe('Priority queue test suite', () => { { data: 3, priority: 0 } ]) rtSize = priorityQueue.enqueue(3, -1) + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(4) expect(priorityQueue.maxSize).toBe(4) expect(rtSize).toBe(priorityQueue.size) @@ -60,6 +66,7 @@ describe('Priority queue test suite', () => { { data: 3, priority: 0 } ]) rtSize = priorityQueue.enqueue(1, 1) + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(5) expect(priorityQueue.maxSize).toBe(5) expect(rtSize).toBe(priorityQueue.size) @@ -75,11 +82,13 @@ describe('Priority queue test suite', () => { it('Verify k=2 enqueue() behavior', () => { const priorityQueue = new PriorityQueue(2) let rtSize = priorityQueue.enqueue(1) + expect(priorityQueue.buckets).toBe(0) expect(priorityQueue.size).toBe(1) expect(priorityQueue.maxSize).toBe(1) expect(rtSize).toBe(priorityQueue.size) expect(priorityQueue.nodeArray).toStrictEqual([{ data: 1, priority: 0 }]) rtSize = priorityQueue.enqueue(2) + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(2) expect(priorityQueue.maxSize).toBe(2) expect(rtSize).toBe(priorityQueue.size) @@ -88,6 +97,7 @@ describe('Priority queue test suite', () => { { data: 2, priority: 0 } ]) rtSize = priorityQueue.enqueue(3) + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(3) expect(priorityQueue.maxSize).toBe(3) expect(rtSize).toBe(priorityQueue.size) @@ -97,6 +107,7 @@ describe('Priority queue test suite', () => { { data: 3, priority: 0 } ]) rtSize = priorityQueue.enqueue(3, -1) + expect(priorityQueue.buckets).toBe(2) expect(priorityQueue.size).toBe(4) expect(priorityQueue.maxSize).toBe(4) expect(rtSize).toBe(priorityQueue.size) @@ -107,6 +118,7 @@ describe('Priority queue test suite', () => { { data: 3, priority: 0 } ]) rtSize = priorityQueue.enqueue(1, 1) + expect(priorityQueue.buckets).toBe(2) expect(priorityQueue.size).toBe(5) expect(priorityQueue.maxSize).toBe(5) expect(rtSize).toBe(priorityQueue.size) @@ -118,6 +130,7 @@ describe('Priority queue test suite', () => { { data: 1, priority: 1 } ]) rtSize = priorityQueue.enqueue(3, -2) + expect(priorityQueue.buckets).toBe(3) expect(priorityQueue.size).toBe(6) expect(priorityQueue.maxSize).toBe(6) expect(rtSize).toBe(priorityQueue.size) @@ -136,7 +149,11 @@ describe('Priority queue test suite', () => { priorityQueue.enqueue(1) priorityQueue.enqueue(2, -1) priorityQueue.enqueue(3) + expect(priorityQueue.buckets).toBe(1) + expect(priorityQueue.size).toBe(3) + expect(priorityQueue.maxSize).toBe(3) let rtItem = priorityQueue.dequeue() + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(2) expect(priorityQueue.maxSize).toBe(3) expect(rtItem).toBe(2) @@ -145,11 +162,13 @@ describe('Priority queue test suite', () => { { data: 3, priority: 0 } ]) rtItem = priorityQueue.dequeue() + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(1) expect(priorityQueue.maxSize).toBe(3) expect(rtItem).toBe(1) expect(priorityQueue.nodeArray).toStrictEqual([{ data: 3, priority: 0 }]) rtItem = priorityQueue.dequeue() + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(0) expect(priorityQueue.maxSize).toBe(3) expect(rtItem).toBe(3) @@ -164,7 +183,11 @@ describe('Priority queue test suite', () => { priorityQueue.enqueue(3, -1) priorityQueue.enqueue(1, 1) priorityQueue.enqueue(3, -2) + expect(priorityQueue.buckets).toBe(3) + expect(priorityQueue.size).toBe(6) + expect(priorityQueue.maxSize).toBe(6) let rtItem = priorityQueue.dequeue(3) + expect(priorityQueue.buckets).toBe(2) expect(priorityQueue.size).toBe(5) expect(priorityQueue.maxSize).toBe(6) expect(rtItem).toBe(3) @@ -176,6 +199,7 @@ describe('Priority queue test suite', () => { { data: 1, priority: 1 } ]) rtItem = priorityQueue.dequeue() + expect(priorityQueue.buckets).toBe(2) expect(priorityQueue.size).toBe(4) expect(priorityQueue.maxSize).toBe(6) expect(rtItem).toBe(1) @@ -186,6 +210,7 @@ describe('Priority queue test suite', () => { { data: 1, priority: 1 } ]) rtItem = priorityQueue.dequeue(2) + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(3) expect(priorityQueue.maxSize).toBe(6) expect(rtItem).toBe(3) @@ -195,6 +220,7 @@ describe('Priority queue test suite', () => { { data: 1, priority: 1 } ]) rtItem = priorityQueue.dequeue(2) + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(2) expect(priorityQueue.maxSize).toBe(6) expect(rtItem).toBe(1) @@ -203,11 +229,13 @@ describe('Priority queue test suite', () => { { data: 3, priority: -1 } ]) rtItem = priorityQueue.dequeue(2) + expect(priorityQueue.buckets).toBe(0) expect(priorityQueue.size).toBe(1) expect(priorityQueue.maxSize).toBe(6) expect(rtItem).toBe(2) expect(priorityQueue.nodeArray).toStrictEqual([{ data: 3, priority: -1 }]) rtItem = priorityQueue.dequeue() + expect(priorityQueue.buckets).toBe(0) expect(priorityQueue.size).toBe(0) expect(priorityQueue.maxSize).toBe(6) expect(rtItem).toBe(3) @@ -251,6 +279,7 @@ describe('Priority queue test suite', () => { priorityQueue.enqueue(1) priorityQueue.enqueue(2) priorityQueue.enqueue(3) + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(3) expect(priorityQueue.maxSize).toBe(3) expect(priorityQueue.nodeArray).toStrictEqual([ @@ -259,6 +288,7 @@ describe('Priority queue test suite', () => { { data: 3, priority: 0 } ]) priorityQueue.clear() + expect(priorityQueue.buckets).toBe(1) expect(priorityQueue.size).toBe(0) expect(priorityQueue.maxSize).toBe(0) expect(priorityQueue.nodeArray).toStrictEqual([]) -- 2.34.1