Merge branch 'master' into feature/task-functions
[poolifier.git] / src / pools / abstract-pool.ts
index 6a26c7b7a63129c1d51e1bcfac0804b2d9962608..6761882589bb14bd9d205c33684ac7e7b3ab5128 100644 (file)
@@ -69,8 +69,7 @@ export abstract class AbstractPool<
   public readonly emitter?: PoolEmitter
 
   /**
-   * The task execution response promise map.
-   *
+   * The task execution response promise map:
    * - `key`: The message id of each submitted task.
    * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
    *
@@ -100,14 +99,14 @@ export abstract class AbstractPool<
    */
   private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
 
-  /**
-   * Whether the pool is starting or not.
-   */
-  private readonly starting: boolean
   /**
    * Whether the pool is started or not.
    */
   private started: boolean
+  /**
+   * Whether the pool is starting or not.
+   */
+  private starting: boolean
   /**
    * The start timestamp of the pool.
    */
@@ -155,10 +154,11 @@ export abstract class AbstractPool<
 
     this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
 
-    this.starting = true
-    this.startPool()
+    this.started = false
     this.starting = false
-    this.started = true
+    if (this.opts.startWorkers === true) {
+      this.start()
+    }
 
     this.startTimestamp = performance.now()
   }
@@ -222,6 +222,7 @@ export abstract class AbstractPool<
 
   private checkPoolOptions (opts: PoolOptions<Worker>): void {
     if (isPlainObject(opts)) {
+      this.opts.startWorkers = opts.startWorkers ?? true
       this.opts.workerChoiceStrategy =
         opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
       this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
@@ -324,11 +325,6 @@ export abstract class AbstractPool<
         `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
       )
     }
-    if (tasksQueueOptions?.queueMaxSize != null) {
-      throw new Error(
-        'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
-      )
-    }
     if (
       tasksQueueOptions?.size != null &&
       !Number.isSafeInteger(tasksQueueOptions?.size)
@@ -344,24 +340,13 @@ export abstract class AbstractPool<
     }
   }
 
-  private startPool (): void {
-    while (
-      this.workerNodes.reduce(
-        (accumulator, workerNode) =>
-          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
-        0
-      ) < this.numberOfWorkers
-    ) {
-      this.createAndSetupWorkerNode()
-    }
-  }
-
   /** @inheritDoc */
   public get info (): PoolInfo {
     return {
       version,
       type: this.type,
       worker: this.worker,
+      started: this.started,
       ready: this.ready,
       strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
       minSize: this.minSize,
@@ -685,7 +670,9 @@ export abstract class AbstractPool<
     return {
       ...{
         size: Math.pow(this.maxSize, 2),
-        concurrency: 1
+        concurrency: 1,
+        taskStealing: true,
+        tasksStealingOnBackPressure: true
       },
       ...tasksQueueOptions
     }
@@ -722,14 +709,13 @@ export abstract class AbstractPool<
               (this.opts.tasksQueueOptions?.concurrency as number)
         ) === -1
       )
-    } else {
-      return (
-        this.workerNodes.findIndex(
-          workerNode =>
-            workerNode.info.ready && workerNode.usage.tasks.executing === 0
-        ) === -1
-      )
     }
+    return (
+      this.workerNodes.findIndex(
+        workerNode =>
+          workerNode.info.ready && workerNode.usage.tasks.executing === 0
+      ) === -1
+    )
   }
 
   private async sendTaskFunctionOperationToWorker (
@@ -870,7 +856,7 @@ export abstract class AbstractPool<
   ): Promise<Response> {
     return await new Promise<Response>((resolve, reject) => {
       if (!this.started) {
-        reject(new Error('Cannot execute a task on destroyed pool'))
+        reject(new Error('Cannot execute a task on not started pool'))
         return
       }
       if (name != null && typeof name !== 'string') {
@@ -916,6 +902,22 @@ export abstract class AbstractPool<
     })
   }
 
+  /** @inheritdoc */
+  public start (): void {
+    this.starting = true
+    while (
+      this.workerNodes.reduce(
+        (accumulator, workerNode) =>
+          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+        0
+      ) < this.numberOfWorkers
+    ) {
+      this.createAndSetupWorkerNode()
+    }
+    this.starting = false
+    this.started = true
+  }
+
   /** @inheritDoc */
   public async destroy (): Promise<void> {
     await Promise.all(
@@ -1248,8 +1250,7 @@ export abstract class AbstractPool<
     })
     const workerInfo = this.getWorkerInfo(workerNodeKey)
     this.sendToWorker(workerNodeKey, {
-      checkActive: true,
-      workerId: workerInfo.id as number
+      checkActive: true
     })
     if (this.taskFunctions.size > 0) {
       for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
@@ -1300,10 +1301,14 @@ export abstract class AbstractPool<
     // Send the statistics message to worker.
     this.sendStatisticsMessageToWorker(workerNodeKey)
     if (this.opts.enableTasksQueue === true) {
-      this.workerNodes[workerNodeKey].onEmptyQueue =
-        this.taskStealingOnEmptyQueue.bind(this)
-      this.workerNodes[workerNodeKey].onBackPressure =
-        this.tasksStealingOnBackPressure.bind(this)
+      if (this.opts.tasksQueueOptions?.taskStealing === true) {
+        this.workerNodes[workerNodeKey].onEmptyQueue =
+          this.taskStealingOnEmptyQueue.bind(this)
+      }
+      if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
+        this.workerNodes[workerNodeKey].onBackPressure =
+          this.tasksStealingOnBackPressure.bind(this)
+      }
     }
   }