perf: alternate worker selection between start and end of worker nodes
[poolifier.git] / src / pools / abstract-pool.ts
index fea4847d685c1c60433b55a321c83f30dee20abf..67a22ffe04898f590ab70669a9d49934d6f90ad6 100644 (file)
@@ -1,15 +1,24 @@
 import crypto from 'node:crypto'
 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
-import { EMPTY_FUNCTION, median } from '../utils'
+import {
+  DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+  EMPTY_FUNCTION,
+  median
+} from '../utils'
 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
-import { PoolEvents, type PoolOptions } from './pool'
+import {
+  PoolEvents,
+  type IPool,
+  type PoolOptions,
+  type TasksQueueOptions,
+  PoolType
+} from './pool'
 import { PoolEmitter } from './pool'
-import type { IPoolInternal } from './pool-internal'
-import { PoolType } from './pool-internal'
 import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
 import {
   WorkerChoiceStrategies,
-  type WorkerChoiceStrategy
+  type WorkerChoiceStrategy,
+  type WorkerChoiceStrategyOptions
 } from './selection-strategies/selection-strategies-types'
 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
 import { CircularArray } from '../circular-array'
@@ -19,13 +28,13 @@ import { CircularArray } from '../circular-array'
  *
  * @typeParam Worker - Type of worker which manages this pool.
  * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
- * @typeParam Response - Type of response of execution. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
  */
 export abstract class AbstractPool<
   Worker extends IWorker,
   Data = unknown,
   Response = unknown
-> implements IPoolInternal<Worker, Data, Response> {
+> implements IPool<Worker, Data, Response> {
   /** @inheritDoc */
   public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
 
@@ -60,7 +69,7 @@ export abstract class AbstractPool<
    * Constructs a new poolifier pool.
    *
    * @param numberOfWorkers - Number of workers that this pool should manage.
-   * @param filePath - Path to the worker-file.
+   * @param filePath - Path to the worker file.
    * @param opts - Options for the pool.
    */
   public constructor (
@@ -75,10 +84,10 @@ export abstract class AbstractPool<
     this.checkFilePath(this.filePath)
     this.checkPoolOptions(this.opts)
 
-    this.chooseWorkerNode.bind(this)
-    this.executeTask.bind(this)
-    this.enqueueTask.bind(this)
-    this.checkAndEmitEvents.bind(this)
+    this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
+    this.executeTask = this.executeTask.bind(this)
+    this.enqueueTask = this.enqueueTask.bind(this)
+    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
 
     this.setupHook()
 
@@ -132,9 +141,17 @@ export abstract class AbstractPool<
       opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
     this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
     this.opts.workerChoiceStrategyOptions =
-      opts.workerChoiceStrategyOptions ?? { medRunTime: false }
+      opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
     this.opts.enableEvents = opts.enableEvents ?? true
     this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
+    if (this.opts.enableTasksQueue) {
+      this.checkValidTasksQueueOptions(
+        opts.tasksQueueOptions as TasksQueueOptions
+      )
+      this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
+        opts.tasksQueueOptions as TasksQueueOptions
+      )
+    }
   }
 
   private checkValidWorkerChoiceStrategy (
@@ -147,6 +164,18 @@ export abstract class AbstractPool<
     }
   }
 
+  private checkValidTasksQueueOptions (
+    tasksQueueOptions: TasksQueueOptions
+  ): void {
+    if ((tasksQueueOptions?.concurrency as number) <= 0) {
+      throw new Error(
+        `Invalid worker tasks concurrency '${
+          tasksQueueOptions.concurrency as number
+        }'`
+      )
+    }
+  }
+
   /** @inheritDoc */
   public abstract get type (): PoolType
 
@@ -187,7 +216,8 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public setWorkerChoiceStrategy (
-    workerChoiceStrategy: WorkerChoiceStrategy
+    workerChoiceStrategy: WorkerChoiceStrategy,
+    workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
   ): void {
     this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
     this.opts.workerChoiceStrategy = workerChoiceStrategy
@@ -203,15 +233,67 @@ export abstract class AbstractPool<
       })
     }
     this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      workerChoiceStrategy
+      this.opts.workerChoiceStrategy
     )
+    if (workerChoiceStrategyOptions != null) {
+      this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
+    }
   }
 
   /** @inheritDoc */
