fix: fix possible null exception with worker_threads pools
[poolifier.git] / src / pools / worker-node.ts
index de65f27012634c493d697a0ae9e3972a7f5234db..59c4de7a388e81434c373f1835b144c1b3c88f57 100644 (file)
@@ -1,21 +1,14 @@
 import { MessageChannel } from 'node:worker_threads'
+import { EventEmitter } from 'node:events'
 import { CircularArray } from '../circular-array'
 import type { Task } from '../utility-types'
-import {
-  DEFAULT_TASK_NAME,
-  EMPTY_FUNCTION,
-  exponentialDelay,
-  getWorkerId,
-  getWorkerType,
-  sleep
-} from '../utils'
+import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
 import { Deque } from '../deque'
 import {
   type IWorker,
   type IWorkerNode,
   type StrategyData,
   type WorkerInfo,
-  type WorkerNodeEventDetail,
   type WorkerType,
   WorkerTypes,
   type WorkerUsage
@@ -29,7 +22,7 @@ import { checkWorkerNodeArguments } from './utils'
  * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
  */
 export class WorkerNode<Worker extends IWorker, Data = unknown>
-  extends EventTarget
+  extends EventEmitter
   implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public readonly worker: Worker
@@ -45,7 +38,6 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   public tasksQueueBackPressureSize: number
   private readonly tasksQueue: Deque<Task<Data>>
   private onBackPressureStarted: boolean
-  private onEmptyQueueCount: number
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
 
   /**
@@ -66,7 +58,6 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
     this.tasksQueue = new Deque<Task<Data>>()
     this.onBackPressureStarted = false
-    this.onEmptyQueueCount = 0
     this.taskFunctionsUsage = new Map<string, WorkerUsage>()
   }
 
@@ -80,11 +71,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     const tasksQueueSize = this.tasksQueue.push(task)
     if (this.hasBackPressure() && !this.onBackPressureStarted) {
       this.onBackPressureStarted = true
-      this.dispatchEvent(
-        new CustomEvent<WorkerNodeEventDetail>('backpressure', {
-          detail: { workerId: this.info.id as number }
-        })
-      )
+      this.emit('backPressure', { workerId: this.info.id as number })
       this.onBackPressureStarted = false
     }
     return tasksQueueSize
@@ -95,11 +82,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     const tasksQueueSize = this.tasksQueue.unshift(task)
     if (this.hasBackPressure() && !this.onBackPressureStarted) {
       this.onBackPressureStarted = true
-      this.dispatchEvent(
-        new CustomEvent<WorkerNodeEventDetail>('backpressure', {
-          detail: { workerId: this.info.id as number }
-        })
-      )
+      this.emit('backPressure', { workerId: this.info.id as number })
       this.onBackPressureStarted = false
     }
     return tasksQueueSize
@@ -107,20 +90,12 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
 
   /** @inheritdoc */
   public dequeueTask (): Task<Data> | undefined {
-    const task = this.tasksQueue.shift()
-    if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) {
-      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
-    }
-    return task
+    return this.tasksQueue.shift()
   }
 
   /** @inheritdoc */
   public popTask (): Task<Data> | undefined {
-    const task = this.tasksQueue.pop()
-    if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) {
-      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
-    }
-    return task
+    return this.tasksQueue.pop()
   }
 
   /** @inheritdoc */
@@ -179,24 +154,6 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     return this.taskFunctionsUsage.delete(name)
   }
 
-  private async startOnEmptyQueue (): Promise<void> {
-    if (
-      this.onEmptyQueueCount > 0 &&
-      (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
-    ) {
-      this.onEmptyQueueCount = 0
-      return
-    }
-    ++this.onEmptyQueueCount
-    this.dispatchEvent(
-      new CustomEvent<WorkerNodeEventDetail>('emptyqueue', {
-        detail: { workerId: this.info.id as number }
-      })
-    )
-    await sleep(exponentialDelay(this.onEmptyQueueCount))
-    await this.startOnEmptyQueue()
-  }
-
   private initWorkerInfo (worker: Worker): WorkerInfo {
     return {
       id: getWorkerId(worker),
@@ -223,6 +180,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
         get maxQueued (): number {
           return getTasksQueueMaxSize()
         },
+        sequentiallyStolen: 0,
         stolen: 0,
         failed: 0
       },
@@ -264,6 +222,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
         get queued (): number {
           return getTaskFunctionQueueSize()
         },
+        sequentiallyStolen: 0,
         stolen: 0,
         failed: 0
       },