]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
refactor(qav3): share common code between freqai model and strategy
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 6 Aug 2025 17:37:53 +0000 (19:37 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 6 Aug 2025 17:37:53 +0000 (19:37 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
ReforceXY/user_data/freqaimodels/ReforceXY.py
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

index a13e08dac4a9abec20ab28b03804102c8449eea9..0c315f0e4dd37182e4bd1b72367ac9e7940a7658 100644 (file)
@@ -1,13 +1,13 @@
 import copy
-from functools import lru_cache
 import gc
 import json
 import logging
-import warnings
 import time
+import warnings
 from enum import Enum
+from functools import lru_cache
 from pathlib import Path
-from typing import Any, Callable, Dict, Optional, Type, Tuple
+from typing import Any, Callable, Dict, Optional, Tuple, Type
 
 import matplotlib
 import matplotlib.pyplot as plt
@@ -20,9 +20,9 @@ from optuna import Trial, TrialPruned, create_study
 from optuna.exceptions import ExperimentalWarning
 from optuna.pruners import HyperbandPruner
 from optuna.samplers import TPESampler
-from optuna.study import Study, StudyDirection
 from optuna.storages import BaseStorage, JournalStorage, RDBStorage
 from optuna.storages.journal import JournalFileBackend
+from optuna.study import Study, StudyDirection
 from pandas import DataFrame, concat, merge
 from sb3_contrib.common.maskable.callbacks import MaskableEvalCallback
 from stable_baselines3.common.callbacks import (
@@ -37,7 +37,6 @@ from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack, VecMoni
 
 from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
 from freqtrade.freqai.RL.Base5ActionRLEnv import Actions, Base5ActionRLEnv, Positions
-from freqtrade.freqai.RL.BaseEnvironment import BaseEnvironment
 from freqtrade.freqai.RL.BaseReinforcementLearningModel import (
     BaseReinforcementLearningModel,
 )
index 7e177fb1ff3e36e9796b4457aef083f1f48e9e3a..934765e209f0032f0a7f0dd514c5b9f26a0317a1 100644 (file)
@@ -1,26 +1,33 @@
 import copy
-from enum import IntEnum
-import logging
 import json
-import math
+import logging
 import random
-from statistics import median
 import time
+import warnings
+from functools import cached_property
+from pathlib import Path
+from typing import Any, Callable, Optional
+
 import numpy as np
+import optuna
 import pandas as pd
 import scipy as sp
-import optuna
-import sklearn
 import skimage
-import warnings
-import talib.abstract as ta
-
-from functools import cached_property, lru_cache
-from typing import Any, Callable, Optional
-from pathlib import Path
+import sklearn
 from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel
 from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
 
+from Utils import (
+    calculate_min_extrema,
+    calculate_n_extrema,
+    fit_regressor,
+    get_callbacks,
+    get_optuna_study_model_parameters,
+    largest_divisor,
+    round_to_nearest_int,
+    zigzag,
+)
+
 debug = False
 
 TEST_SIZE = 0.1
@@ -51,7 +58,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     https://github.com/sponsors/robcaulk
     """
 
-    version = "3.7.108"
+    version = "3.7.109"
 
     @cached_property
     def _optuna_config(self) -> dict[str, Any]:
@@ -1171,85 +1178,6 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
             return False
 
 
-regressors = {"xgboost", "lightgbm"}
-
-
-def get_callbacks(trial: optuna.trial.Trial, regressor: str) -> list[Callable]:
-    if regressor == "xgboost":
-        callbacks = [
-            optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")
-        ]
-    elif regressor == "lightgbm":
-        callbacks = [optuna.integration.LightGBMPruningCallback(trial, "rmse")]
-    else:
-        raise ValueError(
-            f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
-        )
-    return callbacks
-
-
-def fit_regressor(
-    regressor: str,
-    X: pd.DataFrame,
-    y: pd.DataFrame,
-    train_weights: np.ndarray,
-    eval_set: Optional[list[tuple[pd.DataFrame, pd.DataFrame]]],
-    eval_weights: Optional[list[np.ndarray]],
-    model_training_parameters: dict[str, Any],
-    init_model: Any = None,
-    callbacks: Optional[list[Callable]] = None,
-) -> Any:
-    if regressor == "xgboost":
-        from xgboost import XGBRegressor
-
-        model = XGBRegressor(
-            objective="reg:squarederror",
-            eval_metric="rmse",
-            callbacks=callbacks,
-            **model_training_parameters,
-        )
-        model.fit(
-            X=X,
-            y=y,
-            sample_weight=train_weights,
-            eval_set=eval_set,
-            sample_weight_eval_set=eval_weights,
-            xgb_model=init_model,
-        )
-    elif regressor == "lightgbm":
-        from lightgbm import LGBMRegressor
-
-        model = LGBMRegressor(objective="regression", **model_training_parameters)
-        model.fit(
-            X=X,
-            y=y,
-            sample_weight=train_weights,
-            eval_set=eval_set,
-            eval_sample_weight=eval_weights,
-            eval_metric="rmse",
-            init_model=init_model,
-            callbacks=callbacks,
-        )
-    else:
-        raise ValueError(
-            f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
-        )
-    return model
-
-
-@lru_cache(maxsize=128)
-def calculate_min_extrema(
-    size: int, fit_live_predictions_candles: int, min_extrema: int = 4
-) -> int:
-    return int(round(size / fit_live_predictions_candles) * min_extrema)
-
-
-def calculate_n_extrema(extrema: pd.Series) -> int:
-    return (
-        sp.signal.find_peaks(-extrema)[0].size + sp.signal.find_peaks(extrema)[0].size
-    )
-
-
 def train_objective(
     trial: optuna.trial.Trial,
     regressor: str,
@@ -1360,136 +1288,6 @@ def train_objective(
     )
 
 
-def get_optuna_study_model_parameters(
-    trial: optuna.trial.Trial,
-    regressor: str,
-    model_training_best_parameters: dict[str, Any],
-    expansion_ratio: float,
-) -> dict[str, Any]:
-    if regressor not in regressors:
-        raise ValueError(
-            f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
-        )
-    default_ranges = {
-        "n_estimators": (100, 2000),
-        "learning_rate": (1e-3, 0.5),
-        "min_child_weight": (1e-8, 100.0),
-        "subsample": (0.5, 1.0),
-        "colsample_bytree": (0.5, 1.0),
-        "reg_alpha": (1e-8, 100.0),
-        "reg_lambda": (1e-8, 100.0),
-        "max_depth": (3, 13),
-        "gamma": (1e-8, 10.0),
-        "num_leaves": (8, 256),
-        "min_split_gain": (1e-8, 10.0),
-        "min_child_samples": (10, 100),
-    }
-
-    log_scaled_params = {
-        "learning_rate",
-        "min_child_weight",
-        "reg_alpha",
-        "reg_lambda",
-        "gamma",
-        "min_split_gain",
-    }
-
-    ranges = copy.deepcopy(default_ranges)
-    if model_training_best_parameters:
-        for param, (default_min, default_max) in default_ranges.items():
-            center_value = model_training_best_parameters.get(param)
-
-            if (
-                center_value is None
-                or not isinstance(center_value, (int, float))
-                or not np.isfinite(center_value)
-            ):
-                continue
-
-            if param in log_scaled_params:
-                new_min = center_value / (1 + expansion_ratio)
-                new_max = center_value * (1 + expansion_ratio)
-            else:
-                margin = (default_max - default_min) * expansion_ratio / 2
-                new_min = center_value - margin
-                new_max = center_value + margin
-
-            param_min = max(default_min, new_min)
-            param_max = min(default_max, new_max)
-
-            if param_min < param_max:
-                ranges[param] = (param_min, param_max)
-
-    study_model_parameters = {
-        "n_estimators": trial.suggest_int(
-            "n_estimators",
-            int(ranges["n_estimators"][0]),
-            int(ranges["n_estimators"][1]),
-        ),
-        "learning_rate": trial.suggest_float(
-            "learning_rate",
-            ranges["learning_rate"][0],
-            ranges["learning_rate"][1],
-            log=True,
-        ),
-        "min_child_weight": trial.suggest_float(
-            "min_child_weight",
-            ranges["min_child_weight"][0],
-            ranges["min_child_weight"][1],
-            log=True,
-        ),
-        "subsample": trial.suggest_float(
-            "subsample", ranges["subsample"][0], ranges["subsample"][1]
-        ),
-        "colsample_bytree": trial.suggest_float(
-            "colsample_bytree",
-            ranges["colsample_bytree"][0],
-            ranges["colsample_bytree"][1],
-        ),
-        "reg_alpha": trial.suggest_float(
-            "reg_alpha", ranges["reg_alpha"][0], ranges["reg_alpha"][1], log=True
-        ),
-        "reg_lambda": trial.suggest_float(
-            "reg_lambda", ranges["reg_lambda"][0], ranges["reg_lambda"][1], log=True
-        ),
-    }
-    if regressor == "xgboost":
-        study_model_parameters.update(
-            {
-                "max_depth": trial.suggest_int(
-                    "max_depth",
-                    int(ranges["max_depth"][0]),
-                    int(ranges["max_depth"][1]),
-                ),
-                "gamma": trial.suggest_float(
-                    "gamma", ranges["gamma"][0], ranges["gamma"][1], log=True
-                ),
-            }
-        )
-    elif regressor == "lightgbm":
-        study_model_parameters.update(
-            {
-                "num_leaves": trial.suggest_int(
-                    "num_leaves",
-                    int(ranges["num_leaves"][0]),
-                    int(ranges["num_leaves"][1]),
-                ),
-                "min_split_gain": trial.suggest_float(
-                    "min_split_gain",
-                    ranges["min_split_gain"][0],
-                    ranges["min_split_gain"][1],
-                    log=True,
-                ),
-                "min_child_samples": trial.suggest_int(
-                    "min_child_samples",
-                    int(ranges["min_child_samples"][0]),
-                    int(ranges["min_child_samples"][1]),
-                ),
-            }
-        )
-    return study_model_parameters
-
-
 def hp_objective(
     trial: optuna.trial.Trial,
     regressor: str,
@@ -1525,270 +1323,6 @@ def hp_objective(
     )
 
 
-def calculate_quantile(values: np.ndarray, value: float) -> float:
-    if values.size == 0:
-        return np.nan
-
-    first_value = values[0]
-    if np.all(np.isclose(values, first_value)):
-        return (
-            0.5
-            if np.isclose(value, first_value)
-            else (0.0 if value < first_value else 1.0)
-        )
-
-    return np.sum(values <= value) / values.size
-
-
-class TrendDirection(IntEnum):
-    NEUTRAL = 0
-    UP = 1
-    DOWN = -1
-
-
-def zigzag(
-    df: pd.DataFrame,
-    natr_period: int = 14,
-    natr_ratio: float = 6.0,
-) -> tuple[list[int], list[float], list[TrendDirection], list[float]]:
-    n = len(df)
-    if df.empty or n < natr_period:
-        return [], [], [], []
-
-    natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy()
-
-    indices: list[int] = df.index.tolist()
-    thresholds: np.ndarray = natr_values * natr_ratio
-    closes = df.get("close").to_numpy()
-    highs = df.get("high").to_numpy()
-    lows = df.get("low").to_numpy()
-
-    state: TrendDirection = TrendDirection.NEUTRAL
-
-    pivots_indices: list[int] = []
-    pivots_values: list[float] = []
-    pivots_directions: list[TrendDirection] = []
-    pivots_thresholds: list[float] = []
-    last_pivot_pos: int = -1
-
-    candidate_pivot_pos: int = -1
-    candidate_pivot_value: float = np.nan
-
-    volatility_quantile_cache: dict[int, float] = {}
-
-    def calculate_volatility_quantile(pos: int) -> float:
-        if pos not in volatility_quantile_cache:
-            start_pos = max(0, pos + 1 - natr_period)
-            end_pos = min(pos + 1, n)
-            if start_pos >= end_pos:
-                volatility_quantile_cache[pos] = np.nan
-            else:
-                volatility_quantile_cache[pos] = calculate_quantile(
-                    natr_values[start_pos:end_pos], natr_values[pos]
-                )
-
-        return volatility_quantile_cache[pos]
-
-    def calculate_slopes_ok_threshold(
-        pos: int,
-        min_threshold: float = 0.75,
-        max_threshold: float = 0.95,
-    ) -> float:
-        volatility_quantile = calculate_volatility_quantile(pos)
-        if np.isnan(volatility_quantile):
-            return median([min_threshold, max_threshold])
-
-        return max_threshold - (max_threshold - min_threshold) * volatility_quantile
-
-    def update_candidate_pivot(pos: int, value: float):
-        nonlocal candidate_pivot_pos, candidate_pivot_value
-        if 0 <= pos < n:
-            candidate_pivot_pos = pos
-            candidate_pivot_value = value
-
-    def reset_candidate_pivot():
-        nonlocal candidate_pivot_pos, candidate_pivot_value
-        candidate_pivot_pos = -1
-        candidate_pivot_value = np.nan
-
-    def add_pivot(pos: int, value: float, direction: TrendDirection):
-        nonlocal last_pivot_pos
-        if pivots_indices and indices[pos] == pivots_indices[-1]:
-            return
-        pivots_indices.append(indices[pos])
-        pivots_values.append(value)
-        pivots_directions.append(direction)
-        pivots_thresholds.append(thresholds[pos])
-        last_pivot_pos = pos
-        reset_candidate_pivot()
-
-    slope_ok_cache: dict[tuple[int, int, TrendDirection, float], bool] = {}
-
-    def get_slope_ok(
-        pos: int,
-        candidate_pivot_pos: int,
-        direction: TrendDirection,
-        min_slope: float,
-    ) -> bool:
-        cache_key = (
-            pos,
-            candidate_pivot_pos,
-            direction,
-            min_slope,
-        )
-
-        if cache_key in slope_ok_cache:
-            return slope_ok_cache[cache_key]
-
-        if pos <= candidate_pivot_pos:
-            slope_ok_cache[cache_key] = False
-            return slope_ok_cache[cache_key]
-
-        log_candidate_pivot_close = np.log(closes[candidate_pivot_pos])
-        log_current_close = np.log(closes[pos])
-
-        log_slope_close = (log_current_close - log_candidate_pivot_close) / (
-            pos - candidate_pivot_pos
-        )
-
-        if direction == TrendDirection.UP:
-            slope_ok_cache[cache_key] = log_slope_close > min_slope
-        elif direction == TrendDirection.DOWN:
-            slope_ok_cache[cache_key] = log_slope_close < -min_slope
-        else:
-            slope_ok_cache[cache_key] = False
-
-        return slope_ok_cache[cache_key]
-
-    def is_pivot_confirmed(
-        pos: int,
-        candidate_pivot_pos: int,
-        direction: TrendDirection,
-        min_slope: float = np.finfo(float).eps,
-        alpha: float = 0.05,
-    ) -> bool:
-        start_pos = min(candidate_pivot_pos + 1, n)
-        end_pos = min(pos + 1, n)
-        n_slopes = max(0, end_pos - start_pos)
-
-        if n_slopes < 1:
-            return False
-
-        slopes_ok: list[bool] = []
-        for i in range(start_pos, end_pos):
-            slopes_ok.append(
-                get_slope_ok(
-                    pos=i,
-                    candidate_pivot_pos=candidate_pivot_pos,
-                    direction=direction,
-                    min_slope=min_slope,
-                )
-            )
-
-        slopes_ok_threshold = calculate_slopes_ok_threshold(candidate_pivot_pos)
-        n_slopes_ok = sum(slopes_ok)
-        binomtest = sp.stats.binomtest(
-            k=n_slopes_ok, n=n_slopes, p=0.5, alternative="greater"
-        )
-
-        return (
-            binomtest.pvalue <= alpha
-            and (n_slopes_ok / n_slopes) >= slopes_ok_threshold
-        )
-
-    start_pos = 0
-    initial_high_pos = start_pos
-    initial_low_pos = start_pos
-    initial_high = highs[initial_high_pos]
-    initial_low = lows[initial_low_pos]
-    for i in range(start_pos + 1, n):
-        current_high = highs[i]
-        current_low = lows[i]
-        if current_high > initial_high:
-            initial_high, initial_high_pos = current_high, i
-        if current_low < initial_low:
-            initial_low, initial_low_pos = current_low, i
-
-        initial_move_from_high = (initial_high - current_low) / initial_high
-        initial_move_from_low = (current_high - initial_low) / initial_low
-        is_initial_high_move_significant = (
-            initial_move_from_high >= thresholds[initial_high_pos]
-        )
-        is_initial_low_move_significant = (
-            initial_move_from_low >= thresholds[initial_low_pos]
-        )
-        if is_initial_high_move_significant and is_initial_low_move_significant:
-            if initial_move_from_high > initial_move_from_low:
-                add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
-                state = TrendDirection.DOWN
-                break
-            else:
-                add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
-                state = TrendDirection.UP
-                break
-        else:
-            if is_initial_high_move_significant:
-                add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
-                state = TrendDirection.DOWN
-                break
-            elif is_initial_low_move_significant:
-                add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
-                state = TrendDirection.UP
-                break
-    else:
-        return [], [], [], []
-
-    for i in range(last_pivot_pos + 1, n):
-        current_high = highs[i]
-        current_low = lows[i]
-
-        if state == TrendDirection.UP:
-            if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value:
-                update_candidate_pivot(i, current_high)
-            if (
-                candidate_pivot_value - current_low
-            ) / candidate_pivot_value >= thresholds[
-                candidate_pivot_pos
-            ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.DOWN):
-                add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
-                state = TrendDirection.DOWN
-
-        elif state == TrendDirection.DOWN:
-            if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value:
-                update_candidate_pivot(i, current_low)
-            if (
-                current_high - candidate_pivot_value
-            ) / candidate_pivot_value >= thresholds[
-                candidate_pivot_pos
-            ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.UP):
-                add_pivot(
-                    candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
-                )
-                state = TrendDirection.UP
-
-    return pivots_indices, pivots_values, pivots_directions, pivots_thresholds
-
-
-@lru_cache(maxsize=8)
-def largest_divisor(integer: int, step: int) -> Optional[int]:
-    if not isinstance(integer, int) or integer <= 0:
-        raise ValueError("integer must be a positive integer")
-    if not isinstance(step, int) or step <= 0:
-        raise ValueError("step must be a positive integer")
-
-    q_start = math.floor(0.5 * step) + 1
-    q_end = math.ceil(1.5 * step) - 1
-
-    if q_start > q_end:
-        return None
-
-    for q in range(q_start, q_end + 1):
-        if integer % q == 0:
-            return int(integer / q)
-
-    return None
-
-
 def label_objective(
     trial: optuna.trial.Trial,
     df: pd.DataFrame,
@@ -1835,34 +1369,3 @@ def label_objective(
     )
 
     return np.median(pivots_thresholds), len(pivots_values)
-
-
-def soft_extremum(series: pd.Series, alpha: float) -> float:
-    np_array = series.to_numpy()
-    if np_array.size == 0:
-        return np.nan
-    if np.isclose(alpha, 0):
-        return np.mean(np_array)
-    scaled_np_array = alpha * np_array
-    max_scaled_np_array = np.max(scaled_np_array)
-    if np.isinf(max_scaled_np_array):
-        return np_array[np.argmax(scaled_np_array)]
-    shifted_exponentials = np.exp(scaled_np_array - max_scaled_np_array)
-    numerator = np.sum(np_array * shifted_exponentials)
-    denominator = np.sum(shifted_exponentials)
-    if denominator == 0:
-        return np.max(np_array)
-    return numerator / denominator
-
-
-def round_to_nearest_int(value: float, step: int) -> int:
-    """
-    Round a value to the nearest multiple of a given step.
-    :param value: The value to round.
-    :param step: The step size to round to (must be non-zero).
-    :return: The rounded value.
-    :raises ValueError: If step is zero.
-    """
-    if not isinstance(step, int) or step <= 0:
-        raise ValueError("step must be a positive integer")
-    return int(round(value / step) * step)
index 7a40f2dad30d625b06891c81e239064081f9f690..7a6f472c230077e0aefb87cdeaa0c8b99c7dc94f 100644 (file)
@@ -1,19 +1,20 @@
+import datetime
 import json
 import logging
-from functools import lru_cache, reduce, cached_property
-import datetime
 import math
+from functools import cached_property, lru_cache, reduce
 from pathlib import Path
-import talib.abstract as ta
-from pandas import DataFrame, Series, isna
 from typing import Any, Callable, Literal, Optional
+
+import numpy as np
+import pandas_ta as pta
+import talib.abstract as ta
 from freqtrade.exchange import timeframe_to_minutes, timeframe_to_prev_date
-from freqtrade.strategy.interface import IStrategy
+from freqtrade.persistence import Trade
 from freqtrade.strategy import stoploss_from_absolute
+from freqtrade.strategy.interface import IStrategy
+from pandas import DataFrame, Series, isna
 from technical.pivots_points import pivots_points
-from freqtrade.persistence import Trade
-import numpy as np
-import pandas_ta as pta
 
 from Utils import (
     TrendDirection,
@@ -21,17 +22,17 @@ from Utils import (
     bottom_change_percent,
     calculate_n_extrema,
     calculate_quantile,
-    get_zl_ma_fn,
-    zero_phase,
-    zigzag,
     ewo,
+    get_distance,
+    get_gaussian_std,
+    get_odd_window,
+    get_zl_ma_fn,
     non_zero_diff,
     price_retracement_percent,
-    vwapb,
     top_change_percent,
-    get_distance,
-    get_odd_window,
-    get_gaussian_std,
+    vwapb,
+    zero_phase,
+    zigzag,
     zlema,
 )
 
@@ -64,7 +65,7 @@ class QuickAdapterV3(IStrategy):
     INTERFACE_VERSION = 3
 
     def version(self) -> str:
-        return "3.3.138"
+        return "3.3.139"
 
     timeframe = "5m"
 
index 9f5a64bc04e32bf6fcdf7310d173c64559491cac..6eb70c27213f0469ed4dc982e4846ff3f7e8a505 100644 (file)
@@ -1,13 +1,19 @@
+import copy
+import math
 from enum import IntEnum
 from functools import lru_cache
 from statistics import median
+from typing import Any, Callable, Literal, Optional, TypeVar
+
 import numpy as np
+import optuna
 import pandas as pd
 import scipy as sp
 import talib.abstract as ta
-from typing import Callable, Literal, TypeVar
+
 from technical import qtpylib
 
+
 T = TypeVar("T", pd.Series, float)
 
 
@@ -71,10 +77,15 @@ def zero_phase(
     return pd.Series(filtered_values, index=series.index)
 
 
-def calculate_n_extrema(extrema: pd.Series) -> int:
-    return (
-        sp.signal.find_peaks(-extrema)[0].size + sp.signal.find_peaks(extrema)[0].size
-    )
+@lru_cache(maxsize=128)
+def calculate_min_extrema(
+    size: int, fit_live_predictions_candles: int, min_extrema: int = 4
+) -> int:
+    return int(round(size / fit_live_predictions_candles) * min_extrema)
+
+
+def calculate_n_extrema(series: pd.Series) -> int:
+    return sp.signal.find_peaks(-series)[0].size + sp.signal.find_peaks(series)[0].size
 
 
 def top_change_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
@@ -615,3 +626,250 @@ def zigzag(
                 state = TrendDirection.UP
 
     return pivots_indices, pivots_values, pivots_directions, pivots_thresholds
+
+
+regressors = {"xgboost", "lightgbm"}
+
+
+def get_callbacks(trial: optuna.trial.Trial, regressor: str) -> list[Callable]:
+    if regressor == "xgboost":
+        callbacks = [
+            optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")
+        ]
+    elif regressor == "lightgbm":
+        callbacks = [optuna.integration.LightGBMPruningCallback(trial, "rmse")]
+    else:
+        raise ValueError(
+            f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
+        )
+    return callbacks
+
+
+def fit_regressor(
+    regressor: str,
+    X: pd.DataFrame,
+    y: pd.DataFrame,
+    train_weights: np.ndarray,
+    eval_set: Optional[list[tuple[pd.DataFrame, pd.DataFrame]]],
+    eval_weights: Optional[list[np.ndarray]],
+    model_training_parameters: dict[str, Any],
+    init_model: Any = None,
+    callbacks: Optional[list[Callable]] = None,
+) -> Any:
+    if regressor == "xgboost":
+        from xgboost import XGBRegressor
+
+        model = XGBRegressor(
+            objective="reg:squarederror",
+            eval_metric="rmse",
+            callbacks=callbacks,
+            **model_training_parameters,
+        )
+        model.fit(
+            X=X,
+            y=y,
+            sample_weight=train_weights,
+            eval_set=eval_set,
+            sample_weight_eval_set=eval_weights,
+            xgb_model=init_model,
+        )
+    elif regressor == "lightgbm":
+        from lightgbm import LGBMRegressor
+
+        model = LGBMRegressor(objective="regression", **model_training_parameters)
+        model.fit(
+            X=X,
+            y=y,
+            sample_weight=train_weights,
+            eval_set=eval_set,
+            eval_sample_weight=eval_weights,
+            eval_metric="rmse",
+            init_model=init_model,
+            callbacks=callbacks,
+        )
+    else:
+        raise ValueError(
+            f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
+        )
+    return model
+
+
+def get_optuna_study_model_parameters(
+    trial: optuna.trial.Trial,
+    regressor: str,
+    model_training_best_parameters: dict[str, Any],
+    expansion_ratio: float,
+) -> dict[str, Any]:
+    if regressor not in regressors:
+        raise ValueError(
+            f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
+        )
+    default_ranges = {
+        "n_estimators": (100, 2000),
+        "learning_rate": (1e-3, 0.5),
+        "min_child_weight": (1e-8, 100.0),
+        "subsample": (0.5, 1.0),
+        "colsample_bytree": (0.5, 1.0),
+        "reg_alpha": (1e-8, 100.0),
+        "reg_lambda": (1e-8, 100.0),
+        "max_depth": (3, 13),
+        "gamma": (1e-8, 10.0),
+        "num_leaves": (8, 256),
+        "min_split_gain": (1e-8, 10.0),
+        "min_child_samples": (10, 100),
+    }
+
+    log_scaled_params = {
+        "learning_rate",
+        "min_child_weight",
+        "reg_alpha",
+        "reg_lambda",
+        "gamma",
+        "min_split_gain",
+    }
+
+    ranges = copy.deepcopy(default_ranges)
+    if model_training_best_parameters:
+        for param, (default_min, default_max) in default_ranges.items():
+            center_value = model_training_best_parameters.get(param)
+
+            if (
+                center_value is None
+                or not isinstance(center_value, (int, float))
+                or not np.isfinite(center_value)
+            ):
+                continue
+
+            if param in log_scaled_params:
+                new_min = center_value / (1 + expansion_ratio)
+                new_max = center_value * (1 + expansion_ratio)
+            else:
+                margin = (default_max - default_min) * expansion_ratio / 2
+                new_min = center_value - margin
+                new_max = center_value + margin
+
+            param_min = max(default_min, new_min)
+            param_max = min(default_max, new_max)
+
+            if param_min < param_max:
+                ranges[param] = (param_min, param_max)
+
+    study_model_parameters = {
+        "n_estimators": trial.suggest_int(
+            "n_estimators",
+            int(ranges["n_estimators"][0]),
+            int(ranges["n_estimators"][1]),
+        ),
+        "learning_rate": trial.suggest_float(
+            "learning_rate",
+            ranges["learning_rate"][0],
+            ranges["learning_rate"][1],
+            log=True,
+        ),
+        "min_child_weight": trial.suggest_float(
+            "min_child_weight",
+            ranges["min_child_weight"][0],
+            ranges["min_child_weight"][1],
+            log=True,
+        ),
+        "subsample": trial.suggest_float(
+            "subsample", ranges["subsample"][0], ranges["subsample"][1]
+        ),
+        "colsample_bytree": trial.suggest_float(
+            "colsample_bytree",
+            ranges["colsample_bytree"][0],
+            ranges["colsample_bytree"][1],
+        ),
+        "reg_alpha": trial.suggest_float(
+            "reg_alpha", ranges["reg_alpha"][0], ranges["reg_alpha"][1], log=True
+        ),
+        "reg_lambda": trial.suggest_float(
+            "reg_lambda", ranges["reg_lambda"][0], ranges["reg_lambda"][1], log=True
+        ),
+    }
+    if regressor == "xgboost":
+        study_model_parameters.update(
+            {
+                "max_depth": trial.suggest_int(
+                    "max_depth",
+                    int(ranges["max_depth"][0]),
+                    int(ranges["max_depth"][1]),
+                ),
+                "gamma": trial.suggest_float(
+                    "gamma", ranges["gamma"][0], ranges["gamma"][1], log=True
+                ),
+            }
+        )
+    elif regressor == "lightgbm":
+        study_model_parameters.update(
+            {
+                "num_leaves": trial.suggest_int(
+                    "num_leaves",
+                    int(ranges["num_leaves"][0]),
+                    int(ranges["num_leaves"][1]),
+                ),
+                "min_split_gain": trial.suggest_float(
+                    "min_split_gain",
+                    ranges["min_split_gain"][0],
+                    ranges["min_split_gain"][1],
+                    log=True,
+                ),
+                "min_child_samples": trial.suggest_int(
+                    "min_child_samples",
+                    int(ranges["min_child_samples"][0]),
+                    int(ranges["min_child_samples"][1]),
+                ),
+            }
+        )
+    return study_model_parameters
+
+
+@lru_cache(maxsize=8)
+def largest_divisor(integer: int, step: int) -> Optional[int]:
+    if not isinstance(integer, int) or integer <= 0:
+        raise ValueError("integer must be a positive integer")
+    if not isinstance(step, int) or step <= 0:
+        raise ValueError("step must be a positive integer")
+
+    q_start = math.floor(0.5 * step) + 1
+    q_end = math.ceil(1.5 * step) - 1
+
+    if q_start > q_end:
+        return None
+
+    for q in range(q_start, q_end + 1):
+        if integer % q == 0:
+            return int(integer / q)
+
+    return None
+
+
+def soft_extremum(series: pd.Series, alpha: float) -> float:
+    np_array = series.to_numpy()
+    if np_array.size == 0:
+        return np.nan
+    if np.isclose(alpha, 0):
+        return np.mean(np_array)
+    scaled_np_array = alpha * np_array
+    max_scaled_np_array = np.max(scaled_np_array)
+    if np.isinf(max_scaled_np_array):
+        return np_array[np.argmax(scaled_np_array)]
+    shifted_exponentials = np.exp(scaled_np_array - max_scaled_np_array)
+    numerator = np.sum(np_array * shifted_exponentials)
+    denominator = np.sum(shifted_exponentials)
+    if denominator == 0:
+        return np.max(np_array)
+    return numerator / denominator
+
+
+def round_to_nearest_int(value: float, step: int) -> int:
+    """
+    Round a value to the nearest multiple of a given step.
+    :param value: The value to round.
+    :param step: The step size to round to (must be non-zero).
+    :return: The rounded value.
+    :raises ValueError: If step is zero.
+    """
+    if not isinstance(step, int) or step <= 0:
+        raise ValueError("step must be a positive integer")
+    return int(round(value / step) * step)