perf: alternate worker selection between start and end of worker nodes
[poolifier.git] / src / pools / abstract-pool.ts
index 1c59f74b22b1eaf811e3a9dc0cf3d6bbbd150206..67a22ffe04898f590ab70669a9d49934d6f90ad6 100644 (file)
@@ -3,7 +3,6 @@ import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
 import {
   DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
   EMPTY_FUNCTION,
-  INITIAL_TASKS_USAGE,
   median
 } from '../utils'
 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
@@ -18,9 +17,11 @@ import { PoolEmitter } from './pool'
 import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
 import {
   WorkerChoiceStrategies,
-  type WorkerChoiceStrategy
+  type WorkerChoiceStrategy,
+  type WorkerChoiceStrategyOptions
 } from './selection-strategies/selection-strategies-types'
 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
+import { CircularArray } from '../circular-array'
 
 /**
  * Base class that implements some shared logic for all poolifier pools.
@@ -68,7 +69,7 @@ export abstract class AbstractPool<
    * Constructs a new poolifier pool.
    *
    * @param numberOfWorkers - Number of workers that this pool should manage.
-   * @param filePath - Path to the worker-file.
+   * @param filePath - Path to the worker file.
    * @param opts - Options for the pool.
    */
   public constructor (
@@ -83,10 +84,10 @@ export abstract class AbstractPool<
     this.checkFilePath(this.filePath)
     this.checkPoolOptions(this.opts)
 
-    this.chooseWorkerNode.bind(this)
-    this.executeTask.bind(this)
-    this.enqueueTask.bind(this)
-    this.checkAndEmitEvents.bind(this)
+    this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
+    this.executeTask = this.executeTask.bind(this)
+    this.enqueueTask = this.enqueueTask.bind(this)
+    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
 
     this.setupHook()
 
@@ -144,16 +145,12 @@ export abstract class AbstractPool<
     this.opts.enableEvents = opts.enableEvents ?? true
     this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
     if (this.opts.enableTasksQueue) {
-      if ((opts.tasksQueueOptions?.concurrency as number) <= 0) {
-        throw new Error(
-          `Invalid worker tasks concurrency '${
-            (opts.tasksQueueOptions as TasksQueueOptions).concurrency as number
-          }'`
-        )
-      }
-      this.opts.tasksQueueOptions = {
-        concurrency: opts.tasksQueueOptions?.concurrency ?? 1
-      }
+      this.checkValidTasksQueueOptions(
+        opts.tasksQueueOptions as TasksQueueOptions
+      )
+      this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
+        opts.tasksQueueOptions as TasksQueueOptions
+      )
     }
   }
 
@@ -167,6 +164,18 @@ export abstract class AbstractPool<
     }
   }
 
+  private checkValidTasksQueueOptions (
+    tasksQueueOptions: TasksQueueOptions
+  ): void {
+    if ((tasksQueueOptions?.concurrency as number) <= 0) {
+      throw new Error(
+        `Invalid worker tasks concurrency '${
+          tasksQueueOptions.concurrency as number
+        }'`
+      )
+    }
+  }
+
   /** @inheritDoc */
   public abstract get type (): PoolType
 
@@ -207,18 +216,71 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public setWorkerChoiceStrategy (
-    workerChoiceStrategy: WorkerChoiceStrategy
+    workerChoiceStrategy: WorkerChoiceStrategy,
+    workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
   ): void {
     this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
     this.opts.workerChoiceStrategy = workerChoiceStrategy
     for (const workerNode of this.workerNodes) {
-      this.setWorkerNodeTasksUsage(workerNode, INITIAL_TASKS_USAGE)
+      this.setWorkerNodeTasksUsage(workerNode, {
+        run: 0,
+        running: 0,
+        runTime: 0,
+        runTimeHistory: new CircularArray(),
+        avgRunTime: 0,
+        medRunTime: 0,
+        error: 0
+      })
     }
     this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      workerChoiceStrategy
+      this.opts.workerChoiceStrategy
+    )
+    if (workerChoiceStrategyOptions != null) {
+      this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
+    }
+  }
+
+  /** @inheritDoc */
+  public setWorkerChoiceStrategyOptions (
+    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+  ): void {
+    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+    this.workerChoiceStrategyContext.setOptions(
+      this.opts.workerChoiceStrategyOptions
     )
   }
 
+  /** @inheritDoc */
+  public enableTasksQueue (
+    enable: boolean,
+    tasksQueueOptions?: TasksQueueOptions
+  ): void {
+    if (this.opts.enableTasksQueue === true && !enable) {
+      this.flushTasksQueues()
+    }
+    this.opts.enableTasksQueue = enable
+    this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
+  }
+
+  /** @inheritDoc */
+  public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
+    if (this.opts.enableTasksQueue === true) {
+      this.checkValidTasksQueueOptions(tasksQueueOptions)
+      this.opts.tasksQueueOptions =
+        this.buildTasksQueueOptions(tasksQueueOptions)
+    } else {
+      delete this.opts.tasksQueueOptions
+    }
+  }
+
+  private buildTasksQueueOptions (
+    tasksQueueOptions: TasksQueueOptions
+  ): TasksQueueOptions {
+    return {
+      concurrency: tasksQueueOptions?.concurrency ?? 1
+    }
+  }
+
   /**
    * Whether the pool is full or not.
    *
@@ -245,7 +307,21 @@ export abstract class AbstractPool<
   }
 
   /** @inheritDoc */
-  public async execute (data: Data): Promise<Response> {
+  public findLastFreeWorkerNodeKey (): number {
+    // It requires node >= 18.0.0
+    // return this.workerNodes.findLastIndex(workerNode => {
+    //   return workerNode.tasksUsage?.running === 0
+    // })
+    for (let i = this.workerNodes.length - 1; i >= 0; i--) {
+      if (this.workerNodes[i].tasksUsage?.running === 0) {
+        return i
+      }
+    }
+    return -1
+  }
+
+  /** @inheritDoc */
+  public async execute (data?: Data): Promise<Response> {
     const [workerNodeKey, workerNode] = this.chooseWorkerNode()
     const submittedTask: Task<Data> = {
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
@@ -498,7 +574,7 @@ export abstract class AbstractPool<
    * Gets the given worker its tasks usage in the pool.
    *
    * @param worker - The worker.
-   * @throws {@link Error} if the worker is not found in the pool worker nodes.
+   * @throws Error if the worker is not found in the pool worker nodes.
    * @returns The worker tasks usage.
    */
   private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
@@ -518,7 +594,15 @@ export abstract class AbstractPool<
   private pushWorkerNode (worker: Worker): number {
     return this.workerNodes.push({
       worker,
-      tasksUsage: INITIAL_TASKS_USAGE,
+      tasksUsage: {
+        run: 0,
+        running: 0,
+        runTime: 0,
+        runTimeHistory: new CircularArray(),
+        avgRunTime: 0,
+        medRunTime: 0,
+        error: 0
+      },
       tasksQueue: []
     })
   }
@@ -584,4 +668,10 @@ export abstract class AbstractPool<
     const workerNodeKey = this.getWorkerNodeKey(worker)
     this.flushTasksQueue(workerNodeKey)
   }
+
+  private flushTasksQueues (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.flushTasksQueue(workerNodeKey)
+    }
+  }
 }