fix: revert incorrect change
[poolifier.git] / src / pools / worker-node.ts
index fbd00096a769d53cd5d3d9415fae64c428cfb3ee..03fc0df192f3ce6689306ba3f6eb38c12dabed40 100644 (file)
@@ -9,6 +9,8 @@ import {
 } from '../utils'
 import { Deque } from '../deque'
 import {
+  type BackPressureCallback,
+  type EmptyQueueCallback,
   type IWorker,
   type IWorkerNode,
   type WorkerInfo,
@@ -30,18 +32,18 @@ implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public readonly info: WorkerInfo
   /** @inheritdoc */
-  public messageChannel?: MessageChannel
-  /** @inheritdoc */
   public usage: WorkerUsage
   /** @inheritdoc */
+  public messageChannel?: MessageChannel
+  /** @inheritdoc */
   public tasksQueueBackPressureSize: number
   /** @inheritdoc */
-  public onBackPressure?: (workerId: number) => void
+  public onBackPressure?: BackPressureCallback
   /** @inheritdoc */
-  public onEmptyQueue?: (workerId: number) => void
-  private readonly taskFunctionsUsage: Map<string, WorkerUsage>
+  public onEmptyQueue?: EmptyQueueCallback
   private readonly tasksQueue: Deque<Task<Data>>
   private onEmptyQueueCount: number
+  private readonly taskFunctionsUsage: Map<string, WorkerUsage>
 
   /**
    * Constructs a new worker node.
@@ -75,14 +77,14 @@ implements IWorkerNode<Worker, Data> {
     }
     this.worker = worker
     this.info = this.initWorkerInfo(worker, workerType)
+    this.usage = this.initWorkerUsage()
     if (workerType === WorkerTypes.thread) {
       this.messageChannel = new MessageChannel()
     }
-    this.usage = this.initWorkerUsage()
-    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
-    this.tasksQueue = new Deque<Task<Data>>()
     this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
+    this.tasksQueue = new Deque<Task<Data>>()
     this.onEmptyQueueCount = 0
+    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
   }
 
   /** @inheritdoc */
@@ -180,13 +182,12 @@ implements IWorkerNode<Worker, Data> {
   private async startOnEmptyQueue (): Promise<void> {
     if (
       this.onEmptyQueueCount > 0 &&
-      this.usage.tasks.executing > 0 &&
-      this.tasksQueue.size > 0
+      (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
     ) {
       this.onEmptyQueueCount = 0
       return
     }
-    (this.onEmptyQueue as (workerId: number) => void)(this.info.id as number)
+    (this.onEmptyQueue as EmptyQueueCallback)(this.info.id as number)
     ++this.onEmptyQueueCount
     await sleep(exponentialDelay(this.onEmptyQueueCount))
     await this.startOnEmptyQueue()