refactor: prepare the code to handle task abstraction at execute
authorJérôme Benoit <jerome.benoit@sap.com>
Sat, 8 Apr 2023 21:34:50 +0000 (23:34 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sat, 8 Apr 2023 21:34:50 +0000 (23:34 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
.eslintrc.js
src/pools/abstract-pool.ts
tests/pools/selection-strategies/selection-strategies.test.js

index 1d6840072e7dcc07e92d2586af15623120b927f8..70a8ec3c10cec2bea12b3d01707155e585a7b406 100644 (file)
@@ -42,7 +42,9 @@ module.exports = defineConfig({
           'cpus',
           'ctx',
           'deprecations',
+          'dequeue',
           'ecma',
+          'enqueue',
           'enum',
           'esm',
           'fibonacci',
index 0fb695fbc5d72dfbaf268a219d32983597c80028..b622209187ec64639f4c1bea32055df07a121ec6 100644 (file)
@@ -75,7 +75,7 @@ export abstract class AbstractPool<
     this.checkFilePath(this.filePath)
     this.checkPoolOptions(this.opts)
 
-    this.chooseWorker.bind(this)
+    this.chooseWorkerNode.bind(this)
     this.internalExecute.bind(this)
     this.checkAndEmitFull.bind(this)
     this.checkAndEmitBusy.bind(this)
@@ -212,16 +212,24 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public async execute (data: Data): Promise<Response> {
-    const [workerNodeKey, worker] = this.chooseWorker()
-    const messageId = crypto.randomUUID()
-    const res = this.internalExecute(workerNodeKey, worker, messageId)
-    this.checkAndEmitFull()
-    this.checkAndEmitBusy()
-    this.sendToWorker(worker, {
+    const [workerNodeKey, workerNode] = this.chooseWorkerNode()
+    const submittedTask: Task<Data> = {
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
       data: data ?? ({} as Data),
-      id: messageId
-    })
+      id: crypto.randomUUID()
+    }
+    const res = this.internalExecute(workerNodeKey, workerNode, submittedTask)
+    let currentTask: Task<Data>
+    // FIXME: Add sensible conditions to start tasks queuing on the worker node.
+    if (this.tasksQueueLength(workerNodeKey) > 0) {
+      currentTask = this.dequeueTask(workerNodeKey) as Task<Data>
+      this.enqueueTask(workerNodeKey, submittedTask)
+    } else {
+      currentTask = submittedTask
+    }
+    this.sendToWorker(workerNode.worker, currentTask)
+    this.checkAndEmitFull()
+    this.checkAndEmitBusy()
     // eslint-disable-next-line @typescript-eslint/return-await
     return res
   }
@@ -305,31 +313,31 @@ export abstract class AbstractPool<
    *
    * The default uses a round robin algorithm to distribute the load.
    *
-   * @returns [worker node key, worker].
+   * @returns [worker node key, worker node].
    */
-  protected chooseWorker (): [number, Worker] {
+  protected chooseWorkerNode (): [number, WorkerNode<Worker, Data>] {
     let workerNodeKey: number
     if (
       this.type === PoolType.DYNAMIC &&
       !this.full &&
       this.findFreeWorkerNodeKey() === -1
     ) {
-      const createdWorker = this.createAndSetupWorker()
-      this.registerWorkerMessageListener(createdWorker, message => {
+      const workerCreated = this.createAndSetupWorker()
+      this.registerWorkerMessageListener(workerCreated, message => {
         if (
           isKillBehavior(KillBehaviors.HARD, message.kill) ||
           (message.kill != null &&
-            this.getWorkerTasksUsage(createdWorker)?.running === 0)
+            this.getWorkerTasksUsage(workerCreated)?.running === 0)
         ) {
           // Kill message received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-          void this.destroyWorker(createdWorker)
+          void this.destroyWorker(workerCreated)
         }
       })
-      workerNodeKey = this.getWorkerNodeKey(createdWorker)
+      workerNodeKey = this.getWorkerNodeKey(workerCreated)
     } else {
       workerNodeKey = this.workerChoiceStrategyContext.execute()
     }
-    return [workerNodeKey, this.workerNodes[workerNodeKey].worker]
+    return [workerNodeKey, this.workerNodes[workerNodeKey]]
   }
 
   /**
@@ -415,12 +423,16 @@ export abstract class AbstractPool<
 
   private async internalExecute (
     workerNodeKey: number,
-    worker: Worker,
-    messageId: string
+    workerNode: WorkerNode<Worker, Data>,
+    task: Task<Data>
   ): Promise<Response> {
     this.beforePromiseResponseHook(workerNodeKey)
     return await new Promise<Response>((resolve, reject) => {
-      this.promiseResponseMap.set(messageId, { resolve, reject, worker })
+      this.promiseResponseMap.set(task.id, {
+        resolve,
+        reject,
+        worker: workerNode.worker
+      })
     })
   }
 
@@ -507,4 +519,16 @@ export abstract class AbstractPool<
     this.workerNodes.splice(workerNodeKey, 1)
     this.workerChoiceStrategyContext.remove(workerNodeKey)
   }
+
+  protected enqueueTask (workerNodeKey: number, task: Task<Data>): void {
+    this.workerNodes[workerNodeKey].tasksQueue.push(task)
+  }
+
+  protected dequeueTask (workerNodeKey: number): Task<Data> | undefined {
+    return this.workerNodes[workerNodeKey].tasksQueue.shift()
+  }
+
+  protected tasksQueueLength (workerNodeKey: number): number {
+    return this.workerNodes[workerNodeKey].tasksQueue.length
+  }
 }
index 796c84baba1feb68953e8dd24b3eb610ac898382..2fff1b13772e43c4acad99ad48e7441fda474dee 100644 (file)
@@ -146,14 +146,14 @@ describe('Selection strategies test suite', () => {
     )
     let results = new Set()
     for (let i = 0; i < max; i++) {
-      results.add(pool.chooseWorker()[1].id)
+      results.add(pool.chooseWorkerNode()[1].worker.id)
     }
     expect(results.size).toBe(max)
     await pool.destroy()
     pool = new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js')
     results = new Set()
     for (let i = 0; i < max; i++) {
-      results.add(pool.chooseWorker()[1].threadId)
+      results.add(pool.chooseWorkerNode()[1].worker.threadId)
     }
     expect(results.size).toBe(max)
     await pool.destroy()