feat: fire tasks stealing at worker node idling
[poolifier.git] / src / pools / worker-node.ts
index ef0133ac716b2a01087799154f3ff414518309cc..34a647fbd42f0b8b47fcc32d99d3784ce95242b8 100644 (file)
@@ -45,7 +45,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   public tasksQueueBackPressureSize: number
   private readonly tasksQueue: Deque<Task<Data>>
   private onBackPressureStarted: boolean
-  private onEmptyQueueCount: number
+  private onIdleWorkerNodeCount: number
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
 
   /**
@@ -66,7 +66,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
     this.tasksQueue = new Deque<Task<Data>>()
     this.onBackPressureStarted = false
-    this.onEmptyQueueCount = 0
+    this.onIdleWorkerNodeCount = 0
     this.taskFunctionsUsage = new Map<string, WorkerUsage>()
   }
 
@@ -108,8 +108,8 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   /** @inheritdoc */
   public dequeueTask (): Task<Data> | undefined {
     const task = this.tasksQueue.shift()
-    if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) {
-      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
+    if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
+      this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
     }
     return task
   }
@@ -117,8 +117,8 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   /** @inheritdoc */
   public popTask (): Task<Data> | undefined {
     const task = this.tasksQueue.pop()
-    if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) {
-      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
+    if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
+      this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
     }
     return task
   }
@@ -179,22 +179,26 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     return this.taskFunctionsUsage.delete(name)
   }
 
-  private async startOnEmptyQueue (): Promise<void> {
+  private async startOnIdleWorkerNode (): Promise<void> {
     if (
-      this.onEmptyQueueCount > 0 &&
+      this.onIdleWorkerNodeCount > 0 &&
       (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
     ) {
-      this.onEmptyQueueCount = 0
+      this.onIdleWorkerNodeCount = 0
       return
     }
-    ++this.onEmptyQueueCount
+    ++this.onIdleWorkerNodeCount
     this.dispatchEvent(
-      new CustomEvent<WorkerNodeEventDetail>('emptyQueue', {
+      new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
         detail: { workerId: this.info.id as number }
       })
     )
-    await sleep(exponentialDelay(this.onEmptyQueueCount))
-    await this.startOnEmptyQueue()
+    await sleep(exponentialDelay(this.onIdleWorkerNodeCount))
+    await this.startOnIdleWorkerNode()
+  }
+
+  private isIdle (): boolean {
+    return this.usage.tasks.executing === 0 && this.tasksQueue.size === 0
   }
 
   private initWorkerInfo (worker: Worker): WorkerInfo {