Add dynamic worker choice strategy change at runtime
[poolifier.git] / src / pools / abstract-pool.ts
index 71046d494db7770ade3339d46edcde5d27291efe..928b24c65832a38ef625b7b07d498ce9f3ad504b 100644 (file)
@@ -4,106 +4,15 @@ import type {
 } from '../utility-types'
 import { EMPTY_FUNCTION } from '../utils'
 import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
+import type { AbstractPoolWorker } from './abstract-pool-worker'
+import type { PoolOptions } from './pool'
 import type { IPoolInternal } from './pool-internal'
 import { PoolEmitter, PoolType } from './pool-internal'
-import type { WorkerChoiceStrategy } from './selection-strategies'
 import {
   WorkerChoiceStrategies,
-  WorkerChoiceStrategyContext
-} from './selection-strategies'
-
-/**
- * Callback invoked if the worker has received a message.
- */
-export type MessageHandler<Worker> = (this: Worker, m: unknown) => void
-
-/**
- * Callback invoked if the worker raised an error.
- */
-export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
-
-/**
- * Callback invoked when the worker has started successfully.
- */
-export type OnlineHandler<Worker> = (this: Worker) => void
-
-/**
- * Callback invoked when the worker exits successfully.
- */
-export type ExitHandler<Worker> = (this: Worker, code: number) => void
-
-/**
- * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
- */
-export interface IWorker {
-  /**
-   * Register a listener to the message event.
-   *
-   * @param event `'message'`.
-   * @param handler The message handler.
-   */
-  on(event: 'message', handler: MessageHandler<this>): void
-  /**
-   * Register a listener to the error event.
-   *
-   * @param event `'error'`.
-   * @param handler The error handler.
-   */
-  on(event: 'error', handler: ErrorHandler<this>): void
-  /**
-   * Register a listener to the online event.
-   *
-   * @param event `'online'`.
-   * @param handler The online handler.
-   */
-  on(event: 'online', handler: OnlineHandler<this>): void
-  /**
-   * Register a listener to the exit event.
-   *
-   * @param event `'exit'`.
-   * @param handler The exit handler.
-   */
-  on(event: 'exit', handler: ExitHandler<this>): void
-  /**
-   * Register a listener to the exit event that will only performed once.
-   *
-   * @param event `'exit'`.
-   * @param handler The exit handler.
-   */
-  once(event: 'exit', handler: ExitHandler<this>): void
-}
-
-/**
- * Options for a poolifier pool.
- */
-export interface PoolOptions<Worker> {
-  /**
-   * A function that will listen for message event on each worker.
-   */
-  messageHandler?: MessageHandler<Worker>
-  /**
-   * A function that will listen for error event on each worker.
-   */
-  errorHandler?: ErrorHandler<Worker>
-  /**
-   * A function that will listen for online event on each worker.
-   */
-  onlineHandler?: OnlineHandler<Worker>
-  /**
-   * A function that will listen for exit event on each worker.
-   */
-  exitHandler?: ExitHandler<Worker>
-  /**
-   * The work choice strategy to use in this pool.
-   */
-  workerChoiceStrategy?: WorkerChoiceStrategy
-  /**
-   * Pool events emission.
-   *
-   * @default true
-   */
-  enableEvents?: boolean
-}
+  WorkerChoiceStrategy
+} from './selection-strategies/selection-strategies-types'
+import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
 
 /**
  * Base class containing some shared logic for all poolifier pools.
@@ -113,7 +22,7 @@ export interface PoolOptions<Worker> {
  * @template Response Type of response of execution. This can only be serializable data.
  */
 export abstract class AbstractPool<
-  Worker extends IWorker,
+  Worker extends AbstractPoolWorker,
   Data = unknown,
   Response = unknown
 > implements IPoolInternal<Worker, Data, Response> {
@@ -192,7 +101,7 @@ export abstract class AbstractPool<
         this.registerWorkerMessageListener(workerCreated, message => {
           if (
             isKillBehavior(KillBehaviors.HARD, message.kill) ||
-            this.tasks.get(workerCreated) === 0
+            this.getWorkerRunningTasks(workerCreated) === 0
           ) {
             // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
             this.destroyWorker(workerCreated) as void
@@ -242,6 +151,16 @@ export abstract class AbstractPool<
     return this.promiseMap.size
   }
 
+  /** @inheritdoc */
+  public getWorkerRunningTasks (worker: Worker): number | undefined {
+    return this.tasks.get(worker)
+  }
+
+  /** @inheritdoc */
+  public getWorkerIndex (worker: Worker): number {
+    return this.workers.indexOf(worker)
+  }
+
   /** @inheritdoc */
   public setWorkerChoiceStrategy (
     workerChoiceStrategy: WorkerChoiceStrategy
@@ -258,16 +177,16 @@ export abstract class AbstractPool<
   protected internalGetBusyStatus (): boolean {
     return (
       this.numberOfRunningTasks >= this.numberOfWorkers &&
-      this.findFreeTasksMapEntry() === false
+      this.findFreeWorker() === false
     )
   }
 
   /** @inheritdoc */
-  public findFreeTasksMapEntry (): [Worker, number] | false {
-    for (const [worker, numberOfTasks] of this.tasks) {
-      if (numberOfTasks === 0) {
-        // A worker is free, return the matching tasks map entry
-        return [worker, numberOfTasks]
+  public findFreeWorker (): Worker | false {
+    for (const worker of this.workers) {
+      if (this.getWorkerRunningTasks(worker) === 0) {
+        // A worker is free, return the matching worker
+        return worker
       }
     }
     return false
@@ -350,8 +269,7 @@ export abstract class AbstractPool<
    */
   protected removeWorker (worker: Worker): void {
     // Clean worker from data structure
-    const workerIndex = this.workers.indexOf(worker)
-    this.workers.splice(workerIndex, 1)
+    this.workers.splice(this.getWorkerIndex(worker), 1)
     this.tasks.delete(worker)
   }