refactor: methods namespace cleanup
[poolifier.git] / src / pools / abstract-pool.ts
index ffbda872af76d22fd7d037b39dd54025568e39c5..f32aab0dc838409e2514bf0377c4ba9276180579 100644 (file)
@@ -3,17 +3,20 @@ import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
 import {
   DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
   EMPTY_FUNCTION,
+  isPlainObject,
   median
 } from '../utils'
 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
+import { CircularArray } from '../circular-array'
+import { Queue } from '../queue'
 import {
-  PoolEvents,
   type IPool,
+  PoolEmitter,
+  PoolEvents,
   type PoolOptions,
-  type TasksQueueOptions,
-  PoolType
+  PoolType,
+  type TasksQueueOptions
 } from './pool'
-import { PoolEmitter } from './pool'
 import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
 import {
   WorkerChoiceStrategies,
@@ -21,7 +24,6 @@ import {
   type WorkerChoiceStrategyOptions
 } from './selection-strategies/selection-strategies-types'
 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
-import { CircularArray } from '../circular-array'
 
 /**
  * Base class that implements some shared logic for all poolifier pools.
@@ -84,10 +86,10 @@ export abstract class AbstractPool<
     this.checkFilePath(this.filePath)
     this.checkPoolOptions(this.opts)
 
-    this.chooseWorkerNode.bind(this)
-    this.executeTask.bind(this)
-    this.enqueueTask.bind(this)
-    this.checkAndEmitEvents.bind(this)
+    this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
+    this.executeTask = this.executeTask.bind(this)
+    this.enqueueTask = this.enqueueTask.bind(this)
+    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
 
     this.setupHook()
 
@@ -125,7 +127,7 @@ export abstract class AbstractPool<
       )
     } else if (!Number.isSafeInteger(numberOfWorkers)) {
       throw new TypeError(
-        'Cannot instantiate a pool with a non integer number of workers'
+        'Cannot instantiate a pool with a non safe integer number of workers'
       )
     } else if (numberOfWorkers < 0) {
       throw new RangeError(
@@ -137,20 +139,28 @@ export abstract class AbstractPool<
   }
 
   private checkPoolOptions (opts: PoolOptions<Worker>): void {
-    this.opts.workerChoiceStrategy =
-      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
-    this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
-    this.opts.workerChoiceStrategyOptions =
-      opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
-    this.opts.enableEvents = opts.enableEvents ?? true
-    this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
-    if (this.opts.enableTasksQueue) {
-      this.checkValidTasksQueueOptions(
-        opts.tasksQueueOptions as TasksQueueOptions
-      )
-      this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
-        opts.tasksQueueOptions as TasksQueueOptions
+    if (isPlainObject(opts)) {
+      this.opts.workerChoiceStrategy =
+        opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
+      this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
+      this.opts.workerChoiceStrategyOptions =
+        opts.workerChoiceStrategyOptions ??
+        DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+      this.checkValidWorkerChoiceStrategyOptions(
+        this.opts.workerChoiceStrategyOptions
       )
+      this.opts.enableEvents = opts.enableEvents ?? true
+      this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
+      if (this.opts.enableTasksQueue) {
+        this.checkValidTasksQueueOptions(
+          opts.tasksQueueOptions as TasksQueueOptions
+        )
+        this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
+          opts.tasksQueueOptions as TasksQueueOptions
+        )
+      }
+    } else {
+      throw new TypeError('Invalid pool options: must be a plain object')
     }
   }
 
@@ -164,9 +174,30 @@ export abstract class AbstractPool<
     }
   }
 
+  private checkValidWorkerChoiceStrategyOptions (
+    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+  ): void {
+    if (!isPlainObject(workerChoiceStrategyOptions)) {
+      throw new TypeError(
+        'Invalid worker choice strategy options: must be a plain object'
+      )
+    }
+    if (
+      workerChoiceStrategyOptions.weights != null &&
+      Object.keys(workerChoiceStrategyOptions.weights).length !== this.size
+    ) {
+      throw new Error(
+        'Invalid worker choice strategy options: must have a weight for each worker node'
+      )
+    }
+  }
+
   private checkValidTasksQueueOptions (
     tasksQueueOptions: TasksQueueOptions
   ): void {
+    if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
+      throw new TypeError('Invalid tasks queue options: must be a plain object')
+    }
     if ((tasksQueueOptions?.concurrency as number) <= 0) {
       throw new Error(
         `Invalid worker tasks concurrency '${
@@ -179,6 +210,9 @@ export abstract class AbstractPool<
   /** @inheritDoc */
   public abstract get type (): PoolType
 
+  /** @inheritDoc */
+  public abstract get size (): number
+
   /**
    * Number of tasks running in the pool.
    */
@@ -197,7 +231,7 @@ export abstract class AbstractPool<
       return 0
     }
     return this.workerNodes.reduce(
-      (accumulator, workerNode) => accumulator + workerNode.tasksQueue.length,
+      (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
       0
     )
   }
