From a01d94c496b3be7ae6c1838280f981b426203f75 Mon Sep 17 00:00:00 2001
From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?=
Date: Wed, 6 Aug 2025 19:37:53 +0200
Subject: [PATCH] refactor(qav3): share common code between freqai model and
 strategy
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

Signed-off-by: Jérôme Benoit
---
 ReforceXY/user_data/freqaimodels/ReforceXY.py |   9 +-
 .../freqaimodels/QuickAdapterRegressorV3.py   | 537 +-----------------
 .../user_data/strategies/QuickAdapterV3.py    |  33 +-
 quickadapter/user_data/strategies/Utils.py    | 268 ++++++++-
 4 files changed, 304 insertions(+), 543 deletions(-)

diff --git a/ReforceXY/user_data/freqaimodels/ReforceXY.py b/ReforceXY/user_data/freqaimodels/ReforceXY.py
index a13e08d..0c315f0 100644
--- a/ReforceXY/user_data/freqaimodels/ReforceXY.py
+++ b/ReforceXY/user_data/freqaimodels/ReforceXY.py
@@ -1,13 +1,13 @@
 import copy
-from functools import lru_cache
 import gc
 import json
 import logging
-import warnings
 import time
+import warnings
 from enum import Enum
+from functools import lru_cache
 from pathlib import Path
-from typing import Any, Callable, Dict, Optional, Type, Tuple
+from typing import Any, Callable, Dict, Optional, Tuple, Type
 
 import matplotlib
 import matplotlib.pyplot as plt
@@ -20,9 +20,9 @@ from optuna import Trial, TrialPruned, create_study
 from optuna.exceptions import ExperimentalWarning
 from optuna.pruners import HyperbandPruner
 from optuna.samplers import TPESampler
-from optuna.study import Study, StudyDirection
 from optuna.storages import BaseStorage, JournalStorage, RDBStorage
 from optuna.storages.journal import JournalFileBackend
