Merge pull request #747 from poolifier/multiple-functions
[poolifier.git] / src / pools / abstract-pool.ts
index 473789809d2dde4ea57571dd1a2ded5dd7499477..edf2b29a6e9de328133735cea8a6c58b18609db0 100644 (file)
@@ -6,13 +6,15 @@ import {
   median
 } from '../utils'
 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
+import { CircularArray } from '../circular-array'
+import { Queue } from '../queue'
 import {
+  type IPool,
   PoolEmitter,
   PoolEvents,
-  type IPool,
   type PoolOptions,
-  type TasksQueueOptions,
-  PoolType
+  PoolType,
+  type TasksQueueOptions
 } from './pool'
 import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
 import {
@@ -21,7 +23,6 @@ import {
   type WorkerChoiceStrategyOptions
 } from './selection-strategies/selection-strategies-types'
 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
-import { CircularArray } from '../circular-array'
 
 /**
  * Base class that implements some shared logic for all poolifier pools.
@@ -197,7 +198,7 @@ export abstract class AbstractPool<
       return 0
     }
     return this.workerNodes.reduce(
-      (accumulator, workerNode) => accumulator + workerNode.tasksQueue.length,
+      (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
       0
     )
   }
@@ -304,9 +305,10 @@ export abstract class AbstractPool<
   }
 
   /** @inheritDoc */
-  public async execute (data?: Data): Promise<Response> {
+  public async execute (data?: Data, name?: string): Promise<Response> {
     const [workerNodeKey, workerNode] = this.chooseWorkerNode()
     const submittedTask: Task<Data> = {
+      name,
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
       data: data ?? ({} as Data),
       id: crypto.randomUUID()
@@ -387,7 +389,7 @@ export abstract class AbstractPool<
     worker: Worker,
     message: MessageValue<Response>
   ): void {
-    const workerTasksUsage = this.getWorkerTasksUsage(worker) as TasksUsage
+    const workerTasksUsage = this.getWorkerTasksUsage(worker)
     --workerTasksUsage.running
     ++workerTasksUsage.run
     if (message.error != null) {
@@ -560,7 +562,7 @@ export abstract class AbstractPool<
    * @throws Error if the worker is not found in the pool worker nodes.
    * @returns The worker tasks usage.
    */
-  private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
+  private getWorkerTasksUsage (worker: Worker): TasksUsage {
     const workerNodeKey = this.getWorkerNodeKey(worker)
     if (workerNodeKey !== -1) {
       return this.workerNodes[workerNodeKey].tasksUsage
@@ -586,7 +588,7 @@ export abstract class AbstractPool<
         medRunTime: 0,
         error: 0
       },
-      tasksQueue: []
+      tasksQueue: new Queue<Task<Data>>()
     })
   }
 
@@ -602,7 +604,7 @@ export abstract class AbstractPool<
     workerNodeKey: number,
     worker: Worker,
     tasksUsage: TasksUsage,
-    tasksQueue: Array<Task<Data>>
+    tasksQueue: Queue<Task<Data>>
   ): void {
     this.workerNodes[workerNodeKey] = {
       worker,
@@ -628,21 +630,24 @@ export abstract class AbstractPool<
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
-    return this.workerNodes[workerNodeKey].tasksQueue.push(task)
+    return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
   }
 
   private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
-    return this.workerNodes[workerNodeKey].tasksQueue.shift()
+    return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
   }
 
   private tasksQueueSize (workerNodeKey: number): number {
-    return this.workerNodes[workerNodeKey].tasksQueue.length
+    return this.workerNodes[workerNodeKey].tasksQueue.size
   }
 
   private flushTasksQueue (workerNodeKey: number): void {
     if (this.tasksQueueSize(workerNodeKey) > 0) {
-      for (const task of this.workerNodes[workerNodeKey].tasksQueue) {
-        this.executeTask(workerNodeKey, task)
+      for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
+        this.executeTask(
+          workerNodeKey,
+          this.dequeueTask(workerNodeKey) as Task<Data>
+        )
       }
     }
   }