From: Jérôme Benoit Date: Fri, 24 Nov 2023 20:49:00 +0000 (+0100) Subject: feat: fire tasks stealing at worker node idling X-Git-Tag: v3.0.7~2 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=65542a35fd6759cddb82167dd4c47f9bed843ebf;p=poolifier.git feat: fire tasks stealing at worker node idling Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index df07fdbb..8afbdaa1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Make continuous tasks stealing start at worker node idling. + ## [3.0.6] - 2023-11-24 ### Fixed diff --git a/README.md b/README.md index b2b80c2b..68e9fe65 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ Please consult our [general guidelines](#general-guidelines). - Tasks distribution strategies :white_check_mark: - Lockless tasks queueing :white_check_mark: - Queued tasks rescheduling: - - Task stealing on empty queue :white_check_mark: + - Task stealing on idle :white_check_mark: - Tasks stealing under back pressure :white_check_mark: - Tasks redistribution on worker error :white_check_mark: - General guidelines on pool choice :white_check_mark: diff --git a/docs/api.md b/docs/api.md index 8b6ffeb9..28282ac2 100644 --- a/docs/api.md +++ b/docs/api.md @@ -136,7 +136,7 @@ An object with these properties: - `size` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer. - `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer. - - `taskStealing` (optional) - Task stealing enablement on empty queue. + - `taskStealing` (optional) - Task stealing enablement on idle. - `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement under back pressure. Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true }` diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index b14801b6..8c2dcc31 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -628,8 +628,8 @@ export abstract class AbstractPool< private setTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].addEventListener( - 'emptyQueue', - this.handleEmptyQueueEvent as EventListener + 'idleWorkerNode', + this.handleIdleWorkerNodeEvent as EventListener ) } } @@ -637,8 +637,8 @@ export abstract class AbstractPool< private unsetTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].removeEventListener( - 'emptyQueue', - this.handleEmptyQueueEvent as EventListener + 'idleWorkerNode', + this.handleIdleWorkerNodeEvent as EventListener ) } } @@ -1401,8 +1401,8 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { if (this.opts.tasksQueueOptions?.taskStealing === true) { this.workerNodes[workerNodeKey].addEventListener( - 'emptyQueue', - this.handleEmptyQueueEvent as EventListener + 'idleWorkerNode', + this.handleIdleWorkerNodeEvent as EventListener ) } if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { @@ -1478,7 +1478,7 @@ export abstract class AbstractPool< } } - private readonly handleEmptyQueueEvent = ( + private readonly handleIdleWorkerNodeEvent = ( event: CustomEvent ): void => { const { workerId } = event.detail diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 581adc95..b1b504d5 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -110,7 +110,7 @@ export interface TasksQueueOptions { */ readonly concurrency?: number /** - * Whether to enable task stealing on empty queue. + * Whether to enable task stealing on idle. * * @defaultValue true */ diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index ef0133ac..34a647fb 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -45,7 +45,7 @@ export class WorkerNode public tasksQueueBackPressureSize: number private readonly tasksQueue: Deque> private onBackPressureStarted: boolean - private onEmptyQueueCount: number + private onIdleWorkerNodeCount: number private readonly taskFunctionsUsage: Map /** @@ -66,7 +66,7 @@ export class WorkerNode this.tasksQueueBackPressureSize = tasksQueueBackPressureSize this.tasksQueue = new Deque>() this.onBackPressureStarted = false - this.onEmptyQueueCount = 0 + this.onIdleWorkerNodeCount = 0 this.taskFunctionsUsage = new Map() } @@ -108,8 +108,8 @@ export class WorkerNode /** @inheritdoc */ public dequeueTask (): Task | undefined { const task = this.tasksQueue.shift() - if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) { - this.startOnEmptyQueue().catch(EMPTY_FUNCTION) + if (this.isIdle() && this.onIdleWorkerNodeCount === 0) { + this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION) } return task } @@ -117,8 +117,8 @@ export class WorkerNode /** @inheritdoc */ public popTask (): Task | undefined { const task = this.tasksQueue.pop() - if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) { - this.startOnEmptyQueue().catch(EMPTY_FUNCTION) + if (this.isIdle() && this.onIdleWorkerNodeCount === 0) { + this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION) } return task } @@ -179,22 +179,26 @@ export class WorkerNode return this.taskFunctionsUsage.delete(name) } - private async startOnEmptyQueue (): Promise { + private async startOnIdleWorkerNode (): Promise { if ( - this.onEmptyQueueCount > 0 && + this.onIdleWorkerNodeCount > 0 && (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0) ) { - this.onEmptyQueueCount = 0 + this.onIdleWorkerNodeCount = 0 return } - ++this.onEmptyQueueCount + ++this.onIdleWorkerNodeCount this.dispatchEvent( - new CustomEvent('emptyQueue', { + new CustomEvent('idleWorkerNode', { detail: { workerId: this.info.id as number } }) ) - await sleep(exponentialDelay(this.onEmptyQueueCount)) - await this.startOnEmptyQueue() + await sleep(exponentialDelay(this.onIdleWorkerNodeCount)) + await this.startOnIdleWorkerNode() + } + + private isIdle (): boolean { + return this.usage.tasks.executing === 0 && this.tasksQueue.size === 0 } private initWorkerInfo (worker: Worker): WorkerInfo { diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index d524c464..b72c2887 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -84,7 +84,7 @@ describe('Worker node test suite', () => { threadWorkerNode.tasksQueue.size ) expect(threadWorkerNode.onBackPressureStarted).toBe(false) - expect(threadWorkerNode.onEmptyQueueCount).toBe(0) + expect(threadWorkerNode.onIdleWorkerNodeCount).toBe(0) expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) expect(clusterWorkerNode).toBeInstanceOf(WorkerNode) @@ -127,7 +127,7 @@ describe('Worker node test suite', () => { clusterWorkerNode.tasksQueue.size ) expect(clusterWorkerNode.onBackPressureStarted).toBe(false) - expect(clusterWorkerNode.onEmptyQueueCount).toBe(0) + expect(clusterWorkerNode.onIdleWorkerNodeCount).toBe(0) expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) })