@@ -229,6 +263,10 @@ export abstract class AbstractPool<
         runTimeHistory: new CircularArray(),
         avgRunTime: 0,
         medRunTime: 0,
+        waitTime: 0,
+        waitTimeHistory: new CircularArray(),
+        avgWaitTime: 0,
+        medWaitTime: 0,
         error: 0
       })
     }
@@ -244,6 +282,7 @@ export abstract class AbstractPool<
   public setWorkerChoiceStrategyOptions (
     workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
   ): void {
+    this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
     this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
     this.workerChoiceStrategyContext.setOptions(
       this.opts.workerChoiceStrategyOptions
@@ -296,29 +335,29 @@ export abstract class AbstractPool<
   protected abstract get busy (): boolean
 
   protected internalBusy (): boolean {
-    return this.findFreeWorkerNodeKey() === -1
-  }
-
-  /** @inheritDoc */
-  public findFreeWorkerNodeKey (): number {
-    return this.workerNodes.findIndex(workerNode => {
-      return workerNode.tasksUsage?.running === 0
-    })
+    return (
+      this.workerNodes.findIndex(workerNode => {
+        return workerNode.tasksUsage.running === 0
+      }) === -1
+    )
   }
 
   /** @inheritDoc */
-  public async execute (data?: Data): Promise<Response> {
-    const [workerNodeKey, workerNode] = this.chooseWorkerNode()
+  public async execute (data?: Data, name?: string): Promise<Response> {
+    const submissionTimestamp = performance.now()
+    const workerNodeKey = this.chooseWorkerNode()
     const submittedTask: Task<Data> = {
+      name,
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
       data: data ?? ({} as Data),
+      submissionTimestamp,
       id: crypto.randomUUID()
     }
     const res = new Promise<Response>((resolve, reject) => {
       this.promiseResponseMap.set(submittedTask.id as string, {
         resolve,
         reject,
-        worker: workerNode.worker
+        worker: this.workerNodes[workerNodeKey].worker
       })
     })
     if (
@@ -332,6 +371,7 @@ export abstract class AbstractPool<
     } else {
       this.executeTask(workerNodeKey, submittedTask)
     }
+    this.workerChoiceStrategyContext.update(workerNodeKey)
     this.checkAndEmitEvents()
     // eslint-disable-next-line @typescript-eslint/return-await
     return res
@@ -390,12 +430,21 @@ export abstract class AbstractPool<
     worker: Worker,
     message: MessageValue<Response>
   ): void {
-    const workerTasksUsage = this.getWorkerTasksUsage(worker) as TasksUsage
+    const workerTasksUsage =
+      this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
     --workerTasksUsage.running
     ++workerTasksUsage.run
     if (message.error != null) {
       ++workerTasksUsage.error
     }
+    this.updateRunTimeTasksUsage(workerTasksUsage, message)
+    this.updateWaitTimeTasksUsage(workerTasksUsage, message)
+  }
+
+  private updateRunTimeTasksUsage (
+    workerTasksUsage: TasksUsage,
+    message: MessageValue<Response>
+  ): void {
     if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
       workerTasksUsage.runTime += message.runTime ?? 0
       if (
@@ -405,40 +454,67 @@ export abstract class AbstractPool<
         workerTasksUsage.avgRunTime =
           workerTasksUsage.runTime / workerTasksUsage.run
       }
-      if (this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime) {
-        workerTasksUsage.runTimeHistory.push(message.runTime ?? 0)
+      if (
+        this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
+        message.runTime != null
+      ) {
+        workerTasksUsage.runTimeHistory.push(message.runTime)
         workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
       }
     }
   }
 
+  private updateWaitTimeTasksUsage (
+    workerTasksUsage: TasksUsage,
+    message: MessageValue<Response>
+  ): void {
+    if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
+      workerTasksUsage.waitTime += message.waitTime ?? 0
+      if (
+        this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
+        workerTasksUsage.run !== 0
+      ) {
+        workerTasksUsage.avgWaitTime =
+          workerTasksUsage.waitTime / workerTasksUsage.run
+      }
+      if (
+        this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
+        message.waitTime != null
+      ) {
+        workerTasksUsage.waitTimeHistory.push(message.waitTime)
+        workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
+      }
+    }
+  }
+
   /**
    * Chooses a worker node for the next task.
    *
-   * The default uses a round robin algorithm to distribute the load.
+   * The default worker choice strategy uses a round robin algorithm to distribute the load.
    *
-   * @returns [worker node key, worker node].
+   * @returns The worker node key
    */
-  protected chooseWorkerNode (): [number, WorkerNode<Worker, Data>] {
+  protected chooseWorkerNode (): number {
     let workerNodeKey: number
     if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) {
       const workerCreated = this.createAndSetupWorker()
       this.registerWorkerMessageListener(workerCreated, message => {
+        const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
         if (
           isKillBehavior(KillBehaviors.HARD, message.kill) ||
           (message.kill != null &&
-            this.getWorkerTasksUsage(workerCreated)?.running === 0)
+            this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
         ) {
           // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-          this.flushTasksQueueByWorker(workerCreated)
-          void this.destroyWorker(workerCreated)
+          this.flushTasksQueue(currentWorkerNodeKey)
+          void (this.destroyWorker(workerCreated) as Promise<void>)
         }
       })
       workerNodeKey = this.getWorkerNodeKey(workerCreated)
     } else {
       workerNodeKey = this.workerChoiceStrategyContext.execute()
     }
-    return [workerNodeKey, this.workerNodes[workerNodeKey]]
+    return workerNodeKey
   }
 
   /**
@@ -556,21 +632,6 @@ export abstract class AbstractPool<
     workerNode.tasksUsage = tasksUsage
   }
 
-  /**
-   * Gets the given worker its tasks usage in the pool.
-   *
-   * @param worker - The worker.
-   * @throws Error if the worker is not found in the pool worker nodes.
-   * @returns The worker tasks usage.
-   */
-  private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
-    const workerNodeKey = this.getWorkerNodeKey(worker)
-    if (workerNodeKey !== -1) {
-      return this.workerNodes[workerNodeKey].tasksUsage
-    }
-    throw new Error('Worker could not be found in the pool worker nodes')
-  }
-
   /**
    * Pushes the given worker in the pool worker nodes.
    *
@@ -587,9 +648,13 @@ export abstract class AbstractPool<
         runTimeHistory: new CircularArray(),
         avgRunTime: 0,
         medRunTime: 0,
+        waitTime: 0,
+        waitTimeHistory: new CircularArray(),
+        avgWaitTime: 0,
+        medWaitTime: 0,
         error: 0
       },
-      tasksQueue: []
+      tasksQueue: new Queue<Task<Data>>()
     })
   }
 
@@ -605,7 +670,7 @@ export abstract class AbstractPool<
     workerNodeKey: number,
     worker: Worker,
     tasksUsage: TasksUsage,
-    tasksQueue: Array<Task<Data>>
+    tasksQueue: Queue<Task<Data>>
   ): void {
     this.workerNodes[workerNodeKey] = {
       worker,
@@ -631,30 +696,28 @@ export abstract class AbstractPool<
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
-    return this.workerNodes[workerNodeKey].tasksQueue.push(task)
+    return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
   }
 
   private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
-    return this.workerNodes[workerNodeKey].tasksQueue.shift()
+    return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
   }
 
   private tasksQueueSize (workerNodeKey: number): number {
-    return this.workerNodes[workerNodeKey].tasksQueue.length
+    return this.workerNodes[workerNodeKey].tasksQueue.size
   }
 
   private flushTasksQueue (workerNodeKey: number): void {
     if (this.tasksQueueSize(workerNodeKey) > 0) {
-      for (const task of this.workerNodes[workerNodeKey].tasksQueue) {
-        this.executeTask(workerNodeKey, task)
+      for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
+        this.executeTask(
+          workerNodeKey,
+          this.dequeueTask(workerNodeKey) as Task<Data>
+        )
       }
     }
   }
 
-  private flushTasksQueueByWorker (worker: Worker): void {
-    const workerNodeKey = this.getWorkerNodeKey(worker)
-    this.flushTasksQueue(workerNodeKey)
-  }
-
   private flushTasksQueues (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.flushTasksQueue(workerNodeKey)