+from optuna.study import Study, StudyDirection
 from pandas import DataFrame, concat, merge
 from sb3_contrib.common.maskable.callbacks import MaskableEvalCallback
 from stable_baselines3.common.callbacks import (
@@ -37,7 +37,6 @@ from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack, VecMoni
 
 from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
 from freqtrade.freqai.RL.Base5ActionRLEnv import Actions, Base5ActionRLEnv, Positions
-from freqtrade.freqai.RL.BaseEnvironment import BaseEnvironment
 from freqtrade.freqai.RL.BaseReinforcementLearningModel import (
     BaseReinforcementLearningModel,
 )
diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
index 7e177fb..934765e 100644
--- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
+++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
@@ -1,26 +1,33 @@
 import copy
-from enum import IntEnum
-import logging
 import json
-import math
+import logging
 import random
-from statistics import median
 import time
+import warnings
+from functools import cached_property
+from pathlib import Path
+from typing import Any, Callable, Optional
+
 import numpy as np
+import optuna
 import pandas as pd
 import scipy as sp
-import optuna
-import sklearn
 import skimage
-import warnings
-import talib.abstract as ta
-
-from functools import cached_property, lru_cache
-from typing import Any, Callable, Optional
-from pathlib import Path
+import sklearn
 
 from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel
 from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
+from Utils import (
+    calculate_min_extrema,
+    calculate_n_extrema,
+    fit_regressor,
+    get_callbacks,
+    get_optuna_study_model_parameters,
+    largest_divisor,
+    round_to_nearest_int,
+    zigzag,
+)
+
 debug = False
 
 TEST_SIZE = 0.1
@@ -51,7 +58,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     https://github.com/sponsors/robcaulk
     """
 
-    version = "3.7.108"
+    version = "3.7.109"
 
     @cached_property
     def _optuna_config(self) -> dict[str, Any]:
@@ -1171,85 +1178,6 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         return False
 
 
-regressors = {"xgboost", "lightgbm"}
-
-
-def get_callbacks(trial: optuna.trial.Trial, regressor: str) -> list[Callable]:
-    if regressor == "xgboost":
-        callbacks = [
-            optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")
-        ]
-    elif regressor == "lightgbm":
-        callbacks = [optuna.integration.LightGBMPruningCallback(trial, "rmse")]
-    else:
-        raise ValueError(
-            f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
-        )
-    return callbacks
-
-
-def fit_regressor(
-    regressor: str,
-    X: pd.DataFrame,
-    y: pd.DataFrame,
-    train_weights: np.ndarray,
-    eval_set: Optional[list[tuple[pd.DataFrame, pd.DataFrame]]],
-    eval_weights: Optional[list[np.ndarray]],
-    model_training_parameters: dict[str, Any],
-    init_model: Any = None,
-    callbacks: Optional[list[Callable]] = None,
-) -> Any:
-    if regressor == "xgboost":
-        from xgboost import XGBRegressor
-
-        model = XGBRegressor(
-            objective="reg:squarederror",
-            eval_metric="rmse",
-            callbacks=callbacks,
-            **model_training_parameters,
-        )
-        model.fit(
-            X=X,
-            y=y,
-            sample_weight=train_weights,
-            eval_set=eval_set,
-            sample_weight_eval_set=eval_weights,
-            xgb_model=init_model,
-        )
-    elif regressor == "lightgbm":
-        from lightgbm import LGBMRegressor
-
-        model = LGBMRegressor(objective="regression", **model_training_parameters)
-        model.fit(
-            X=X,
-            y=y,
-            sample_weight=train_weights,
-            eval_set=eval_set,
-            eval_sample_weight=eval_weights,
-            eval_metric="rmse",
-            init_model=init_model,
-            callbacks=callbacks,
-        )
-    else:
-        raise ValueError(
-            f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
-        )
-    return model
-
-
-@lru_cache(maxsize=128)
-def calculate_min_extrema(
-    size: int, fit_live_predictions_candles: int, min_extrema: int = 4
-) -> int:
-    return int(round(size / fit_live_predictions_candles) * min_extrema)
-
-
-def calculate_n_extrema(extrema: pd.Series) -> int:
-    return (
-        sp.signal.find_peaks(-extrema)[0].size + sp.signal.find_peaks(extrema)[0].size
-    )
-
-
 def train_objective(
     trial: optuna.trial.Trial,
     regressor: str,
@@ -1360,136 +1288,6 @@ def train_objective(
     )
 
 
-def get_optuna_study_model_parameters(
-    trial: optuna.trial.Trial,
-    regressor: str,
-    model_training_best_parameters: dict[str, Any],
-    expansion_ratio: float,
-) -> dict[str, Any]:
-    if regressor not in regressors:
-        raise ValueError(
-            f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
-        )
-    default_ranges = {
-        "n_estimators": (100, 2000),
-        "learning_rate": (1e-3, 0.5),
-        "min_child_weight": (1e-8, 100.0),
-        "subsample": (0.5, 1.0),
-        "colsample_bytree": (0.5, 1.0),
-        "reg_alpha": (1e-8, 100.0),
-        "reg_lambda": (1e-8, 100.0),
-        "max_depth": (3, 13),
-        "gamma": (1e-8, 10.0),
-        "num_leaves": (8, 256),
-        "min_split_gain": (1e-8, 10.0),
-        "min_child_samples": (10, 100),
-    }
-
-    log_scaled_params = {
-        "learning_rate",
-        "min_child_weight",
-        "reg_alpha",
-        "reg_lambda",
-        "gamma",
-        "min_split_gain",
-    }
-
-    ranges = copy.deepcopy(default_ranges)
-    if model_training_best_parameters:
-        for param, (default_min, default_max) in default_ranges.items():
-            center_value = model_training_best_parameters.get(param)
-
-            if (
-                center_value is None
-                or not isinstance(center_value, (int, float))
-                or not np.isfinite(center_value)
-            ):
-                continue
-
-            if param in log_scaled_params:
-                new_min = center_value / (1 + expansion_ratio)
-                new_max = center_value * (1 + expansion_ratio)
-            else:
-                margin = (default_max - default_min) * expansion_ratio / 2
-                new_min = center_value - margin
-                new_max = center_value + margin
-
-            param_min = max(default_min, new_min)
-            param_max = min(default_max, new_max)
-
-            if param_min < param_max:
-                ranges[param] = (param_min, param_max)
-
-    study_model_parameters = {
-        "n_estimators": trial.suggest_int(
-            "n_estimators",
-            int(ranges["n_estimators"][0]),
-            int(ranges["n_estimators"][1]),
-        ),
-        "learning_rate": trial.suggest_float(
-            "learning_rate",
-            ranges["learning_rate"][0],
-            ranges["learning_rate"][1],
-            log=True,
-        ),
-        "min_child_weight": trial.suggest_float(
-            "min_child_weight",
-            ranges["min_child_weight"][0],
-            ranges["min_child_weight"][1],
-            log=True,
-        ),
-        "subsample": trial.suggest_float(
-            "subsample", ranges["subsample"][0], ranges["subsample"][1]
-        ),
-        "colsample_bytree": trial.suggest_float(
-            "colsample_bytree",
-            ranges["colsample_bytree"][0],
-            ranges["colsample_bytree"][1],
-        ),
-        "reg_alpha": trial.suggest_float(
-            "reg_alpha", ranges["reg_alpha"][0], ranges["reg_alpha"][1], log=True
-        ),
-        "reg_lambda": trial.suggest_float(
-            "reg_lambda", ranges["reg_lambda"][0], ranges["reg_lambda"][1], log=True
-        ),
-    }
-    if regressor == "xgboost":
-        study_model_parameters.update(
-            {
-                "max_depth": trial.suggest_int(
-                    "max_depth",
-                    int(ranges["max_depth"][0]),
-                    int(ranges["max_depth"][1]),
-                ),
-                "gamma": trial.suggest_float(
-                    "gamma", ranges["gamma"][0], ranges["gamma"][1], log=True
-                ),
-            }
-        )
-    elif regressor == "lightgbm":
-        study_model_parameters.update(
-            {
-                "num_leaves": trial.suggest_int(
-                    "num_leaves",
-                    int(ranges["num_leaves"][0]),
-                    int(ranges["num_leaves"][1]),
-                ),
-                "min_split_gain": trial.suggest_float(
-                    "min_split_gain",
-                    ranges["min_split_gain"][0],
-                    ranges["min_split_gain"][1],
-                    log=True,
-                ),
-                "min_child_samples": trial.suggest_int(
-                    "min_child_samples",
-                    int(ranges["min_child_samples"][0]),
-                    int(ranges["min_child_samples"][1]),
-                ),
-            }
-        )
-    return study_model_parameters
-
-
 def hp_objective(
     trial: optuna.trial.Trial,
     regressor: str,
@@ -1525,270 +1323,6 @@ def hp_objective(
     )
 
 
-def calculate_quantile(values: np.ndarray, value: float) -> float:
-    if values.size == 0:
-        return np.nan
-
-    first_value = values[0]
-    if np.all(np.isclose(values, first_value)):
-        return (
-            0.5
-            if np.isclose(value, first_value)
-            else (0.0 if value < first_value else 1.0)
-        )
-
-    return np.sum(values <= value) / values.size
-
-
-class TrendDirection(IntEnum):
-    NEUTRAL = 0
-    UP = 1
-    DOWN = -1
-
-
-def zigzag(
-    df: pd.DataFrame,
-    natr_period: int = 14,
-    natr_ratio: float = 6.0,
-) -> tuple[list[int], list[float], list[TrendDirection], list[float]]:
-    n = len(df)
-    if df.empty or n < natr_period:
-        return [], [], [], []
-
-    natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy()
-
-    indices: list[int] = df.index.tolist()
-    thresholds: np.ndarray = natr_values * natr_ratio
-    closes = df.get("close").to_numpy()
-    highs = df.get("high").to_numpy()
-    lows = df.get("low").to_numpy()
-
-    state: TrendDirection = TrendDirection.NEUTRAL
-
-    pivots_indices: list[int] = []
-    pivots_values: list[float] = []
-    pivots_directions: list[TrendDirection] = []
-    pivots_thresholds: list[float] = []
-    last_pivot_pos: int = -1
-
-    candidate_pivot_pos: int = -1
-    candidate_pivot_value: float = np.nan
-
-    volatility_quantile_cache: dict[int, float] = {}
-
-    def calculate_volatility_quantile(pos: int) -> float:
-        if pos not in volatility_quantile_cache:
-            start_pos = max(0, pos + 1 - natr_period)
-            end_pos = min(pos + 1, n)
-            if start_pos >= end_pos:
-                volatility_quantile_cache[pos] = np.nan
-            else:
-                volatility_quantile_cache[pos] = calculate_quantile(
-                    natr_values[start_pos:end_pos], natr_values[pos]
-                )
-
-        return volatility_quantile_cache[pos]
-
-    def calculate_slopes_ok_threshold(
-        pos: int,
-        min_threshold: float = 0.75,
-        max_threshold: float = 0.95,
-    ) -> float:
-        volatility_quantile = calculate_volatility_quantile(pos)
-        if np.isnan(volatility_quantile):
-            return median([min_threshold, max_threshold])
-
-        return max_threshold - (max_threshold - min_threshold) * volatility_quantile
-
-    def update_candidate_pivot(pos: int, value: float):
-        nonlocal candidate_pivot_pos, candidate_pivot_value
-        if 0 <= pos < n:
-            candidate_pivot_pos = pos
-            candidate_pivot_value = value
-
-    def reset_candidate_pivot():
-        nonlocal candidate_pivot_pos, candidate_pivot_value
-        candidate_pivot_pos = -1
-        candidate_pivot_value = np.nan
-
-    def add_pivot(pos: int, value: float, direction: TrendDirection):
-        nonlocal last_pivot_pos
-        if pivots_indices and indices[pos] == pivots_indices[-1]:
-            return
-        pivots_indices.append(indices[pos])
-        pivots_values.append(value)
-        pivots_directions.append(direction)
-        pivots_thresholds.append(thresholds[pos])
-        last_pivot_pos = pos
-        reset_candidate_pivot()
-
-    slope_ok_cache: dict[tuple[int, int, TrendDirection, float], bool] = {}
-
-    def get_slope_ok(
-        pos: int,
-        candidate_pivot_pos: int,
-        direction: TrendDirection,
-        min_slope: float,
-    ) -> bool:
-        cache_key = (
-            pos,
-            candidate_pivot_pos,
-            direction,
-            min_slope,
-        )
-
-        if cache_key in slope_ok_cache:
-            return slope_ok_cache[cache_key]
-
-        if pos <= candidate_pivot_pos:
-            slope_ok_cache[cache_key] = False
-            return slope_ok_cache[cache_key]
-
-        log_candidate_pivot_close = np.log(closes[candidate_pivot_pos])
-        log_current_close = np.log(closes[pos])
-
-        log_slope_close = (log_current_close - log_candidate_pivot_close) / (
-            pos - candidate_pivot_pos
-        )
-
-        if direction == TrendDirection.UP:
-            slope_ok_cache[cache_key] = log_slope_close > min_slope
-        elif direction == TrendDirection.DOWN:
-            slope_ok_cache[cache_key] = log_slope_close < -min_slope
-        else:
-            slope_ok_cache[cache_key] = False
-
-        return slope_ok_cache[cache_key]
-
-    def is_pivot_confirmed(
-        pos: int,
-        candidate_pivot_pos: int,
-        direction: TrendDirection,
-        min_slope: float = np.finfo(float).eps,
-        alpha: float = 0.05,
-    ) -> bool:
-        start_pos = min(candidate_pivot_pos + 1, n)
-        end_pos = min(pos + 1, n)
-        n_slopes = max(0, end_pos - start_pos)
-
-        if n_slopes < 1:
-            return False
-
-        slopes_ok: list[bool] = []
-        for i in range(start_pos, end_pos):
-            slopes_ok.append(
-                get_slope_ok(
-                    pos=i,
-                    candidate_pivot_pos=candidate_pivot_pos,
-                    direction=direction,
-                    min_slope=min_slope,
-                )
-            )
-
-        slopes_ok_threshold = calculate_slopes_ok_threshold(candidate_pivot_pos)
-        n_slopes_ok = sum(slopes_ok)
-        binomtest = sp.stats.binomtest(
-            k=n_slopes_ok, n=n_slopes, p=0.5, alternative="greater"
-        )
-
-        return (
-            binomtest.pvalue <= alpha
-            and (n_slopes_ok / n_slopes) >= slopes_ok_threshold
-        )
-
-    start_pos = 0
-    initial_high_pos = start_pos
-    initial_low_pos = start_pos
-    initial_high = highs[initial_high_pos]
-    initial_low = lows[initial_low_pos]
-    for i in range(start_pos + 1, n):
-        current_high = highs[i]
-        current_low = lows[i]
-        if current_high > initial_high:
-            initial_high, initial_high_pos = current_high, i
-        if current_low < initial_low:
-            initial_low, initial_low_pos = current_low, i
-
-        initial_move_from_high = (initial_high - current_low) / initial_high
-        initial_move_from_low = (current_high - initial_low) / initial_low
-        is_initial_high_move_significant = (
-            initial_move_from_high >= thresholds[initial_high_pos]
-        )
-        is_initial_low_move_significant = (
-            initial_move_from_low >= thresholds[initial_low_pos]
-        )
-        if is_initial_high_move_significant and is_initial_low_move_significant:
-            if initial_move_from_high > initial_move_from_low:
-                add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
-                state = TrendDirection.DOWN
-                break
-            else:
-                add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
-                state = TrendDirection.UP
-                break
-        else:
-            if is_initial_high_move_significant:
-                add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
-                state = TrendDirection.DOWN
-                break
-            elif is_initial_low_move_significant:
-                add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
-                state = TrendDirection.UP
-                break
-    else:
-        return [], [], [], []
-
-    for i in range(last_pivot_pos + 1, n):
-        current_high = highs[i]
-        current_low = lows[i]
-
-        if state == TrendDirection.UP:
-            if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value:
-                update_candidate_pivot(i, current_high)
-            if (
-                candidate_pivot_value - current_low
-            ) / candidate_pivot_value >= thresholds[
-                candidate_pivot_pos
-            ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.DOWN):
-                add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
-                state = TrendDirection.DOWN
-
-        elif state == TrendDirection.DOWN:
-            if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value:
-                update_candidate_pivot(i, current_low)
-            if (
-                current_high - candidate_pivot_value
-            ) / candidate_pivot_value >= thresholds[
-                candidate_pivot_pos
-            ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.UP):
-                add_pivot(
-                    candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
-                )
-                state = TrendDirection.UP
-
-    return pivots_indices, pivots_values, pivots_directions, pivots_thresholds
-
-
-@lru_cache(maxsize=8)
-def largest_divisor(integer: int, step: int) -> Optional[int]:
-    if not isinstance(integer, int) or integer <= 0:
-        raise ValueError("integer must be a positive integer")
-    if not isinstance(step, int) or step <= 0:
-        raise ValueError("step must be a positive integer")
-
-    q_start = math.floor(0.5 * step) + 1
-    q_end = math.ceil(1.5 * step) - 1
-
-    if q_start > q_end:
-        return None
-
-    for q in range(q_start, q_end + 1):
-        if integer % q == 0:
-            return int(integer / q)
-
-    return None
-
-
 def label_objective(
     trial: optuna.trial.Trial,
     df: pd.DataFrame,
@@ -1835,34 +1369,3 @@ def label_objective(
     )
 
     return np.median(pivots_thresholds), len(pivots_values)
-
-
-def soft_extremum(series: pd.Series, alpha: float) -> float:
-    np_array = series.to_numpy()
-    if np_array.size == 0:
-        return np.nan
-    if np.isclose(alpha, 0):
-        return np.mean(np_array)
-    scaled_np_array = alpha * np_array
-    max_scaled_np_array = np.max(scaled_np_array)
-    if np.isinf(max_scaled_np_array):
-        return np_array[np.argmax(scaled_np_array)]
-    shifted_exponentials = np.exp(scaled_np_array - max_scaled_np_array)
-    numerator = np.sum(np_array * shifted_exponentials)
-    denominator = np.sum(shifted_exponentials)
-    if denominator == 0:
-        return np.max(np_array)
-    return numerator / denominator
-
-
-def round_to_nearest_int(value: float, step: int) -> int:
-    """
-    Round a value to the nearest multiple of a given step.
-    :param value: The value to round.
-    :param step: The step size to round to (must be non-zero).
-    :return: The rounded value.
-    :raises ValueError: If step is zero.
-    """
-    if not isinstance(step, int) or step <= 0:
-        raise ValueError("step must be a positive integer")
-    return int(round(value / step) * step)
diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py
index 7a40f2d..7a6f472 100644
--- a/quickadapter/user_data/strategies/QuickAdapterV3.py
+++ b/quickadapter/user_data/strategies/QuickAdapterV3.py
@@ -1,19 +1,20 @@
+import datetime
 import json
 import logging
-from functools import lru_cache, reduce, cached_property
-import datetime
 import math
+from functools import cached_property, lru_cache, reduce
 from pathlib import Path
-import talib.abstract as ta
-from pandas import DataFrame, Series, isna
 from typing import Any, Callable, Literal, Optional
+
+import numpy as np
+import pandas_ta as pta
+import talib.abstract as ta
 from freqtrade.exchange import timeframe_to_minutes, timeframe_to_prev_date
-from freqtrade.strategy.interface import IStrategy
+from freqtrade.persistence import Trade
 from freqtrade.strategy import stoploss_from_absolute
+from freqtrade.strategy.interface import IStrategy
+from pandas import DataFrame, Series, isna
 from technical.pivots_points import pivots_points
-from freqtrade.persistence import Trade
-import numpy as np
-import pandas_ta as pta
 
 from Utils import (
     TrendDirection,
@@ -21,17 +22,17 @@ from Utils import (
     bottom_change_percent,
     calculate_n_extrema,
     calculate_quantile,
-    get_zl_ma_fn,
-    zero_phase,
-    zigzag,
     ewo,
+    get_distance,
+    get_gaussian_std,
+    get_odd_window,
+    get_zl_ma_fn,
     non_zero_diff,
     price_retracement_percent,
-    vwapb,
     top_change_percent,
-    get_distance,
-    get_odd_window,
-    get_gaussian_std,
+    vwapb,
+    zero_phase,
+    zigzag,
     zlema,
 )
 
@@ -64,7 +65,7 @@ class QuickAdapterV3(IStrategy):
     INTERFACE_VERSION = 3
 
     def version(self) -> str:
-        return "3.3.138"
+        return "3.3.139"
 
     timeframe = "5m"
diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py
index 9f5a64b..6eb70c2 100644
--- a/quickadapter/user_data/strategies/Utils.py
+++ b/quickadapter/user_data/strategies/Utils.py
@@ -1,13 +1,19 @@
+import copy
+import math
 from enum import IntEnum
 from functools import lru_cache
 from statistics import median
+from typing import Any, Callable, Literal, Optional, TypeVar
+
 import numpy as np
+import optuna
 import pandas as pd
 import scipy as sp
 import talib.abstract as ta
-from typing import Callable, Literal, TypeVar
+
 from technical import qtpylib
 
+
 T = TypeVar("T", pd.Series, float)
 
 
@@ -71,10 +77,15 @@ def zero_phase(
     return pd.Series(filtered_values, index=series.index)
 
 
-def calculate_n_extrema(extrema: pd.Series) -> int:
-    return (
-        sp.signal.find_peaks(-extrema)[0].size + sp.signal.find_peaks(extrema)[0].size
-    )
+@lru_cache(maxsize=128)
+def calculate_min_extrema(
+    size: int, fit_live_predictions_candles: int, min_extrema: int = 4
+) -> int:
+    return int(round(size / fit_live_predictions_candles) * min_extrema)
+
+
+def calculate_n_extrema(series: pd.Series) -> int:
+    return sp.signal.find_peaks(-series)[0].size + sp.signal.find_peaks(series)[0].size
 
 
 def top_change_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
@@ -615,3 +626,250 @@ def zigzag(
                 state = TrendDirection.UP
 
     return pivots_indices, pivots_values, pivots_directions, pivots_thresholds
+
+
+regressors = {"xgboost", "lightgbm"}
+
+
+def get_callbacks(trial: optuna.trial.Trial, regressor: str) -> list[Callable]:
+    if regressor == "xgboost":
+        callbacks = [
+            optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")
+        ]
+    elif regressor == "lightgbm":
+        callbacks = [optuna.integration.LightGBMPruningCallback(trial, "rmse")]
+    else:
+        raise ValueError(
+            f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
+        )
+    return callbacks
+
+
+def fit_regressor(
+    regressor: str,
+    X: pd.DataFrame,
+    y: pd.DataFrame,
+    train_weights: np.ndarray,
+    eval_set: Optional[list[tuple[pd.DataFrame, pd.DataFrame]]],
+    eval_weights: Optional[list[np.ndarray]],
+    model_training_parameters: dict[str, Any],
+    init_model: Any = None,
+    callbacks: Optional[list[Callable]] = None,
+) -> Any:
+    if regressor == "xgboost":
+        from xgboost import XGBRegressor
+
+        model = XGBRegressor(
+            objective="reg:squarederror",
+            eval_metric="rmse",
+            callbacks=callbacks,
+            **model_training_parameters,
+        )
+        model.fit(
+            X=X,
+            y=y,
+            sample_weight=train_weights,
+            eval_set=eval_set,
+            sample_weight_eval_set=eval_weights,
+            xgb_model=init_model,
+        )
+    elif regressor == "lightgbm":
+        from lightgbm import LGBMRegressor
+
+        model = LGBMRegressor(objective="regression", **model_training_parameters)
+        model.fit(
+            X=X,
+            y=y,
+            sample_weight=train_weights,
+            eval_set=eval_set,
+            eval_sample_weight=eval_weights,
+            eval_metric="rmse",
+            init_model=init_model,
+            callbacks=callbacks,
+        )
+    else:
+        raise ValueError(
+            f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
+        )
+    return model
+
+
+def get_optuna_study_model_parameters(
+    trial: optuna.trial.Trial,
+    regressor: str,
+    model_training_best_parameters: dict[str, Any],
+    expansion_ratio: float,
+) -> dict[str, Any]:
+    if regressor not in regressors:
+        raise ValueError(
+            f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
+        )
+    default_ranges = {
+        "n_estimators": (100, 2000),
+        "learning_rate": (1e-3, 0.5),
+        "min_child_weight": (1e-8, 100.0),
+        "subsample": (0.5, 1.0),
+        "colsample_bytree": (0.5, 1.0),
+        "reg_alpha": (1e-8, 100.0),
+        "reg_lambda": (1e-8, 100.0),
+        "max_depth": (3, 13),
+        "gamma": (1e-8, 10.0),
+        "num_leaves": (8, 256),
+        "min_split_gain": (1e-8, 10.0),
+        "min_child_samples": (10, 100),
+    }
+
+    log_scaled_params = {
+        "learning_rate",
+        "min_child_weight",
+        "reg_alpha",
+        "reg_lambda",
+        "gamma",
+        "min_split_gain",
+    }
+
+    ranges = copy.deepcopy(default_ranges)
+    if model_training_best_parameters:
+        for param, (default_min, default_max) in default_ranges.items():
+            center_value = model_training_best_parameters.get(param)
+
+            if (
+                center_value is None
+                or not isinstance(center_value, (int, float))
+                or not np.isfinite(center_value)
+            ):
+                continue
+
+            if param in log_scaled_params:
+                new_min = center_value / (1 + expansion_ratio)
+                new_max = center_value * (1 + expansion_ratio)
+            else:
+                margin = (default_max - default_min) * expansion_ratio / 2
+                new_min = center_value - margin
+                new_max = center_value + margin
+
+            param_min = max(default_min, new_min)
+            param_max = min(default_max, new_max)
+
+            if param_min < param_max:
+                ranges[param] = (param_min, param_max)
+
+    study_model_parameters = {
+        "n_estimators": trial.suggest_int(
+            "n_estimators",
+            int(ranges["n_estimators"][0]),
+            int(ranges["n_estimators"][1]),
+        ),
+        "learning_rate": trial.suggest_float(
+            "learning_rate",
+            ranges["learning_rate"][0],
+            ranges["learning_rate"][1],
+            log=True,
+        ),
+        "min_child_weight": trial.suggest_float(
"min_child_weight": trial.suggest_float( + "min_child_weight", + ranges["min_child_weight"][0], + ranges["min_child_weight"][1], + log=True, + ), + "subsample": trial.suggest_float( + "subsample", ranges["subsample"][0], ranges["subsample"][1] + ), + "colsample_bytree": trial.suggest_float( + "colsample_bytree", + ranges["colsample_bytree"][0], + ranges["colsample_bytree"][1], + ), + "reg_alpha": trial.suggest_float( + "reg_alpha", ranges["reg_alpha"][0], ranges["reg_alpha"][1], log=True + ), + "reg_lambda": trial.suggest_float( + "reg_lambda", ranges["reg_lambda"][0], ranges["reg_lambda"][1], log=True + ), + } + if regressor == "xgboost": + study_model_parameters.update( + { + "max_depth": trial.suggest_int( + "max_depth", + int(ranges["max_depth"][0]), + int(ranges["max_depth"][1]), + ), + "gamma": trial.suggest_float( + "gamma", ranges["gamma"][0], ranges["gamma"][1], log=True + ), + } + ) + elif regressor == "lightgbm": + study_model_parameters.update( + { + "num_leaves": trial.suggest_int( + "num_leaves", + int(ranges["num_leaves"][0]), + int(ranges["num_leaves"][1]), + ), + "min_split_gain": trial.suggest_float( + "min_split_gain", + ranges["min_split_gain"][0], + ranges["min_split_gain"][1], + log=True, + ), + "min_child_samples": trial.suggest_int( + "min_child_samples", + int(ranges["min_child_samples"][0]), + int(ranges["min_child_samples"][1]), + ), + } + ) + return study_model_parameters + + +@lru_cache(maxsize=8) +def largest_divisor(integer: int, step: int) -> Optional[int]: + if not isinstance(integer, int) or integer <= 0: + raise ValueError("integer must be a positive integer") + if not isinstance(step, int) or step <= 0: + raise ValueError("step must be a positive integer") + + q_start = math.floor(0.5 * step) + 1 + q_end = math.ceil(1.5 * step) - 1 + + if q_start > q_end: + return None + + for q in range(q_start, q_end + 1): + if integer % q == 0: + return int(integer / q) + + return None + + +def soft_extremum(series: pd.Series, alpha: float) -> float: + np_array = series.to_numpy() + if np_array.size == 0: + return np.nan + if np.isclose(alpha, 0): + return np.mean(np_array) + scaled_np_array = alpha * np_array + max_scaled_np_array = np.max(scaled_np_array) + if np.isinf(max_scaled_np_array): + return np_array[np.argmax(scaled_np_array)] + shifted_exponentials = np.exp(scaled_np_array - max_scaled_np_array) + numerator = np.sum(np_array * shifted_exponentials) + denominator = np.sum(shifted_exponentials) + if denominator == 0: + return np.max(np_array) + return numerator / denominator + + +def round_to_nearest_int(value: float, step: int) -> int: + """ + Round a value to the nearest multiple of a given step. + :param value: The value to round. + :param step: The step size to round to (must be non-zero). + :return: The rounded value. + :raises ValueError: If step is zero. + """ + if not isinstance(step, int) or step <= 0: + raise ValueError("step must be a positive integer") + return int(round(value / step) * step) -- 2.43.0