X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=ba05d92959a83f362fd721b4ca3615e66c86d196;hb=76aa2b0f39bd376383f2fdc79046328693719590;hp=71f765ac575da13933a1543cd069feb0b6b1984d;hpb=465b29401ecddad49070d3b0df4e55dc3902788c;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 71f765ac..ba05d929 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -19,7 +19,8 @@ import { type PoolType, PoolTypes, type TasksQueueOptions, - type WorkerType + type WorkerType, + WorkerTypes } from './pool' import type { IWorker, @@ -236,6 +237,14 @@ export abstract class AbstractPool< } } + private get starting (): boolean { + return this.workerNodes.some(workerNode => !workerNode.info.started) + } + + private get started (): boolean { + return this.workerNodes.some(workerNode => workerNode.info.started) + } + /** @inheritDoc */ public get info (): PoolInfo { return { @@ -306,6 +315,17 @@ export abstract class AbstractPool< */ protected abstract get maxSize (): number + /** + * Get the worker given its id. + * + * @param workerId - The worker id. + * @returns The worker if found in the pool worker nodes, `undefined` otherwise. + */ + private getWorkerById (workerId: number): Worker | undefined { + return this.workerNodes.find(workerNode => workerNode.info.id === workerId) + ?.worker + } + /** * Gets the given worker its worker node key. * @@ -712,7 +732,7 @@ export abstract class AbstractPool< if (this.emitter != null) { this.emitter.emit(PoolEvents.error, error) } - if (this.opts.restartWorkerOnError === true) { + if (this.opts.restartWorkerOnError === true && !this.starting) { this.createAndSetupWorker() } }) @@ -763,7 +783,16 @@ export abstract class AbstractPool< */ protected workerListener (): (message: MessageValue) => void { return message => { - if (message.id != null) { + if (message.workerId != null && message.started != null) { + // Worker started message received + const worker = this.getWorkerById(message.workerId) + if (worker != null) { + this.workerNodes[this.getWorkerNodeKey(worker)].info.started = + message.started + } else { + throw new Error('Worker started message received from unknown worker') + } + } else if (message.id != null) { // Task execution response received const promiseResponse = this.promiseResponseMap.get(message.id) if (promiseResponse != null) { @@ -826,6 +855,7 @@ export abstract class AbstractPool< private pushWorkerNode (worker: Worker): number { this.workerNodes.push({ worker, + info: { id: this.getWorkerId(worker), started: false }, usage: this.getWorkerUsage(), tasksQueue: new Queue>() }) @@ -837,22 +867,39 @@ export abstract class AbstractPool< return this.workerNodes.length } + /** + * Gets the worker id. + * + * @param worker - The worker. + * @returns The worker id. + */ + private getWorkerId (worker: Worker): number | undefined { + if (this.worker === WorkerTypes.thread) { + return worker.threadId + } else if (this.worker === WorkerTypes.cluster) { + return worker.id + } + } + // /** // * Sets the given worker in the pool worker nodes. // * // * @param workerNodeKey - The worker node key. // * @param worker - The worker. + // * @param workerInfo - The worker info. // * @param workerUsage - The worker usage. // * @param tasksQueue - The worker task queue. // */ // private setWorkerNode ( // workerNodeKey: number, // worker: Worker, + // workerInfo: WorkerInfo, // workerUsage: WorkerUsage, // tasksQueue: Queue> // ): void { // this.workerNodes[workerNodeKey] = { // worker, + // info: workerInfo, // usage: workerUsage, // tasksQueue // }