refactor: cleanup type definition
[poolifier.git] / src / pools / worker-node.ts
index 9ecfd012f9b5402c4676004f24f7c04ecc4bf020..f3ec3be3d83f4541da3713cbd46406d19be41ac0 100644 (file)
@@ -17,6 +17,9 @@ import {
   type WorkerUsage
 } from './worker'
 
+type EmptyQueueCallback = (workerId: number) => void
+type BackPressureCallback = EmptyQueueCallback
+
 /**
  * Worker node.
  *
@@ -30,18 +33,18 @@ implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public readonly info: WorkerInfo
   /** @inheritdoc */
-  public messageChannel?: MessageChannel
-  /** @inheritdoc */
   public usage: WorkerUsage
   /** @inheritdoc */
+  public messageChannel?: MessageChannel
+  /** @inheritdoc */
   public tasksQueueBackPressureSize: number
   /** @inheritdoc */
-  public onBackPressure?: (workerId: number) => void
+  public onBackPressure?: BackPressureCallback
   /** @inheritdoc */
-  public onEmptyQueue?: (workerId: number) => void
-  private readonly taskFunctionsUsage: Map<string, WorkerUsage>
+  public onEmptyQueue?: EmptyQueueCallback
   private readonly tasksQueue: Deque<Task<Data>>
   private onEmptyQueueCount: number
+  private readonly taskFunctionsUsage: Map<string, WorkerUsage>
 
   /**
    * Constructs a new worker node.
@@ -75,14 +78,14 @@ implements IWorkerNode<Worker, Data> {
     }
     this.worker = worker
     this.info = this.initWorkerInfo(worker, workerType)
+    this.usage = this.initWorkerUsage()
     if (workerType === WorkerTypes.thread) {
       this.messageChannel = new MessageChannel()
     }
-    this.usage = this.initWorkerUsage()
-    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
-    this.tasksQueue = new Deque<Task<Data>>()
     this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
+    this.tasksQueue = new Deque<Task<Data>>()
     this.onEmptyQueueCount = 0
+    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
   }
 
   /** @inheritdoc */
@@ -178,11 +181,14 @@ implements IWorkerNode<Worker, Data> {
   }
 
   private async startOnEmptyQueue (): Promise<void> {
-    if (this.tasksQueue.size > 0) {
+    if (
+      this.onEmptyQueueCount > 0 &&
+      (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
+    ) {
       this.onEmptyQueueCount = 0
       return
     }
-    (this.onEmptyQueue as (workerId: number) => void)(this.info.id as number)
+    (this.onEmptyQueue as EmptyQueueCallback)(this.info.id as number)
     ++this.onEmptyQueueCount
     await sleep(exponentialDelay(this.onEmptyQueueCount))
     await this.startOnEmptyQueue()