feat: add pool runtime setters
[poolifier.git] / src / pools / abstract-pool.ts
index 1fd9a1737f10991489af3b006829dcaf3e7bd580..f1c6ad13af6ea51de124cfb8545c65e2af5e01e8 100644 (file)
@@ -6,14 +6,19 @@ import {
   median
 } from '../utils'
 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
-import { PoolEvents, type PoolOptions, type TasksQueueOptions } from './pool'
+import {
+  PoolEvents,
+  type IPool,
+  type PoolOptions,
+  type TasksQueueOptions,
+  PoolType
+} from './pool'
 import { PoolEmitter } from './pool'
-import type { IPoolInternal } from './pool-internal'
-import { PoolType } from './pool-internal'
 import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
 import {
   WorkerChoiceStrategies,
-  type WorkerChoiceStrategy
+  type WorkerChoiceStrategy,
+  type WorkerChoiceStrategyOptions
 } from './selection-strategies/selection-strategies-types'
 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
 import { CircularArray } from '../circular-array'
@@ -23,13 +28,13 @@ import { CircularArray } from '../circular-array'
  *
  * @typeParam Worker - Type of worker which manages this pool.
  * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
- * @typeParam Response - Type of response of execution. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
  */
 export abstract class AbstractPool<
   Worker extends IWorker,
   Data = unknown,
   Response = unknown
-> implements IPoolInternal<Worker, Data, Response> {
+> implements IPool<Worker, Data, Response> {
   /** @inheritDoc */
   public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
 
@@ -140,16 +145,12 @@ export abstract class AbstractPool<
     this.opts.enableEvents = opts.enableEvents ?? true
     this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
     if (this.opts.enableTasksQueue) {
-      if ((opts.tasksQueueOptions?.concurrency as number) <= 0) {
-        throw new Error(
-          `Invalid tasks queue concurrency '${
-            (opts.tasksQueueOptions as TasksQueueOptions).concurrency as number
-          }'`
-        )
-      }
-      this.opts.tasksQueueOptions = {
-        concurrency: opts.tasksQueueOptions?.concurrency ?? 1
-      }
+      this.checkValidTasksQueueOptions(
+        opts.tasksQueueOptions as TasksQueueOptions
+      )
+      this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
+        opts.tasksQueueOptions as TasksQueueOptions
+      )
     }
   }
 
@@ -163,6 +164,18 @@ export abstract class AbstractPool<
     }
   }
 
+  private checkValidTasksQueueOptions (
+    tasksQueueOptions: TasksQueueOptions
+  ): void {
+    if ((tasksQueueOptions?.concurrency as number) <= 0) {
+      throw new Error(
+        `Invalid worker tasks concurrency '${
+          tasksQueueOptions.concurrency as number
+        }'`
+      )
+    }
+  }
+
   /** @inheritDoc */
   public abstract get type (): PoolType
 
@@ -219,15 +232,62 @@ export abstract class AbstractPool<
       })
     }
     this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      workerChoiceStrategy
+      this.opts.workerChoiceStrategy
     )
   }
 
   /** @inheritDoc */
-  public abstract get full (): boolean
+  public setWorkerChoiceStrategyOptions (
+    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+  ): void {
+    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+    this.workerChoiceStrategyContext.setOptions(
+      this.opts.workerChoiceStrategyOptions
+    )
+  }
 
   /** @inheritDoc */
-  public abstract get busy (): boolean
+  public enableTasksQueue (enable: boolean, opts?: TasksQueueOptions): void {
+    if (this.opts.enableTasksQueue === true && !enable) {
+      for (const [workerNodeKey] of this.workerNodes.entries()) {
+        this.flushTasksQueue(workerNodeKey)
+      }
+    }
+    this.opts.enableTasksQueue = enable
+    this.setTasksQueueOptions(opts as TasksQueueOptions)
+  }
+
+  /** @inheritDoc */
+  public setTasksQueueOptions (opts: TasksQueueOptions): void {
+    if (this.opts.enableTasksQueue === true) {
+      this.checkValidTasksQueueOptions(opts)
+      this.opts.tasksQueueOptions = this.buildTasksQueueOptions(opts)
+    } else {
+      delete this.opts.tasksQueueOptions
+    }
+  }
+
+  private buildTasksQueueOptions (
+    tasksQueueOptions: TasksQueueOptions
+  ): TasksQueueOptions {
+    return {
+      concurrency: tasksQueueOptions?.concurrency ?? 1
+    }
+  }
+
+  /**
+   * Whether the pool is full or not.
+   *
+   * The pool filling boolean status.
+   */
+  protected abstract get full (): boolean
+
+  /**
+   * Whether the pool is busy or not.
+   *
+   * The pool busyness boolean status.
+   */
+  protected abstract get busy (): boolean
 
   protected internalBusy (): boolean {
     return this.findFreeWorkerNodeKey() === -1
@@ -249,7 +309,7 @@ export abstract class AbstractPool<
       id: crypto.randomUUID()
     }
     const res = new Promise<Response>((resolve, reject) => {
-      this.promiseResponseMap.set(submittedTask.id, {
+      this.promiseResponseMap.set(submittedTask.id as string, {
         resolve,
         reject,
         worker: workerNode.worker
@@ -258,10 +318,9 @@ export abstract class AbstractPool<
     if (
       this.opts.enableTasksQueue === true &&
       (this.busy ||
-        this.workerNodes[workerNodeKey].tasksUsage.running >
+        this.workerNodes[workerNodeKey].tasksUsage.running >=
           ((this.opts.tasksQueueOptions as TasksQueueOptions)
-            .concurrency as number) -
-            1)
+            .concurrency as number))
     ) {
       this.enqueueTask(workerNodeKey, submittedTask)
     } else {
@@ -495,6 +554,7 @@ export abstract class AbstractPool<
    * Gets the given worker its tasks usage in the pool.
    *
    * @param worker - The worker.
+   * @throws {@link Error} if the worker is not found in the pool worker nodes.
    * @returns The worker tasks usage.
    */
   private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {