refactor: prepare the code to handle task abstraction at execute
[poolifier.git] / src / pools / abstract-pool.ts
index 0fb695fbc5d72dfbaf268a219d32983597c80028..b622209187ec64639f4c1bea32055df07a121ec6 100644 (file)
@@ -75,7 +75,7 @@ export abstract class AbstractPool<
     this.checkFilePath(this.filePath)
     this.checkPoolOptions(this.opts)
 
-    this.chooseWorker.bind(this)
+    this.chooseWorkerNode.bind(this)
     this.internalExecute.bind(this)
     this.checkAndEmitFull.bind(this)
     this.checkAndEmitBusy.bind(this)
@@ -212,16 +212,24 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public async execute (data: Data): Promise<Response> {
-    const [workerNodeKey, worker] = this.chooseWorker()
-    const messageId = crypto.randomUUID()
-    const res = this.internalExecute(workerNodeKey, worker, messageId)
-    this.checkAndEmitFull()
-    this.checkAndEmitBusy()
-    this.sendToWorker(worker, {
+    const [workerNodeKey, workerNode] = this.chooseWorkerNode()
+    const submittedTask: Task<Data> = {
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
       data: data ?? ({} as Data),
-      id: messageId
-    })
+      id: crypto.randomUUID()
+    }
+    const res = this.internalExecute(workerNodeKey, workerNode, submittedTask)
+    let currentTask: Task<Data>
+    // FIXME: Add sensible conditions to start tasks queuing on the worker node.
+    if (this.tasksQueueLength(workerNodeKey) > 0) {
+      currentTask = this.dequeueTask(workerNodeKey) as Task<Data>
+      this.enqueueTask(workerNodeKey, submittedTask)
+    } else {
+      currentTask = submittedTask
+    }
+    this.sendToWorker(workerNode.worker, currentTask)
+    this.checkAndEmitFull()
+    this.checkAndEmitBusy()
     // eslint-disable-next-line @typescript-eslint/return-await
     return res
   }
@@ -305,31 +313,31 @@ export abstract class AbstractPool<
    *
    * The default uses a round robin algorithm to distribute the load.
    *
-   * @returns [worker node key, worker].
+   * @returns [worker node key, worker node].
    */
-  protected chooseWorker (): [number, Worker] {
+  protected chooseWorkerNode (): [number, WorkerNode<Worker, Data>] {
     let workerNodeKey: number
     if (
       this.type === PoolType.DYNAMIC &&
       !this.full &&
       this.findFreeWorkerNodeKey() === -1
     ) {
-      const createdWorker = this.createAndSetupWorker()
-      this.registerWorkerMessageListener(createdWorker, message => {
+      const workerCreated = this.createAndSetupWorker()
+      this.registerWorkerMessageListener(workerCreated, message => {
         if (
           isKillBehavior(KillBehaviors.HARD, message.kill) ||
           (message.kill != null &&
-            this.getWorkerTasksUsage(createdWorker)?.running === 0)
+            this.getWorkerTasksUsage(workerCreated)?.running === 0)
         ) {
           // Kill message received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-          void this.destroyWorker(createdWorker)
+          void this.destroyWorker(workerCreated)
         }
       })
-      workerNodeKey = this.getWorkerNodeKey(createdWorker)
+      workerNodeKey = this.getWorkerNodeKey(workerCreated)
     } else {
       workerNodeKey = this.workerChoiceStrategyContext.execute()
     }
-    return [workerNodeKey, this.workerNodes[workerNodeKey].worker]
+    return [workerNodeKey, this.workerNodes[workerNodeKey]]
   }
 
   /**
@@ -415,12 +423,16 @@ export abstract class AbstractPool<
 
   private async internalExecute (
     workerNodeKey: number,
-    worker: Worker,
-    messageId: string
+    workerNode: WorkerNode<Worker, Data>,
+    task: Task<Data>
   ): Promise<Response> {
     this.beforePromiseResponseHook(workerNodeKey)
     return await new Promise<Response>((resolve, reject) => {
-      this.promiseResponseMap.set(messageId, { resolve, reject, worker })
+      this.promiseResponseMap.set(task.id, {
+        resolve,
+        reject,
+        worker: workerNode.worker
+      })
     })
   }
 
@@ -507,4 +519,16 @@ export abstract class AbstractPool<
     this.workerNodes.splice(workerNodeKey, 1)
     this.workerChoiceStrategyContext.remove(workerNodeKey)
   }
+
+  protected enqueueTask (workerNodeKey: number, task: Task<Data>): void {
+    this.workerNodes[workerNodeKey].tasksQueue.push(task)
+  }
+
+  protected dequeueTask (workerNodeKey: number): Task<Data> | undefined {
+    return this.workerNodes[workerNodeKey].tasksQueue.shift()
+  }
+
+  protected tasksQueueLength (workerNodeKey: number): number {
+    return this.workerNodes[workerNodeKey].tasksQueue.length
+  }
 }