Fix eslint configuration
[poolifier.git] / src / pools / abstract-pool.ts
index c3f293e78bc693fba6aea17e36c880968ee58443..10fe128170b5363b0b04d58b82e451369a0f9d3d 100644 (file)
@@ -4,90 +4,15 @@ import type {
 } from '../utility-types'
 import { EMPTY_FUNCTION } from '../utils'
 import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
+import type { AbstractPoolWorker } from './abstract-pool-worker'
+import type { PoolOptions } from './pool'
 import type { IPoolInternal } from './pool-internal'
 import { PoolEmitter, PoolType } from './pool-internal'
-import type { WorkerChoiceStrategy } from './selection-strategies'
 import {
   WorkerChoiceStrategies,
-  WorkerChoiceStrategyContext
-} from './selection-strategies'
-
-/**
- * Callback invoked if the worker raised an error.
- */
-export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
-
-/**
- * Callback invoked when the worker has started successfully.
- */
-export type OnlineHandler<Worker> = (this: Worker) => void
-
-/**
- * Callback invoked when the worker exits successfully.
- */
-export type ExitHandler<Worker> = (this: Worker, code: number) => void
-
-/**
- * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
- */
-export interface IWorker {
-  /**
-   * Register a listener to the error event.
-   *
-   * @param event `'error'`.
-   * @param handler The error handler.
-   */
-  on(event: 'error', handler: ErrorHandler<this>): void
-  /**
-   * Register a listener to the online event.
-   *
-   * @param event `'online'`.
-   * @param handler The online handler.
-   */
-  on(event: 'online', handler: OnlineHandler<this>): void
-  /**
-   * Register a listener to the exit event.
-   *
-   * @param event `'exit'`.
-   * @param handler The exit handler.
-   */
-  on(event: 'exit', handler: ExitHandler<this>): void
-  /**
-   * Register a listener to the exit event that will only performed once.
-   *
-   * @param event `'exit'`.
-   * @param handler The exit handler.
-   */
-  once(event: 'exit', handler: ExitHandler<this>): void
-}
-
-/**
- * Options for a poolifier pool.
- */
-export interface PoolOptions<Worker> {
-  /**
-   * A function that will listen for error event on each worker.
-   */
-  errorHandler?: ErrorHandler<Worker>
-  /**
-   * A function that will listen for online event on each worker.
-   */
-  onlineHandler?: OnlineHandler<Worker>
-  /**
-   * A function that will listen for exit event on each worker.
-   */
-  exitHandler?: ExitHandler<Worker>
-  /**
-   * The work choice strategy to use in this pool.
-   */
-  workerChoiceStrategy?: WorkerChoiceStrategy
-  /**
-   * Pool events emission.
-   *
-   * Default to true.
-   */
-  enableEvents?: boolean
-}
+  WorkerChoiceStrategy
+} from './selection-strategies/selection-strategies-types'
+import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
 
 /**
  * Base class containing some shared logic for all poolifier pools.
@@ -97,26 +22,26 @@ export interface PoolOptions<Worker> {
  * @template Response Type of response of execution. This can only be serializable data.
  */
 export abstract class AbstractPool<
-  Worker extends IWorker,
+  Worker extends AbstractPoolWorker,
   Data = unknown,
   Response = unknown
 > implements IPoolInternal<Worker, Data, Response> {
-  /** @inheritdoc */
+  /** @inheritDoc */
   public readonly workers: Worker[] = []
 
-  /** @inheritdoc */
+  /** @inheritDoc */
   public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
 
-  /** @inheritdoc */
+  /** @inheritDoc */
   public readonly emitter?: PoolEmitter
 
-  /** @inheritdoc */
+  /** @inheritDoc */
   public readonly max?: number
 
   /**
    * The promise map.
    *
-   * - `key`: This is the message ID of each submitted task.
+   * - `key`: This is the message Id of each submitted task.
    * - `value`: An object that contains the worker, the resolve function and the reject function.
    *
    * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
@@ -127,7 +52,7 @@ export abstract class AbstractPool<
   > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()
 
   /**
-   * ID of the next message.
+   * Id of the next message.
    */
   protected nextMessageId: number = 0
 
@@ -174,13 +99,12 @@ export abstract class AbstractPool<
       () => {
         const workerCreated = this.createAndSetupWorker()
         this.registerWorkerMessageListener(workerCreated, message => {
-          const tasksInProgress = this.tasks.get(workerCreated)
           if (
             isKillBehavior(KillBehaviors.HARD, message.kill) ||
-            tasksInProgress === 0
+            this.getWorkerRunningTasks(workerCreated) === 0
           ) {
             // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-            void this.destroyWorker(workerCreated)
+            this.destroyWorker(workerCreated) as void
           }
         })
         return workerCreated
@@ -219,15 +143,25 @@ export abstract class AbstractPool<
     this.opts.enableEvents = opts.enableEvents ?? true
   }
 
-  /** @inheritdoc */
+  /** @inheritDoc */
   public abstract get type (): PoolType
 
-  /** @inheritdoc */
+  /** @inheritDoc */
   public get numberOfRunningTasks (): number {
     return this.promiseMap.size
   }
 
-  /** @inheritdoc */
+  /** @inheritDoc */
+  public getWorkerRunningTasks (worker: Worker): number | undefined {
+    return this.tasks.get(worker)
+  }
+
+  /** @inheritDoc */
+  public getWorkerIndex (worker: Worker): number {
+    return this.workers.indexOf(worker)
+  }
+
+  /** @inheritDoc */
   public setWorkerChoiceStrategy (
     workerChoiceStrategy: WorkerChoiceStrategy
   ): void {
@@ -237,40 +171,40 @@ export abstract class AbstractPool<
     )
   }
 
