docs: refine pool class param documentation
[poolifier.git] / src / pools / abstract-pool.ts
index 6462cb09a882c6c005ceea481cacb5519e2de0de..7d8531aa8d5d5480f8395f0263c90248f955358d 100644 (file)
@@ -6,14 +6,19 @@ import {
   median
 } from '../utils'
 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
-import { PoolEvents, type PoolOptions } from './pool'
+import {
+  PoolEvents,
+  type IPool,
+  type PoolOptions,
+  type TasksQueueOptions,
+  PoolType
+} from './pool'
 import { PoolEmitter } from './pool'
-import type { IPoolInternal } from './pool-internal'
-import { PoolType } from './pool-internal'
 import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
 import {
   WorkerChoiceStrategies,
-  type WorkerChoiceStrategy
+  type WorkerChoiceStrategy,
+  type WorkerChoiceStrategyOptions
 } from './selection-strategies/selection-strategies-types'
 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
 import { CircularArray } from '../circular-array'
@@ -23,13 +28,13 @@ import { CircularArray } from '../circular-array'
  *
  * @typeParam Worker - Type of worker which manages this pool.
  * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
- * @typeParam Response - Type of response of execution. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
  */
 export abstract class AbstractPool<
   Worker extends IWorker,
   Data = unknown,
   Response = unknown
-> implements IPoolInternal<Worker, Data, Response> {
+> implements IPool<Worker, Data, Response> {
   /** @inheritDoc */
   public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
 
@@ -64,7 +69,7 @@ export abstract class AbstractPool<
    * Constructs a new poolifier pool.
    *
    * @param numberOfWorkers - Number of workers that this pool should manage.
-   * @param filePath - Path to the worker-file.
+   * @param filePath - Path to the worker file.
    * @param opts - Options for the pool.
    */
   public constructor (
@@ -139,6 +144,14 @@ export abstract class AbstractPool<
       opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
     this.opts.enableEvents = opts.enableEvents ?? true
     this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
+    if (this.opts.enableTasksQueue) {
+      this.checkValidTasksQueueOptions(
+        opts.tasksQueueOptions as TasksQueueOptions
+      )
+      this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
+        opts.tasksQueueOptions as TasksQueueOptions
+      )
+    }
   }
 
   private checkValidWorkerChoiceStrategy (
@@ -151,6 +164,18 @@ export abstract class AbstractPool<
     }
   }
 
+  private checkValidTasksQueueOptions (
+    tasksQueueOptions: TasksQueueOptions
+  ): void {
+    if ((tasksQueueOptions?.concurrency as number) <= 0) {
+      throw new Error(
+        `Invalid worker tasks concurrency '${
+          tasksQueueOptions.concurrency as number
+        }'`
+      )
+    }
+  }
+
   /** @inheritDoc */
   public abstract get type (): PoolType
 
@@ -191,7 +216,8 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public setWorkerChoiceStrategy (
-    workerChoiceStrategy: WorkerChoiceStrategy
+    workerChoiceStrategy: WorkerChoiceStrategy,
+    workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
   ): void {
     this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
     this.opts.workerChoiceStrategy = workerChoiceStrategy
@@ -207,15 +233,69 @@ export abstract class AbstractPool<
       })
     }
     this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      workerChoiceStrategy
+      this.opts.workerChoiceStrategy
     )
+    if (workerChoiceStrategyOptions != null) {
+      this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
+    }
   }
 
   /** @inheritDoc */
-  public abstract get full (): boolean
+  public setWorkerChoiceStrategyOptions (
+    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+  ): void {
+    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+    this.workerChoiceStrategyContext.setOptions(
+      this.opts.workerChoiceStrategyOptions
+    )
+  }
 
   /** @inheritDoc */
-  public abstract get busy (): boolean
+  public enableTasksQueue (
+    enable: boolean,
+    tasksQueueOptions?: TasksQueueOptions
+  ): void {
+    if (this.opts.enableTasksQueue === true && !enable) {
+      for (const [workerNodeKey] of this.workerNodes.entries()) {
+        this.flushTasksQueue(workerNodeKey)
+      }
+    }
+    this.opts.enableTasksQueue = enable
+    this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
+  }
+
+  /** @inheritDoc */
+  public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
+    if (this.opts.enableTasksQueue === true) {
+      this.checkValidTasksQueueOptions(tasksQueueOptions)
+      this.opts.tasksQueueOptions =
+        this.buildTasksQueueOptions(tasksQueueOptions)
+    } else {
+      delete this.opts.tasksQueueOptions
+    }
+  }
+
+  private buildTasksQueueOptions (
+    tasksQueueOptions: TasksQueueOptions
+  ): TasksQueueOptions {
+    return {
+      concurrency: tasksQueueOptions?.concurrency ?? 1
+    }
+  }
+
+  /**
+   * Whether the pool is full or not.
+   *
+   * The pool filling boolean status.
+   */
+  protected abstract get full (): boolean
+
+  /**
+   * Whether the pool is busy or not.
+   *
+   * The pool busyness boolean status.
+   */
+  protected abstract get busy (): boolean
 
   protected internalBusy (): boolean {
     return this.findFreeWorkerNodeKey() === -1
@@ -237,7 +317,7 @@ export abstract class AbstractPool<
       id: crypto.randomUUID()
     }
     const res = new Promise<Response>((resolve, reject) => {
-      this.promiseResponseMap.set(submittedTask.id, {
+      this.promiseResponseMap.set(submittedTask.id as string, {
         resolve,
         reject,
         worker: workerNode.worker
@@ -245,7 +325,10 @@ export abstract class AbstractPool<
     })
     if (
       this.opts.enableTasksQueue === true &&
-      (this.busy || this.workerNodes[workerNodeKey].tasksUsage.running > 0)
+      (this.busy ||
+        this.workerNodes[workerNodeKey].tasksUsage.running >=
+          ((this.opts.tasksQueueOptions as TasksQueueOptions)
+            .concurrency as number))
     ) {
       this.enqueueTask(workerNodeKey, submittedTask)
     } else {
@@ -479,6 +562,7 @@ export abstract class AbstractPool<
    * Gets the given worker its tasks usage in the pool.
    *
    * @param worker - The worker.
+   * @throws Error if the worker is not found in the pool worker nodes.
    * @returns The worker tasks usage.
    */
   private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {