fix: fix continuous tasks stealing on idle start at worker node idling
[poolifier.git] / src / pools / worker-node.ts
index 34a647fbd42f0b8b47fcc32d99d3784ce95242b8..3de3cb809cda9a3a3f2787579371a467e3f9d200 100644 (file)
@@ -1,14 +1,7 @@
 import { MessageChannel } from 'node:worker_threads'
 import { CircularArray } from '../circular-array'
 import type { Task } from '../utility-types'
-import {
-  DEFAULT_TASK_NAME,
-  EMPTY_FUNCTION,
-  exponentialDelay,
-  getWorkerId,
-  getWorkerType,
-  sleep
-} from '../utils'
+import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
 import { Deque } from '../deque'
 import {
   type IWorker,
@@ -45,7 +38,6 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   public tasksQueueBackPressureSize: number
   private readonly tasksQueue: Deque<Task<Data>>
   private onBackPressureStarted: boolean
-  private onIdleWorkerNodeCount: number
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
 
   /**
@@ -66,7 +58,6 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
     this.tasksQueue = new Deque<Task<Data>>()
     this.onBackPressureStarted = false
-    this.onIdleWorkerNodeCount = 0
     this.taskFunctionsUsage = new Map<string, WorkerUsage>()
   }
 
@@ -107,20 +98,12 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
 
   /** @inheritdoc */
   public dequeueTask (): Task<Data> | undefined {
-    const task = this.tasksQueue.shift()
-    if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
-      this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
-    }
-    return task
+    return this.tasksQueue.shift()
   }
 
   /** @inheritdoc */
   public popTask (): Task<Data> | undefined {
-    const task = this.tasksQueue.pop()
-    if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
-      this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
-    }
-    return task
+    return this.tasksQueue.pop()
   }
 
   /** @inheritdoc */
@@ -179,28 +162,6 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     return this.taskFunctionsUsage.delete(name)
   }
 
-  private async startOnIdleWorkerNode (): Promise<void> {
-    if (
-      this.onIdleWorkerNodeCount > 0 &&
-      (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
-    ) {
-      this.onIdleWorkerNodeCount = 0
-      return
-    }
-    ++this.onIdleWorkerNodeCount
-    this.dispatchEvent(
-      new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
-        detail: { workerId: this.info.id as number }
-      })
-    )
-    await sleep(exponentialDelay(this.onIdleWorkerNodeCount))
-    await this.startOnIdleWorkerNode()
-  }
-
-  private isIdle (): boolean {
-    return this.usage.tasks.executing === 0 && this.tasksQueue.size === 0
-  }
-
   private initWorkerInfo (worker: Worker): WorkerInfo {
     return {
       id: getWorkerId(worker),
@@ -227,6 +188,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
         get maxQueued (): number {
           return getTasksQueueMaxSize()
         },
+        sequentiallyStolen: 0,
         stolen: 0,
         failed: 0
       },
@@ -268,6 +230,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
         get queued (): number {
           return getTaskFunctionQueueSize()
         },
+        sequentiallyStolen: 0,
         stolen: 0,
         failed: 0
       },