-  /** @inheritdoc */
+  /** @inheritDoc */
   public abstract get busy (): boolean
 
   protected internalGetBusyStatus (): boolean {
     return (
       this.numberOfRunningTasks >= this.numberOfWorkers &&
-      this.findFreeTasksMapEntry() === false
+      this.findFreeWorker() === false
     )
   }
 
-  /** @inheritdoc */
-  public findFreeTasksMapEntry (): [Worker, number] | false {
-    for (const [worker, numberOfTasks] of this.tasks) {
-      if (numberOfTasks === 0) {
-        // A worker is free, return the matching tasks map entry
-        return [worker, numberOfTasks]
+  /** @inheritDoc */
+  public findFreeWorker (): Worker | false {
+    for (const worker of this.workers) {
+      if (this.getWorkerRunningTasks(worker) === 0) {
+        // A worker is free, return the matching worker
+        return worker
       }
     }
     return false
   }
 
-  /** @inheritdoc */
+  /** @inheritDoc */
   public execute (data: Data): Promise<Response> {
     // Configure worker to handle message with the specified task
     const worker = this.chooseWorker()
-    this.increaseWorkersTask(worker)
-    this.checkAndEmitBusy()
     const messageId = ++this.nextMessageId
     const res = this.internalExecute(worker, messageId)
-    this.sendToWorker(worker, { data: data || ({} as Data), id: messageId })
+    this.checkAndEmitBusy()
+    data = data ?? ({} as Data)
+    this.sendToWorker(worker, { data, id: messageId })
     return res
   }
 
-  /** @inheritdoc */
+  /** @inheritDoc */
   public async destroy (): Promise<void> {
     await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
   }
@@ -296,7 +230,7 @@ export abstract class AbstractPool<
   protected abstract isMain (): boolean
 
   /**
-   * Increase the number of tasks that the given workers has applied.
+   * Increase the number of tasks that the given worker has applied.
    *
    * @param worker Worker whose tasks are increased.
    */
@@ -305,7 +239,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Decrease the number of tasks that the given workers has applied.
+   * Decrease the number of tasks that the given worker has applied.
    *
    * @param worker Worker whose tasks are decreased.
    */
@@ -314,7 +248,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Step the number of tasks that the given workers has applied.
+   * Step the number of tasks that the given worker has applied.
    *
    * @param worker Worker whose tasks are set.
    * @param step Worker number of tasks step.
@@ -335,8 +269,7 @@ export abstract class AbstractPool<
    */
   protected removeWorker (worker: Worker): void {
     // Clean worker from data structure
-    const workerIndex = this.workers.indexOf(worker)
-    this.workers.splice(workerIndex, 1)
+    this.workers.splice(this.getWorkerIndex(worker), 1)
     this.tasks.delete(worker)
   }
 
@@ -376,6 +309,7 @@ export abstract class AbstractPool<
     worker: Worker,
     messageId: number
   ): Promise<Response> {
+    this.increaseWorkersTask(worker)
     return new Promise<Response>((resolve, reject) => {
       this.promiseMap.set(messageId, { resolve, reject, worker })
     })
@@ -401,8 +335,9 @@ export abstract class AbstractPool<
    * @returns New, completely set up worker.
    */
   protected createAndSetupWorker (): Worker {
-    const worker: Worker = this.createWorker()
+    const worker = this.createWorker()
 
+    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
     worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
     worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
@@ -421,16 +356,16 @@ export abstract class AbstractPool<
   /**
    * This function is the listener registered for each worker.
    *
-   * @returns The listener function to execute when a message is sent from a worker.
+   * @returns The listener function to execute when a message is received from a worker.
    */
   protected workerListener (): (message: MessageValue<Response>) => void {
     return message => {
-      if (message.id) {
-        const value = this.promiseMap.get(message.id)
-        if (value) {
-          this.decreaseWorkersTasks(value.worker)
-          if (message.error) value.reject(message.error)
-          else value.resolve(message.data as Response)
+      if (message.id !== undefined) {
+        const promise = this.promiseMap.get(message.id)
+        if (promise !== undefined) {
+          this.decreaseWorkersTasks(promise.worker)
+          if (message.error) promise.reject(message.error)
+          else promise.resolve(message.data as Response)
           this.promiseMap.delete(message.id)
         }
       }