fix: ensure task concurrency is enforced
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 21 Jul 2023 16:03:25 +0000 (18:03 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 21 Jul 2023 16:03:25 +0000 (18:03 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/worker/abstract-worker.ts

index 0d3d21832a715191e6efad9723c6a47454ae2215..dde31ea49f20d7ef4a14015c3faec458683b9c2a 100644 (file)
@@ -648,8 +648,7 @@ export abstract class AbstractPool<
         this.opts.enableTasksQueue === true &&
         (this.busy ||
           this.workerNodes[workerNodeKey].usage.tasks.executing >=
-            ((this.opts.tasksQueueOptions as TasksQueueOptions)
-              .concurrency as number))
+            (this.opts.tasksQueueOptions?.concurrency as number))
       ) {
         this.enqueueTask(workerNodeKey, task)
       } else {
@@ -936,14 +935,14 @@ export abstract class AbstractPool<
       }
     })
     const workerInfo = this.getWorkerInfo(workerNodeKey)
-    workerInfo.dynamic = true
-    if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
-      workerInfo.ready = true
-    }
     this.sendToWorker(workerNodeKey, {
       checkActive: true,
       workerId: workerInfo.id as number
     })
+    workerInfo.dynamic = true
+    if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
+      workerInfo.ready = true
+    }
     return workerNodeKey
   }
 
@@ -1085,7 +1084,9 @@ export abstract class AbstractPool<
       this.promiseResponseMap.delete(message.id as string)
       if (
         this.opts.enableTasksQueue === true &&
-        this.tasksQueueSize(workerNodeKey) > 0
+        this.tasksQueueSize(workerNodeKey) > 0 &&
+        this.workerNodes[workerNodeKey].usage.tasks.executing <
+          (this.opts.tasksQueueOptions?.concurrency as number)
       ) {
         this.executeTask(
           workerNodeKey,
index 0f0e0a393053b76bae28df789efe0b435ce36572..428f63134f5666a13087ca564a22336236f00db2 100644 (file)
@@ -335,7 +335,6 @@ export abstract class AbstractWorker<
       this.checkActive.bind(this),
       (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2
     )
-    this.activeInterval.unref()
   }
 
   /**