-  public abstract get full (): boolean
+  public setWorkerChoiceStrategyOptions (
+    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+  ): void {
+    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+    this.workerChoiceStrategyContext.setOptions(
+      this.opts.workerChoiceStrategyOptions
+    )
+  }
+
+  /** @inheritDoc */
+  public enableTasksQueue (
+    enable: boolean,
+    tasksQueueOptions?: TasksQueueOptions
+  ): void {
+    if (this.opts.enableTasksQueue === true && !enable) {
+      this.flushTasksQueues()
+    }
+    this.opts.enableTasksQueue = enable
+    this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
+  }
 
   /** @inheritDoc */
-  public abstract get busy (): boolean
+  public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
+    if (this.opts.enableTasksQueue === true) {
+      this.checkValidTasksQueueOptions(tasksQueueOptions)
+      this.opts.tasksQueueOptions =
+        this.buildTasksQueueOptions(tasksQueueOptions)
+    } else {
+      delete this.opts.tasksQueueOptions
+    }
+  }
+
+  private buildTasksQueueOptions (
+    tasksQueueOptions: TasksQueueOptions
+  ): TasksQueueOptions {
+    return {
+      concurrency: tasksQueueOptions?.concurrency ?? 1
+    }
+  }
+
+  /**
+   * Whether the pool is full or not.
+   *
+   * The pool filling boolean status.
+   */
+  protected abstract get full (): boolean
+
+  /**
+   * Whether the pool is busy or not.
+   *
+   * The pool busyness boolean status.
+   */
+  protected abstract get busy (): boolean
 
   protected internalBusy (): boolean {
     return this.findFreeWorkerNodeKey() === -1
@@ -225,7 +307,21 @@ export abstract class AbstractPool<
   }
 
   /** @inheritDoc */
-  public async execute (data: Data): Promise<Response> {
+  public findLastFreeWorkerNodeKey (): number {
+    // It requires node >= 18.0.0
+    // return this.workerNodes.findLastIndex(workerNode => {
+    //   return workerNode.tasksUsage?.running === 0
+    // })
+    for (let i = this.workerNodes.length - 1; i >= 0; i--) {
+      if (this.workerNodes[i].tasksUsage?.running === 0) {
+        return i
+      }
+    }
+    return -1
+  }
+
+  /** @inheritDoc */
+  public async execute (data?: Data): Promise<Response> {
     const [workerNodeKey, workerNode] = this.chooseWorkerNode()
     const submittedTask: Task<Data> = {
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
@@ -233,7 +329,7 @@ export abstract class AbstractPool<
       id: crypto.randomUUID()
     }
     const res = new Promise<Response>((resolve, reject) => {
-      this.promiseResponseMap.set(submittedTask.id, {
+      this.promiseResponseMap.set(submittedTask.id as string, {
         resolve,
         reject,
         worker: workerNode.worker
@@ -241,7 +337,10 @@ export abstract class AbstractPool<
     })
     if (
       this.opts.enableTasksQueue === true &&
-      (this.busy || this.workerNodes[workerNodeKey].tasksUsage.running > 0)
+      (this.busy ||
+        this.workerNodes[workerNodeKey].tasksUsage.running >=
+          ((this.opts.tasksQueueOptions as TasksQueueOptions)
+            .concurrency as number))
     ) {
       this.enqueueTask(workerNodeKey, submittedTask)
     } else {
@@ -475,6 +574,7 @@ export abstract class AbstractPool<
    * Gets the given worker its tasks usage in the pool.
    *
    * @param worker - The worker.
+   * @throws Error if the worker is not found in the pool worker nodes.
    * @returns The worker tasks usage.
    */
   private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
@@ -544,8 +644,8 @@ export abstract class AbstractPool<
     this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
   }
 
-  private enqueueTask (workerNodeKey: number, task: Task<Data>): void {
-    this.workerNodes[workerNodeKey].tasksQueue.push(task)
+  private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
+    return this.workerNodes[workerNodeKey].tasksQueue.push(task)
   }
 
   private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
@@ -568,4 +668,10 @@ export abstract class AbstractPool<
     const workerNodeKey = this.getWorkerNodeKey(worker)
     this.flushTasksQueue(workerNodeKey)
   }
+
+  private flushTasksQueues (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.flushTasksQueue(workerNodeKey)
+    }
+  }
 }