X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=615cf29604cdb1b2309b412be1df8f358d09958e;hb=ce1b31beefa0f80927316fc762e5186f823e03c7;hp=687ae25f7f43154b484fafc27bb27ea7102b3961;hpb=84d0f4f2937987e5adbb8cfa94839eaf050c7502;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 687ae25f..615cf296 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2,6 +2,7 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' import type { MessageValue, PromiseResponseWrapper } from '../utility-types' import { + DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, isKillBehavior, @@ -158,18 +159,20 @@ export abstract class AbstractPool< } protected checkDynamicPoolSize (min: number, max: number): void { - if (this.type === PoolTypes.dynamic && min > max) { - throw new RangeError( - 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' - ) - } else if (this.type === PoolTypes.dynamic && min === 0 && max === 0) { - throw new RangeError( - 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero' - ) - } else if (this.type === PoolTypes.dynamic && min === max) { - throw new RangeError( - 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' - ) + if (this.type === PoolTypes.dynamic) { + if (min > max) { + throw new RangeError( + 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' + ) + } else if (min === 0 && max === 0) { + throw new RangeError( + 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero' + ) + } else if (min === max) { + throw new RangeError( + 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' + ) + } } } @@ -306,7 +309,7 @@ export abstract class AbstractPool< ), maxQueuedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + workerNode.usage.tasks.maxQueued, + accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), 0 ), failedTasks: this.workerNodes.reduce( @@ -401,14 +404,16 @@ export abstract class AbstractPool< private get starting (): boolean { return ( - !this.full || - (this.full && this.workerNodes.some(workerNode => !workerNode.info.ready)) + this.workerNodes.length < this.minSize || + (this.workerNodes.length >= this.minSize && + this.workerNodes.some(workerNode => !workerNode.info.ready)) ) } private get ready (): boolean { return ( - this.full && this.workerNodes.every(workerNode => workerNode.info.ready) + this.workerNodes.length >= this.minSize && + this.workerNodes.every(workerNode => workerNode.info.ready) ) } @@ -584,7 +589,7 @@ export abstract class AbstractPool< const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const submittedTask: Task = { - name, + name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), timestamp, @@ -667,6 +672,11 @@ export abstract class AbstractPool< const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing this.updateWaitTimeWorkerUsage(workerUsage, task) + const tasksWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( + task.name as string + ) as WorkerUsage + ++tasksWorkerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(tasksWorkerUsage, task) } /** @@ -680,10 +690,17 @@ export abstract class AbstractPool< worker: Worker, message: MessageValue ): void { - const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage + const workerNodeKey = this.getWorkerNodeKey(worker) + const workerUsage = this.workerNodes[workerNodeKey].usage this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) + const tasksWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( + message.name as string + ) as WorkerUsage + this.updateTaskStatisticsWorkerUsage(tasksWorkerUsage, message) + this.updateRunTimeWorkerUsage(tasksWorkerUsage, message) + this.updateEluWorkerUsage(tasksWorkerUsage, message) } private updateTaskStatisticsWorkerUsage (