feat: add pool runtime setters
[poolifier.git] / src / pools / abstract-pool.ts
index 6548eba30189c2491bc244eb6e7e168b395b14f0..f1c6ad13af6ea51de124cfb8545c65e2af5e01e8 100644 (file)
@@ -1,15 +1,24 @@
 import crypto from 'node:crypto'
 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
-import { EMPTY_FUNCTION, median } from '../utils'
+import {
+  DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+  EMPTY_FUNCTION,
+  median
+} from '../utils'
 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
-import { PoolEvents, type PoolOptions } from './pool'
+import {
+  PoolEvents,
+  type IPool,
+  type PoolOptions,
+  type TasksQueueOptions,
+  PoolType
+} from './pool'
 import { PoolEmitter } from './pool'
-import type { IPoolInternal } from './pool-internal'
-import { PoolType } from './pool-internal'
 import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
 import {
   WorkerChoiceStrategies,
-  type WorkerChoiceStrategy
+  type WorkerChoiceStrategy,
+  type WorkerChoiceStrategyOptions
 } from './selection-strategies/selection-strategies-types'
 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
 import { CircularArray } from '../circular-array'
@@ -19,13 +28,13 @@ import { CircularArray } from '../circular-array'
  *
  * @typeParam Worker - Type of worker which manages this pool.
  * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
- * @typeParam Response - Type of response of execution. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
  */
 export abstract class AbstractPool<
   Worker extends IWorker,
   Data = unknown,
   Response = unknown
-> implements IPoolInternal<Worker, Data, Response> {
+> implements IPool<Worker, Data, Response> {
   /** @inheritDoc */
   public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
 
@@ -132,9 +141,17 @@ export abstract class AbstractPool<
       opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
     this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
     this.opts.workerChoiceStrategyOptions =
-      opts.workerChoiceStrategyOptions ?? { medRunTime: false }
+      opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
     this.opts.enableEvents = opts.enableEvents ?? true
     this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
+    if (this.opts.enableTasksQueue) {
+      this.checkValidTasksQueueOptions(
+        opts.tasksQueueOptions as TasksQueueOptions
+      )
+      this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
+        opts.tasksQueueOptions as TasksQueueOptions
+      )
+    }
   }
 
   private checkValidWorkerChoiceStrategy (
@@ -147,6 +164,18 @@ export abstract class AbstractPool<
     }
   }
 
+  private checkValidTasksQueueOptions (
+    tasksQueueOptions: TasksQueueOptions
+  ): void {
+    if ((tasksQueueOptions?.concurrency as number) <= 0) {
+      throw new Error(
+        `Invalid worker tasks concurrency '${
+          tasksQueueOptions.concurrency as number
+        }'`
+      )
+    }
+  }
+
   /** @inheritDoc */
   public abstract get type (): PoolType
 
@@ -203,21 +232,65 @@ export abstract class AbstractPool<
       })
     }
     this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      workerChoiceStrategy
+      this.opts.workerChoiceStrategy
     )
   }
 
   /** @inheritDoc */
-  public abstract get full (): boolean
+  public setWorkerChoiceStrategyOptions (
+    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+  ): void {
+    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+    this.workerChoiceStrategyContext.setOptions(
+      this.opts.workerChoiceStrategyOptions
+    )
+  }
 
   /** @inheritDoc */
-  public abstract get busy (): boolean
+  public enableTasksQueue (enable: boolean, opts?: TasksQueueOptions): void {
+    if (this.opts.enableTasksQueue === true && !enable) {
+      for (const [workerNodeKey] of this.workerNodes.entries()) {
+        this.flushTasksQueue(workerNodeKey)
+      }
+    }
+    this.opts.enableTasksQueue = enable
+    this.setTasksQueueOptions(opts as TasksQueueOptions)
+  }
+
+  /** @inheritDoc */
+  public setTasksQueueOptions (opts: TasksQueueOptions): void {
+    if (this.opts.enableTasksQueue === true) {
+      this.checkValidTasksQueueOptions(opts)
+      this.opts.tasksQueueOptions = this.buildTasksQueueOptions(opts)
+    } else {
+      delete this.opts.tasksQueueOptions
+    }
+  }
+
+  private buildTasksQueueOptions (
+    tasksQueueOptions: TasksQueueOptions
+  ): TasksQueueOptions {
+    return {
+      concurrency: tasksQueueOptions?.concurrency ?? 1
+    }
+  }
+
+  /**
+   * Whether the pool is full or not.
+   *
+   * The pool filling boolean status.
+   */
+  protected abstract get full (): boolean
+
+  /**
+   * Whether the pool is busy or not.
+   *
+   * The pool busyness boolean status.
+   */
+  protected abstract get busy (): boolean
 
   protected internalBusy (): boolean {
-    return (
-      this.numberOfRunningTasks >= this.numberOfWorkers &&
-      this.findFreeWorkerNodeKey() === -1
-    )
+    return this.findFreeWorkerNodeKey() === -1
   }
 
   /** @inheritDoc */
@@ -236,7 +309,7 @@ export abstract class AbstractPool<
       id: crypto.randomUUID()
     }
     const res = new Promise<Response>((resolve, reject) => {
-      this.promiseResponseMap.set(submittedTask.id, {
+      this.promiseResponseMap.set(submittedTask.id as string, {
         resolve,
         reject,
         worker: workerNode.worker
@@ -244,7 +317,10 @@ export abstract class AbstractPool<
     })
     if (
       this.opts.enableTasksQueue === true &&
-      (this.busy || this.workerNodes[workerNodeKey].tasksUsage.running > 0)
+      (this.busy ||
+        this.workerNodes[workerNodeKey].tasksUsage.running >=
+          ((this.opts.tasksQueueOptions as TasksQueueOptions)
+            .concurrency as number))
     ) {
       this.enqueueTask(workerNodeKey, submittedTask)
     } else {
@@ -258,8 +334,8 @@ export abstract class AbstractPool<
   /** @inheritDoc */
   public async destroy (): Promise<void> {
     await Promise.all(
-      this.workerNodes.map(async workerNode => {
-        this.flushTasksQueueByWorker(workerNode.worker)
+      this.workerNodes.map(async (workerNode, workerNodeKey) => {
+        this.flushTasksQueue(workerNodeKey)
         await this.destroyWorker(workerNode.worker)
       })
     )
@@ -339,11 +415,7 @@ export abstract class AbstractPool<
    */
   protected chooseWorkerNode (): [number, WorkerNode<Worker, Data>] {
     let workerNodeKey: number
-    if (
-      this.type === PoolType.DYNAMIC &&
-      !this.full &&
-      this.findFreeWorkerNodeKey() === -1
-    ) {
+    if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) {
       const workerCreated = this.createAndSetupWorker()
       this.registerWorkerMessageListener(workerCreated, message => {
         if (
@@ -482,6 +554,7 @@ export abstract class AbstractPool<
    * Gets the given worker its tasks usage in the pool.
    *
    * @param worker - The worker.
+   * @throws {@link Error} if the worker is not found in the pool worker nodes.
    * @returns The worker tasks usage.
    */
   private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
@@ -551,8 +624,8 @@ export abstract class AbstractPool<
     this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
   }
 
-  private enqueueTask (workerNodeKey: number, task: Task<Data>): void {
-    this.workerNodes[workerNodeKey].tasksQueue.push(task)
+  private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
+    return this.workerNodes[workerNodeKey].tasksQueue.push(task)
   }
 
   private dequeueTask (workerNodeKey: number): Task<Data> | undefined {