From b5e113f6151d0647d6ec9b3be9c000e83db9065e Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 21 Jul 2023 18:03:25 +0200 Subject: [PATCH] fix: ensure task concurrency is enforced MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 15 ++++++++------- src/worker/abstract-worker.ts | 1 - 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 0d3d2183..dde31ea4 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -648,8 +648,7 @@ export abstract class AbstractPool< this.opts.enableTasksQueue === true && (this.busy || this.workerNodes[workerNodeKey].usage.tasks.executing >= - ((this.opts.tasksQueueOptions as TasksQueueOptions) - .concurrency as number)) + (this.opts.tasksQueueOptions?.concurrency as number)) ) { this.enqueueTask(workerNodeKey, task) } else { @@ -936,14 +935,14 @@ export abstract class AbstractPool< } }) const workerInfo = this.getWorkerInfo(workerNodeKey) - workerInfo.dynamic = true - if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { - workerInfo.ready = true - } this.sendToWorker(workerNodeKey, { checkActive: true, workerId: workerInfo.id as number }) + workerInfo.dynamic = true + if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { + workerInfo.ready = true + } return workerNodeKey } @@ -1085,7 +1084,9 @@ export abstract class AbstractPool< this.promiseResponseMap.delete(message.id as string) if ( this.opts.enableTasksQueue === true && - this.tasksQueueSize(workerNodeKey) > 0 + this.tasksQueueSize(workerNodeKey) > 0 && + this.workerNodes[workerNodeKey].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) ) { this.executeTask( workerNodeKey, diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 0f0e0a39..428f6313 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -335,7 +335,6 @@ export abstract class AbstractWorker< this.checkActive.bind(this), (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2 ) - this.activeInterval.unref() } /** -- 2.34.1