X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=bc77874ff423bb3ee112f6150db81962ae5d6e69;hb=b79570c2ee7d866b615889836c29ece32d559611;hp=5d79cb43b09c18ea6c4f7b5fc46382822ba48a41;hpb=5d240702143a4ab79b203319baf3bc9bbfed751d;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 5d79cb43..bc77874f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -283,10 +283,11 @@ export abstract class AbstractPool< ready: this.ready, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion strategy: this.opts.workerChoiceStrategy!, + strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0, minSize: this.minimumNumberOfWorkers, maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers, ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - ?.runTime.aggregate === true && + .runTime.aggregate === true && this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .waitTime.aggregate && { utilization: round(this.utilization) @@ -351,7 +352,7 @@ export abstract class AbstractPool< 0 ), ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - ?.runTime.aggregate === true && { + .runTime.aggregate === true && { runTime: { minimum: round( min( @@ -394,7 +395,7 @@ export abstract class AbstractPool< } }), ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - ?.waitTime.aggregate === true && { + .waitTime.aggregate === true && { waitTime: { minimum: round( min( @@ -443,6 +444,9 @@ export abstract class AbstractPool< * The pool readiness boolean status. */ private get ready (): boolean { + if (this.empty) { + return false + } return ( this.workerNodes.reduce( (accumulator, workerNode) => @@ -454,6 +458,13 @@ export abstract class AbstractPool< ) } + /** + * The pool emptiness boolean status. + */ + protected get empty (): boolean { + return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0 + } + /** * The approximate pool utilization. * @@ -703,7 +714,7 @@ export abstract class AbstractPool< message: MessageValue ): void => { this.checkMessageWorkerId(message) - const workerId = this.getWorkerInfo(workerNodeKey).id + const workerId = this.getWorkerInfo(workerNodeKey)?.id if ( message.taskFunctionOperationStatus != null && message.workerId === workerId @@ -1157,6 +1168,7 @@ export abstract class AbstractPool< private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean { const workerInfo = this.getWorkerInfo(workerNodeKey) return ( + workerInfo != null && Array.isArray(workerInfo.taskFunctionNames) && workerInfo.taskFunctionNames.length > 2 ) @@ -1243,7 +1255,8 @@ export abstract class AbstractPool< ) { this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } - workerNode.terminate().catch(error => { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode?.terminate().catch(error => { this.emitter?.emit(PoolEvents.error, error) }) }) @@ -1271,7 +1284,7 @@ export abstract class AbstractPool< const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) - const workerUsage = this.workerNodes[localWorkerNodeKey].usage + const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || @@ -1405,9 +1418,9 @@ export abstract class AbstractPool< statistics: { runTime: this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - ?.runTime.aggregate ?? false, + .runTime.aggregate ?? false, elu: - this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu + this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu .aggregate ?? false } }) @@ -1536,13 +1549,14 @@ export abstract class AbstractPool< (this.info.stealingWorkerNodes ?? 0) > Math.floor(this.workerNodes.length / 2) ) { - if (previousStolenTask != null) { + if (workerInfo != null && previousStolenTask != null) { workerInfo.stealing = false } return } const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( + workerInfo != null && previousStolenTask != null && workerNodeTasksUsage.sequentiallyStolen > 0 && (workerNodeTasksUsage.executing > 0 || @@ -1560,6 +1574,11 @@ export abstract class AbstractPool< this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) return } + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey}' not found in pool` + ) + } workerInfo.stealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) if ( @@ -1595,7 +1614,9 @@ export abstract class AbstractPool< this.handleWorkerNodeIdleEvent(eventDetail, stolenTask) return undefined }) - .catch(EMPTY_FUNCTION) + .catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) } private readonly workerNodeStealTask = ( @@ -1660,6 +1681,11 @@ export abstract class AbstractPool< this.opts.tasksQueueOptions!.size! - sizeOffset ) { const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey}' not found in pool` + ) + } workerInfo.stealing = true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const task = sourceWorkerNode.popTask()! @@ -1687,9 +1713,19 @@ export abstract class AbstractPool< this.handleTaskExecutionResponse(message) } else if (taskFunctionNames != null) { // Task function names message received from worker - this.getWorkerInfo( + const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(workerId) - ).taskFunctionNames = taskFunctionNames + ) + if (workerInfo != null) { + workerInfo.taskFunctionNames = taskFunctionNames + } + } + } + + private checkAndEmitReadyEvent (): void { + if (!this.readyEventEmitted && this.ready) { + this.emitter?.emit(PoolEvents.ready, this.info) + this.readyEventEmitted = true } } @@ -1702,10 +1738,7 @@ export abstract class AbstractPool< this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] workerNode.info.ready = ready workerNode.info.taskFunctionNames = taskFunctionNames - if (!this.readyEventEmitted && this.ready) { - this.emitter?.emit(PoolEvents.ready, this.info) - this.readyEventEmitted = true - } + this.checkAndEmitReadyEvent() } private handleTaskExecutionResponse (message: MessageValue): void { @@ -1733,8 +1766,14 @@ export abstract class AbstractPool< this.afterTaskExecutionHook(workerNodeKey, message) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.promiseResponseMap.delete(taskId!) - workerNode.emit('taskFinished', taskId) - if (this.opts.enableTasksQueue === true && !this.destroying) { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode?.emit('taskFinished', taskId) + if ( + this.opts.enableTasksQueue === true && + !this.destroying && + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode != null + ) { const workerNodeTasksUsage = workerNode.usage.tasks if ( this.tasksQueueSize(workerNodeKey) > 0 && @@ -1751,8 +1790,7 @@ export abstract class AbstractPool< workerNodeTasksUsage.sequentiallyStolen === 0 ) { workerNode.emit('idle', { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerId: workerId!, + workerId, workerNodeKey }) } @@ -1783,13 +1821,8 @@ export abstract class AbstractPool< * @param workerNodeKey - The worker node key. * @returns The worker information. */ - protected getWorkerInfo (workerNodeKey: number): WorkerInfo { - const workerInfo = this.workerNodes[workerNodeKey]?.info - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (workerInfo == null) { - throw new Error(`Worker node with key '${workerNodeKey}' not found`) - } - return workerInfo + protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined { + return this.workerNodes[workerNodeKey]?.info } /** @@ -1834,6 +1867,13 @@ export abstract class AbstractPool< return workerNodeKey } + private checkAndEmitEmptyEvent (): void { + if (this.empty) { + this.emitter?.emit(PoolEvents.empty, this.info) + this.readyEventEmitted = false + } + } + /** * Removes the worker node from the pool worker nodes. * @@ -1845,10 +1885,14 @@ export abstract class AbstractPool< this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext?.remove(workerNodeKey) } + this.checkAndEmitEmptyEvent() } protected flagWorkerNodeAsNotReady (workerNodeKey: number): void { - this.getWorkerInfo(workerNodeKey).ready = false + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo != null) { + workerInfo.ready = false + } } private hasBackPressure (): boolean {