X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=91cb5facd01f736d43703d4d0530ec7c01a3bf41;hb=3d76750ac8dacd95003ca89cf9542c3b2a014b8c;hp=7c71c8557f0edcdc6a31ea12fd67649b2a8d0521;hpb=81c02522f2c5538a8e5ce341678724132d4da8ba;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 7c71c855..91cb5fac 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -186,7 +186,7 @@ export abstract class AbstractPool< ) } else if (max === 0) { throw new RangeError( - 'Cannot instantiate a dynamic pool with a pool size equal to zero' + 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero' ) } else if (min === max) { throw new RangeError( @@ -334,16 +334,20 @@ export abstract class AbstractPool< accumulator + workerNode.usage.tasks.executing, 0 ), - queuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + workerNode.usage.tasks.queued, - 0 - ), - maxQueuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), - 0 - ), + ...(this.opts.enableTasksQueue === true && { + queuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.usage.tasks.queued, + 0 + ) + }), + ...(this.opts.enableTasksQueue === true && { + maxQueuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), + 0 + ) + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -611,16 +615,28 @@ export abstract class AbstractPool< protected abstract get busy (): boolean /** - * Whether worker nodes are executing at least one task. + * Whether worker nodes are executing concurrently their tasks quota or not. * * @returns Worker nodes busyness boolean status. */ protected internalBusy (): boolean { - return ( - this.workerNodes.findIndex(workerNode => { - return workerNode.usage.tasks.executing === 0 - }) === -1 - ) + if (this.opts.enableTasksQueue === true) { + return ( + this.workerNodes.findIndex( + workerNode => + workerNode.info.ready && + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) === -1 + ) + } else { + return ( + this.workerNodes.findIndex( + workerNode => + workerNode.info.ready && workerNode.usage.tasks.executing === 0 + ) === -1 + ) + } } /** @inheritDoc */ @@ -634,22 +650,22 @@ export abstract class AbstractPool< data: data ?? ({} as Data), timestamp, workerId: this.getWorkerInfo(workerNodeKey).id as number, - id: randomUUID() + taskId: randomUUID() } - this.promiseResponseMap.set(task.id as string, { + this.promiseResponseMap.set(task.taskId as string, { resolve, reject, workerNodeKey }) if ( - this.opts.enableTasksQueue === true && - (this.busy || - this.workerNodes[workerNodeKey].usage.tasks.executing >= + this.opts.enableTasksQueue === false || + (this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number)) ) { - this.enqueueTask(workerNodeKey, task) - } else { this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) } this.checkAndEmitEvents() }) @@ -1032,7 +1048,7 @@ export abstract class AbstractPool< if (message.ready != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) - } else if (message.id != null) { + } else if (message.taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) } @@ -1049,7 +1065,9 @@ export abstract class AbstractPool< } private handleTaskExecutionResponse (message: MessageValue): void { - const promiseResponse = this.promiseResponseMap.get(message.id as string) + const promiseResponse = this.promiseResponseMap.get( + message.taskId as string + ) if (promiseResponse != null) { if (message.taskError != null) { this.emitter?.emit(PoolEvents.taskError, message.taskError) @@ -1059,7 +1077,7 @@ export abstract class AbstractPool< } const workerNodeKey = promiseResponse.workerNodeKey this.afterTaskExecutionHook(workerNodeKey, message) - this.promiseResponseMap.delete(message.id as string) + this.promiseResponseMap.delete(message.taskId as string) if ( this.opts.enableTasksQueue === true && this.tasksQueueSize(workerNodeKey) > 0 &&