X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=5eeb57911903587c0760dac8fee11e30407cb86a;hb=5b95eb9bcafda56ce57003da834cf4e153bb0509;hp=c5ad9cb3608ac8e2095c23c023c949e0f48ae7eb;hpb=ded253e27e59ae936fe91d789d8454b7eb11dd6a;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index c5ad9cb3..5eeb5791 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -117,6 +117,10 @@ export abstract class AbstractPool< * Whether the pool is destroying or not. */ private destroying: boolean + /** + * Whether the minimum number of workers is starting or not. + */ + private startingMinimumNumberOfWorkers: boolean /** * Whether the pool ready event has been emitted or not. */ @@ -175,6 +179,7 @@ export abstract class AbstractPool< this.starting = false this.destroying = false this.readyEventEmitted = false + this.startingMinimumNumberOfWorkers = false if (this.opts.startWorkers === true) { this.start() } @@ -949,6 +954,23 @@ export abstract class AbstractPool< }) } + /** + * Starts the minimum number of workers. + */ + private startMinimumNumberOfWorkers (): void { + this.startingMinimumNumberOfWorkers = true + while ( + this.workerNodes.reduce( + (accumulator, workerNode) => + !workerNode.info.dynamic ? accumulator + 1 : accumulator, + 0 + ) < this.minimumNumberOfWorkers + ) { + this.createAndSetupWorkerNode() + } + this.startingMinimumNumberOfWorkers = false + } + /** @inheritdoc */ public start (): void { if (this.started) { @@ -961,15 +983,7 @@ export abstract class AbstractPool< throw new Error('Cannot start a destroying pool') } this.starting = true - while ( - this.workerNodes.reduce( - (accumulator, workerNode) => - !workerNode.info.dynamic ? accumulator + 1 : accumulator, - 0 - ) < this.minimumNumberOfWorkers - ) { - this.createAndSetupWorkerNode() - } + this.startMinimumNumberOfWorkers() this.starting = false this.started = true } @@ -993,7 +1007,6 @@ export abstract class AbstractPool< ) this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() - this.emitter?.removeAllListeners() this.readyEventEmitted = false this.destroying = false this.started = false @@ -1235,7 +1248,7 @@ export abstract class AbstractPool< 'error', this.opts.errorHandler ?? EMPTY_FUNCTION ) - workerNode.registerWorkerEventHandler('error', (error: Error) => { + workerNode.registerOnceWorkerEventHandler('error', (error: Error) => { workerNode.info.ready = false this.emitter?.emit(PoolEvents.error, error) if ( @@ -1245,8 +1258,8 @@ export abstract class AbstractPool< ) { if (workerNode.info.dynamic) { this.createAndSetupDynamicWorkerNode() - } else { - this.createAndSetupWorkerNode() + } else if (!this.startingMinimumNumberOfWorkers) { + this.startMinimumNumberOfWorkers() } } if ( @@ -1257,7 +1270,7 @@ export abstract class AbstractPool< this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - workerNode?.terminate().catch(error => { + workerNode?.terminate().catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) }) @@ -1267,6 +1280,13 @@ export abstract class AbstractPool< ) workerNode.registerOnceWorkerEventHandler('exit', () => { this.removeWorkerNode(workerNode) + if ( + this.started && + !this.startingMinimumNumberOfWorkers && + !this.destroying + ) { + this.startMinimumNumberOfWorkers() + } }) const workerNodeKey = this.addWorkerNode(workerNode) this.afterWorkerNodeSetup(workerNodeKey) @@ -1298,7 +1318,7 @@ export abstract class AbstractPool< ) { // Flag the worker node as not ready immediately this.flagWorkerNodeAsNotReady(localWorkerNodeKey) - this.destroyWorkerNode(localWorkerNodeKey).catch(error => { + this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) } @@ -1312,7 +1332,7 @@ export abstract class AbstractPool< taskFunctionOperation: 'add', taskFunctionName, taskFunction: taskFunction.toString() - }).catch(error => { + }).catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) } @@ -1614,7 +1634,7 @@ export abstract class AbstractPool< this.handleWorkerNodeIdleEvent(eventDetail, stolenTask) return undefined }) - .catch(error => { + .catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) }