X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=6fde2491b6e1a565ba48ce26d821407eff67c8a2;hb=f59e102739e13698f278f1d9d58ab26ed8150442;hp=da0f2e77358525336a5936a42841edce9016f44b;hpb=3c93feb918b47943d801029fad82a61469e40127;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index da0f2e77..6fde2491 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -236,6 +236,14 @@ export abstract class AbstractPool< } } + private get starting (): boolean { + return this.workerNodes.some(workerNode => !workerNode.info.started) + } + + private get started (): boolean { + return this.workerNodes.some(workerNode => workerNode.info.started) + } + /** @inheritDoc */ public get info (): PoolInfo { return { @@ -246,41 +254,39 @@ export abstract class AbstractPool< workerNodes: this.workerNodes.length, idleWorkerNodes: this.workerNodes.reduce( (accumulator, workerNode) => - workerNode.workerUsage.tasks.executing === 0 + workerNode.usage.tasks.executing === 0 ? accumulator + 1 : accumulator, 0 ), busyWorkerNodes: this.workerNodes.reduce( (accumulator, workerNode) => - workerNode.workerUsage.tasks.executing > 0 - ? accumulator + 1 - : accumulator, + workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator, 0 ), executedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + workerNode.workerUsage.tasks.executed, + accumulator + workerNode.usage.tasks.executed, 0 ), executingTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + workerNode.workerUsage.tasks.executing, + accumulator + workerNode.usage.tasks.executing, 0 ), queuedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + workerNode.workerUsage.tasks.queued, + accumulator + workerNode.usage.tasks.queued, 0 ), maxQueuedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + workerNode.workerUsage.tasks.maxQueued, + accumulator + workerNode.usage.tasks.maxQueued, 0 ), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + workerNode.workerUsage.tasks.failed, + accumulator + workerNode.usage.tasks.failed, 0 ) } @@ -308,11 +314,22 @@ export abstract class AbstractPool< */ protected abstract get maxSize (): number + /** + * Get the worker given its id. + * + * @param workerId - The worker id. + * @returns The worker if found in the pool worker nodes, `undefined` otherwise. + */ + private getWorkerById (workerId: number): Worker | undefined { + return this.workerNodes.find(workerNode => workerNode.info.id === workerId) + ?.worker + } + /** * Gets the given worker its worker node key. * * @param worker - The worker. - * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise. + * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. */ private getWorkerNodeKey (worker: Worker): number { return this.workerNodes.findIndex( @@ -408,7 +425,7 @@ export abstract class AbstractPool< protected internalBusy (): boolean { return ( this.workerNodes.findIndex(workerNode => { - return workerNode.workerUsage.tasks.executing === 0 + return workerNode.usage.tasks.executing === 0 }) === -1 ) } @@ -434,7 +451,7 @@ export abstract class AbstractPool< if ( this.opts.enableTasksQueue === true && (this.busy || - this.workerNodes[workerNodeKey].workerUsage.tasks.executing >= + this.workerNodes[workerNodeKey].usage.tasks.executing >= ((this.opts.tasksQueueOptions as TasksQueueOptions) .concurrency as number)) ) { @@ -491,7 +508,7 @@ export abstract class AbstractPool< workerNodeKey: number, task: Task ): void { - const workerUsage = this.workerNodes[workerNodeKey].workerUsage + const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing this.updateWaitTimeWorkerUsage(workerUsage, task) } @@ -507,8 +524,7 @@ export abstract class AbstractPool< worker: Worker, message: MessageValue ): void { - const workerUsage = - this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage + const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) @@ -715,7 +731,7 @@ export abstract class AbstractPool< if (this.emitter != null) { this.emitter.emit(PoolEvents.error, error) } - if (this.opts.restartWorkerOnError === true) { + if (this.opts.restartWorkerOnError === true && !this.starting) { this.createAndSetupWorker() } }) @@ -747,11 +763,9 @@ export abstract class AbstractPool< isKillBehavior(KillBehaviors.HARD, message.kill) || (message.kill != null && ((this.opts.enableTasksQueue === false && - this.workerNodes[workerNodeKey].workerUsage.tasks.executing === - 0) || + this.workerNodes[workerNodeKey].usage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].workerUsage.tasks.executing === - 0 && + this.workerNodes[workerNodeKey].usage.tasks.executing === 0 && this.tasksQueueSize(workerNodeKey) === 0))) ) { // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) @@ -768,7 +782,12 @@ export abstract class AbstractPool< */ protected workerListener (): (message: MessageValue) => void { return message => { - if (message.id != null) { + if (message.workerId != null && message.started != null) { + // Worker started message received + this.workerNodes[ + this.getWorkerNodeKey(this.getWorkerById(message.workerId) as Worker) + ].info.started = message.started + } else if (message.id != null) { // Task execution response received const promiseResponse = this.promiseResponseMap.get(message.id) if (promiseResponse != null) { @@ -819,7 +838,7 @@ export abstract class AbstractPool< workerNode: WorkerNode, workerUsage: WorkerUsage ): void { - workerNode.workerUsage = workerUsage + workerNode.usage = workerUsage } /** @@ -831,7 +850,8 @@ export abstract class AbstractPool< private pushWorkerNode (worker: Worker): number { this.workerNodes.push({ worker, - workerUsage: this.getWorkerUsage(), + info: { id: worker.threadId ?? worker.id, started: false }, + usage: this.getWorkerUsage(), tasksQueue: new Queue>() }) const workerNodeKey = this.getWorkerNodeKey(worker) @@ -847,18 +867,21 @@ export abstract class AbstractPool< // * // * @param workerNodeKey - The worker node key. // * @param worker - The worker. + // * @param workerInfo - The worker info. // * @param workerUsage - The worker usage. // * @param tasksQueue - The worker task queue. // */ // private setWorkerNode ( // workerNodeKey: number, // worker: Worker, + // workerInfo: WorkerInfo, // workerUsage: WorkerUsage, // tasksQueue: Queue> // ): void { // this.workerNodes[workerNodeKey] = { // worker, - // workerUsage, + // info: workerInfo, + // usage: workerUsage, // tasksQueue // } // }