X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=615cf29604cdb1b2309b412be1df8f358d09958e;hb=ce1b31beefa0f80927316fc762e5186f823e03c7;hp=097882b7dc971112b98050d95c52d43776f27362;hpb=658b9aa08266ed9a8ae3c0fc947d237fa2674f09;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 097882b7..615cf296 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2,6 +2,7 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' import type { MessageValue, PromiseResponseWrapper } from '../utility-types' import { + DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, isKillBehavior, @@ -153,7 +154,25 @@ export abstract class AbstractPool< 'Cannot instantiate a pool with a negative number of workers' ) } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) { - throw new Error('Cannot instantiate a fixed pool with no worker') + throw new RangeError('Cannot instantiate a fixed pool with zero worker') + } + } + + protected checkDynamicPoolSize (min: number, max: number): void { + if (this.type === PoolTypes.dynamic) { + if (min > max) { + throw new RangeError( + 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' + ) + } else if (min === 0 && max === 0) { + throw new RangeError( + 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero' + ) + } else if (min === max) { + throw new RangeError( + 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' + ) + } } } @@ -252,6 +271,8 @@ export abstract class AbstractPool< version, type: this.type, worker: this.worker, + ready: this.ready, + strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy, minSize: this.minSize, maxSize: this.maxSize, ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() @@ -288,7 +309,7 @@ export abstract class AbstractPool< ), maxQueuedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + workerNode.usage.tasks.maxQueued, + accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), 0 ), failedTasks: this.workerNodes.reduce( @@ -381,6 +402,21 @@ export abstract class AbstractPool< } } + private get starting (): boolean { + return ( + this.workerNodes.length < this.minSize || + (this.workerNodes.length >= this.minSize && + this.workerNodes.some(workerNode => !workerNode.info.ready)) + ) + } + + private get ready (): boolean { + return ( + this.workerNodes.length >= this.minSize && + this.workerNodes.every(workerNode => workerNode.info.ready) + ) + } + /** * Gets the approximate pool utilization. * @@ -435,6 +471,17 @@ export abstract class AbstractPool< ?.worker } + private checkMessageWorkerId (message: MessageValue): void { + if ( + message.workerId != null && + this.getWorkerById(message.workerId) == null + ) { + throw new Error( + `Worker message received from unknown worker '${message.workerId}'` + ) + } + } + /** * Gets the given worker its worker node key. * @@ -542,10 +589,11 @@ export abstract class AbstractPool< const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const submittedTask: Task = { - name, + name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), timestamp, + workerId: this.getWorkerInfo(workerNodeKey).id as number, id: randomUUID() } const res = new Promise((resolve, reject) => { @@ -624,6 +672,11 @@ export abstract class AbstractPool< const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing this.updateWaitTimeWorkerUsage(workerUsage, task) + const tasksWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( + task.name as string + ) as WorkerUsage + ++tasksWorkerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(tasksWorkerUsage, task) } /** @@ -637,10 +690,17 @@ export abstract class AbstractPool< worker: Worker, message: MessageValue ): void { - const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage + const workerNodeKey = this.getWorkerNodeKey(worker) + const workerUsage = this.workerNodes[workerNodeKey].usage this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) + const tasksWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( + message.name as string + ) as WorkerUsage + this.updateTaskStatisticsWorkerUsage(tasksWorkerUsage, message) + this.updateRunTimeWorkerUsage(tasksWorkerUsage, message) + this.updateEluWorkerUsage(tasksWorkerUsage, message) } private updateTaskStatisticsWorkerUsage ( @@ -864,6 +924,13 @@ export abstract class AbstractPool< protected afterWorkerSetup (worker: Worker): void { // Listen to worker messages. this.registerWorkerMessageListener(worker, this.workerListener()) + // Send startup message to worker. + this.sendToWorker(worker, { + ready: false, + workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number + }) + // Setup worker task statistics computation. + this.setWorkerStatistics(worker) } /** @@ -883,7 +950,7 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { this.redistributeQueuedTasks(worker) } - if (this.opts.restartWorkerOnError === true) { + if (this.opts.restartWorkerOnError === true && !this.starting) { if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) { this.createAndSetupDynamicWorker() } else { @@ -899,8 +966,6 @@ export abstract class AbstractPool< this.pushWorkerNode(worker) - this.setWorkerStatistics(worker) - this.afterWorkerSetup(worker) return worker @@ -941,7 +1006,6 @@ export abstract class AbstractPool< */ protected createAndSetupDynamicWorker (): Worker { const worker = this.createAndSetupWorker() - this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true this.registerWorkerMessageListener(worker, message => { const workerNodeKey = this.getWorkerNodeKey(worker) if ( @@ -957,7 +1021,12 @@ export abstract class AbstractPool< void (this.destroyWorker(worker) as Promise) } }) - this.sendToWorker(worker, { dynamic: true }) + const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker)) + workerInfo.dynamic = true + this.sendToWorker(worker, { + checkAlive: true, + workerId: workerInfo.id as number + }) return worker } @@ -968,9 +1037,10 @@ export abstract class AbstractPool< */ protected workerListener (): (message: MessageValue) => void { return message => { - if (message.workerId != null && message.started != null) { - // Worker started message received - this.handleWorkerStartedMessage(message) + this.checkMessageWorkerId(message) + if (message.ready != null && message.workerId != null) { + // Worker ready message received + this.handleWorkerReadyMessage(message) } else if (message.id != null) { // Task execution response received this.handleTaskExecutionResponse(message) @@ -978,18 +1048,12 @@ export abstract class AbstractPool< } } - private handleWorkerStartedMessage (message: MessageValue): void { - // Worker started message received - const worker = this.getWorkerById(message.workerId as number) - if (worker != null) { - this.workerNodes[this.getWorkerNodeKey(worker)].info.started = - message.started as boolean - } else { - throw new Error( - `Worker started message received from unknown worker '${ - message.workerId as number - }'` - ) + private handleWorkerReadyMessage (message: MessageValue): void { + const worker = this.getWorkerById(message.workerId) + this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready = + message.ready as boolean + if (this.emitter != null && this.ready) { + this.emitter.emit(PoolEvents.ready, this.info) } } @@ -1104,7 +1168,8 @@ export abstract class AbstractPool< .runTime.aggregate, elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu.aggregate - } + }, + workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number }) } }