From: Jérôme Benoit Date: Wed, 23 Aug 2023 22:03:23 +0000 (+0200) Subject: fix: fix task stealing X-Git-Tag: v2.6.33~1 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=a6b3272bcb95d9ea8103bf39a1e7f6a42bba7b6f;p=poolifier.git fix: fix task stealing Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 18942cb6..18d9e42e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,9 +15,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Rename tasks queue options `queueMaxSize` to `size`. - +- Task stealing scheduling algorithm if tasks queueing is enabled. ## [2.6.32] - 2023-08-23 diff --git a/README.md b/README.md index 08de7a85..47a680d0 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ Please consult our [general guidelines](#general-guidelines). - Tasks distribution strategies :white_check_mark: - Lockless tasks queueing :white_check_mark: - Queued tasks rescheduling: - + - Task stealing :white_check_mark: - Tasks stealing under back pressure :white_check_mark: - Tasks redistribution on worker error :white_check_mark: - General guidelines on pool choice :white_check_mark: diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index ee291ff9..c6712d19 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1156,8 +1156,8 @@ export abstract class AbstractPool< // Send the statistics message to worker. this.sendStatisticsMessageToWorker(workerNodeKey) if (this.opts.enableTasksQueue === true) { - // this.workerNodes[workerNodeKey].onEmptyQueue = - // this.taskStealingOnEmptyQueue.bind(this) + this.workerNodes[workerNodeKey].onEmptyQueue = + this.taskStealingOnEmptyQueue.bind(this) this.workerNodes[workerNodeKey].onBackPressure = this.tasksStealingOnBackPressure.bind(this) } @@ -1189,26 +1189,32 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { - const workerNodes = this.workerNodes.filter( - (workerNode, workerNodeId) => - workerNode.info.ready && workerNodeId !== workerNodeKey - ) while (this.tasksQueueSize(workerNodeKey) > 0) { let destinationWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity let executeTask = false - for (const [workerNodeId, workerNode] of workerNodes.entries()) { + for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { if ( + workerNode.info.ready && + workerNodeId !== workerNodeKey && workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + (this.opts.tasksQueueOptions?.concurrency as number) ) { executeTask = true } - if (workerNode.usage.tasks.queued === 0) { + if ( + workerNode.info.ready && + workerNodeId !== workerNodeKey && + workerNode.usage.tasks.queued === 0 + ) { destinationWorkerNodeKey = workerNodeId break } - if (workerNode.usage.tasks.queued < minQueuedTasks) { + if ( + workerNode.info.ready && + workerNodeId !== workerNodeKey && + workerNode.usage.tasks.queued < minQueuedTasks + ) { minQueuedTasks = workerNode.usage.tasks.queued destinationWorkerNodeKey = workerNodeId } @@ -1227,32 +1233,30 @@ export abstract class AbstractPool< } private taskStealingOnEmptyQueue (workerId: number): void { + const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] const workerNodes = this.workerNodes - .filter( - (workerNode) => workerNode.info.ready && workerNode.info.id !== workerId - ) + .slice() .sort( (workerNodeA, workerNodeB) => workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued ) - const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) - const destinationWorkerNode = workerNodes[destinationWorkerNodeKey] for (const sourceWorkerNode of workerNodes) { - if (sourceWorkerNode.usage.tasks.queued > 0) { + if ( + sourceWorkerNode.info.ready && + sourceWorkerNode.info.id !== workerId && + sourceWorkerNode.usage.tasks.queued > 0 + ) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: destinationWorkerNode.info.id as number + } if ( - destinationWorkerNode?.usage?.tasks?.executing < + destinationWorkerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) ) { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: destinationWorkerNode.info.id as number - } this.executeTask(destinationWorkerNodeKey, task) } else { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: destinationWorkerNode.info.id as number - } this.enqueueTask(destinationWorkerNodeKey, task) } break @@ -1264,15 +1268,15 @@ export abstract class AbstractPool< const sourceWorkerNode = this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] const workerNodes = this.workerNodes - .filter( - (workerNode) => workerNode.info.ready && workerNode.info.id !== workerId - ) + .slice() .sort( (workerNodeA, workerNodeB) => workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued ) for (const [workerNodeKey, workerNode] of workerNodes.entries()) { if ( + workerNode.info.ready && + workerNode.info.id !== workerId && sourceWorkerNode.usage.tasks.queued > 0 && !workerNode.hasBackPressure() ) { diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index a5790db0..900f0edc 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -139,7 +139,10 @@ describe('Fixed cluster pool test suite', () => { expect(queuePool.info.backPressure).toBe(false) await Promise.all(promises) for (const workerNode of queuePool.workerNodes) { - expect(workerNode.usage.tasks.executing).toBe(0) + expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual( + numberOfWorkers * maxMultiplier + ) expect(workerNode.usage.tasks.executed).toBe(maxMultiplier) expect(workerNode.usage.tasks.queued).toBe(0) expect(workerNode.usage.tasks.maxQueued).toBe( diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index 4c2c6952..8cd337ee 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -139,7 +139,10 @@ describe('Fixed thread pool test suite', () => { expect(queuePool.info.backPressure).toBe(false) await Promise.all(promises) for (const workerNode of queuePool.workerNodes) { - expect(workerNode.usage.tasks.executing).toBe(0) + expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual( + numberOfThreads * maxMultiplier + ) expect(workerNode.usage.tasks.executed).toBe(maxMultiplier) expect(workerNode.usage.tasks.queued).toBe(0) expect(workerNode.usage.tasks.maxQueued).toBe(