feat: fire tasks stealing at worker node idling
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 24 Nov 2023 20:49:00 +0000 (21:49 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 24 Nov 2023 20:49:00 +0000 (21:49 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
README.md
docs/api.md
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/worker-node.ts
tests/pools/worker-node.test.mjs

index df07fdbb85048c98631b85cc8ff3949ccda3ab1e..8afbdaa189c3bc2bd49b8bddd27fb42104221dde 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Changed
+
+- Make continuous tasks stealing start at worker node idling.
+
 ## [3.0.6] - 2023-11-24
 
 ### Fixed
index b2b80c2b02e9d2369c7fbbcf470df3990102718b..68e9fe6589ff64573e18eb25a3bd48a8b3ba7650 100644 (file)
--- a/README.md
+++ b/README.md
@@ -44,7 +44,7 @@ Please consult our [general guidelines](#general-guidelines).
 - Tasks distribution strategies :white_check_mark:
 - Lockless tasks queueing :white_check_mark:
 - Queued tasks rescheduling:
-  - Task stealing on empty queue :white_check_mark:
+  - Task stealing on idle :white_check_mark:
   - Tasks stealing under back pressure :white_check_mark:
   - Tasks redistribution on worker error :white_check_mark:
 - General guidelines on pool choice :white_check_mark:
index 8b6ffeb91bb8d5c4627f46ac6a33e77361b1297c..28282ac2314bc05322098d09ad5ad0d08a163595 100644 (file)
@@ -136,7 +136,7 @@ An object with these properties:
 
   - `size` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer.
   - `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer.
-  - `taskStealing` (optional) - Task stealing enablement on empty queue.
+  - `taskStealing` (optional) - Task stealing enablement on idle.
   - `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement under back pressure.
 
   Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true }`
index b14801b69269b43cfe878dee88ef4987151dfb33..8c2dcc31d2e849c0fdbf4e0ed9d8bfa297c585de 100644 (file)
@@ -628,8 +628,8 @@ export abstract class AbstractPool<
   private setTaskStealing (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.workerNodes[workerNodeKey].addEventListener(
-        'emptyQueue',
-        this.handleEmptyQueueEvent as EventListener
+        'idleWorkerNode',
+        this.handleIdleWorkerNodeEvent as EventListener
       )
     }
   }
@@ -637,8 +637,8 @@ export abstract class AbstractPool<
   private unsetTaskStealing (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.workerNodes[workerNodeKey].removeEventListener(
-        'emptyQueue',
-        this.handleEmptyQueueEvent as EventListener
+        'idleWorkerNode',
+        this.handleIdleWorkerNodeEvent as EventListener
       )
     }
   }
@@ -1401,8 +1401,8 @@ export abstract class AbstractPool<
     if (this.opts.enableTasksQueue === true) {
       if (this.opts.tasksQueueOptions?.taskStealing === true) {
         this.workerNodes[workerNodeKey].addEventListener(
-          'emptyQueue',
-          this.handleEmptyQueueEvent as EventListener
+          'idleWorkerNode',
+          this.handleIdleWorkerNodeEvent as EventListener
         )
       }
       if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
@@ -1478,7 +1478,7 @@ export abstract class AbstractPool<
     }
   }
 
-  private readonly handleEmptyQueueEvent = (
+  private readonly handleIdleWorkerNodeEvent = (
     event: CustomEvent<WorkerNodeEventDetail>
   ): void => {
     const { workerId } = event.detail
index 581adc952c1e5a1a1b7e1f11c033283dc5df5fa5..b1b504d59b22bba2aca61becc898fa87167c5c6c 100644 (file)
@@ -110,7 +110,7 @@ export interface TasksQueueOptions {
    */
   readonly concurrency?: number
   /**
-   * Whether to enable task stealing on empty queue.
+   * Whether to enable task stealing on idle.
    *
    * @defaultValue true
    */
index ef0133ac716b2a01087799154f3ff414518309cc..34a647fbd42f0b8b47fcc32d99d3784ce95242b8 100644 (file)
@@ -45,7 +45,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   public tasksQueueBackPressureSize: number
   private readonly tasksQueue: Deque<Task<Data>>
   private onBackPressureStarted: boolean
-  private onEmptyQueueCount: number
+  private onIdleWorkerNodeCount: number
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
 
   /**
@@ -66,7 +66,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
     this.tasksQueue = new Deque<Task<Data>>()
     this.onBackPressureStarted = false
-    this.onEmptyQueueCount = 0
+    this.onIdleWorkerNodeCount = 0
     this.taskFunctionsUsage = new Map<string, WorkerUsage>()
   }
 
@@ -108,8 +108,8 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   /** @inheritdoc */
   public dequeueTask (): Task<Data> | undefined {
     const task = this.tasksQueue.shift()
-    if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) {
-      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
+    if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
+      this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
     }
     return task
   }
@@ -117,8 +117,8 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   /** @inheritdoc */
   public popTask (): Task<Data> | undefined {
     const task = this.tasksQueue.pop()
-    if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) {
-      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
+    if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
+      this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
     }
     return task
   }
@@ -179,22 +179,26 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     return this.taskFunctionsUsage.delete(name)
   }
 
-  private async startOnEmptyQueue (): Promise<void> {
+  private async startOnIdleWorkerNode (): Promise<void> {
     if (
-      this.onEmptyQueueCount > 0 &&
+      this.onIdleWorkerNodeCount > 0 &&
       (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
     ) {
-      this.onEmptyQueueCount = 0
+      this.onIdleWorkerNodeCount = 0
       return
     }
-    ++this.onEmptyQueueCount
+    ++this.onIdleWorkerNodeCount
     this.dispatchEvent(
-      new CustomEvent<WorkerNodeEventDetail>('emptyQueue', {
+      new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
         detail: { workerId: this.info.id as number }
       })
     )
-    await sleep(exponentialDelay(this.onEmptyQueueCount))
-    await this.startOnEmptyQueue()
+    await sleep(exponentialDelay(this.onIdleWorkerNodeCount))
+    await this.startOnIdleWorkerNode()
+  }
+
+  private isIdle (): boolean {
+    return this.usage.tasks.executing === 0 && this.tasksQueue.size === 0
   }
 
   private initWorkerInfo (worker: Worker): WorkerInfo {
index d524c464ee2d92139bb243b22b5b93749711e451..b72c28872dc65b04f55354403cd423ea1b369916 100644 (file)
@@ -84,7 +84,7 @@ describe('Worker node test suite', () => {
       threadWorkerNode.tasksQueue.size
     )
     expect(threadWorkerNode.onBackPressureStarted).toBe(false)
-    expect(threadWorkerNode.onEmptyQueueCount).toBe(0)
+    expect(threadWorkerNode.onIdleWorkerNodeCount).toBe(0)
     expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
 
     expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
@@ -127,7 +127,7 @@ describe('Worker node test suite', () => {
       clusterWorkerNode.tasksQueue.size
     )
     expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
-    expect(clusterWorkerNode.onEmptyQueueCount).toBe(0)
+    expect(clusterWorkerNode.onIdleWorkerNodeCount).toBe(0)
     expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
   })