X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=3f6fa98ae373cfb7054747f089ebaa6fb0c84420;hb=efebef40316438ac5a7244afdbfe41841295514c;hp=474115630a747fa6a83724faa6a3c1953382e7fb;hpb=1851fed0d7a9a5f5449797b01848dd44386c1517;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 47411563..3f6fa98a 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -283,6 +283,7 @@ export abstract class AbstractPool< ready: this.ready, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion strategy: this.opts.workerChoiceStrategy!, + strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0, minSize: this.minimumNumberOfWorkers, maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers, ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() @@ -443,6 +444,9 @@ export abstract class AbstractPool< * The pool readiness boolean status. */ private get ready (): boolean { + if (this.empty) { + return false + } return ( this.workerNodes.reduce( (accumulator, workerNode) => @@ -454,6 +458,13 @@ export abstract class AbstractPool< ) } + /** + * The pool emptiness boolean status. + */ + protected get empty (): boolean { + return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0 + } + /** * The approximate pool utilization. * @@ -1244,7 +1255,8 @@ export abstract class AbstractPool< ) { this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } - workerNode.terminate().catch(error => { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode?.terminate().catch(error => { this.emitter?.emit(PoolEvents.error, error) }) }) @@ -1272,7 +1284,7 @@ export abstract class AbstractPool< const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) - const workerUsage = this.workerNodes[localWorkerNodeKey].usage + const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || @@ -1602,7 +1614,9 @@ export abstract class AbstractPool< this.handleWorkerNodeIdleEvent(eventDetail, stolenTask) return undefined }) - .catch(EMPTY_FUNCTION) + .catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) } private readonly workerNodeStealTask = ( @@ -1708,6 +1722,13 @@ export abstract class AbstractPool< } } + private checkAndEmitReadyEvent (): void { + if (!this.readyEventEmitted && this.ready) { + this.emitter?.emit(PoolEvents.ready, this.info) + this.readyEventEmitted = true + } + } + private handleWorkerReadyResponse (message: MessageValue): void { const { workerId, ready, taskFunctionNames } = message if (ready == null || !ready) { @@ -1717,10 +1738,7 @@ export abstract class AbstractPool< this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] workerNode.info.ready = ready workerNode.info.taskFunctionNames = taskFunctionNames - if (!this.readyEventEmitted && this.ready) { - this.emitter?.emit(PoolEvents.ready, this.info) - this.readyEventEmitted = true - } + this.checkAndEmitReadyEvent() } private handleTaskExecutionResponse (message: MessageValue): void { @@ -1748,7 +1766,8 @@ export abstract class AbstractPool< this.afterTaskExecutionHook(workerNodeKey, message) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.promiseResponseMap.delete(taskId!) - workerNode.emit('taskFinished', taskId) + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode?.emit('taskFinished', taskId) if (this.opts.enableTasksQueue === true && !this.destroying) { const workerNodeTasksUsage = workerNode.usage.tasks if ( @@ -1766,8 +1785,7 @@ export abstract class AbstractPool< workerNodeTasksUsage.sequentiallyStolen === 0 ) { workerNode.emit('idle', { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerId: workerId!, + workerId, workerNodeKey }) } @@ -1844,6 +1862,13 @@ export abstract class AbstractPool< return workerNodeKey } + private checkAndEmitEmptyEvent (): void { + if (this.empty) { + this.emitter?.emit(PoolEvents.empty, this.info) + this.readyEventEmitted = false + } + } + /** * Removes the worker node from the pool worker nodes. * @@ -1855,6 +1880,7 @@ export abstract class AbstractPool< this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext?.remove(workerNodeKey) } + this.checkAndEmitEmptyEvent() } protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {