import copy
-from enum import IntEnum
-import logging
import json
-import math
+import logging
import random
-from statistics import median
import time
+import warnings
+from functools import cached_property
+from pathlib import Path
+from typing import Any, Callable, Optional
+
import numpy as np
+import optuna
import pandas as pd
import scipy as sp
-import optuna
-import sklearn
import skimage
-import warnings
-import talib.abstract as ta
-
-from functools import cached_property, lru_cache
-from typing import Any, Callable, Optional
-from pathlib import Path
+import sklearn
from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
+from Utils import (
+ calculate_min_extrema,
+ calculate_n_extrema,
+ fit_regressor,
+ get_callbacks,
+ get_optuna_study_model_parameters,
+ largest_divisor,
+ round_to_nearest_int,
+ zigzag,
+)
+
debug = False
TEST_SIZE = 0.1
https://github.com/sponsors/robcaulk
"""
- version = "3.7.108"
+ version = "3.7.109"
@cached_property
def _optuna_config(self) -> dict[str, Any]:
return False
-regressors = {"xgboost", "lightgbm"}
-
-
-def get_callbacks(trial: optuna.trial.Trial, regressor: str) -> list[Callable]:
- if regressor == "xgboost":
- callbacks = [
- optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")
- ]
- elif regressor == "lightgbm":
- callbacks = [optuna.integration.LightGBMPruningCallback(trial, "rmse")]
- else:
- raise ValueError(
- f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
- )
- return callbacks
-
-
-def fit_regressor(
- regressor: str,
- X: pd.DataFrame,
- y: pd.DataFrame,
- train_weights: np.ndarray,
- eval_set: Optional[list[tuple[pd.DataFrame, pd.DataFrame]]],
- eval_weights: Optional[list[np.ndarray]],
- model_training_parameters: dict[str, Any],
- init_model: Any = None,
- callbacks: Optional[list[Callable]] = None,
-) -> Any:
- if regressor == "xgboost":
- from xgboost import XGBRegressor
-
- model = XGBRegressor(
- objective="reg:squarederror",
- eval_metric="rmse",
- callbacks=callbacks,
- **model_training_parameters,
- )
- model.fit(
- X=X,
- y=y,
- sample_weight=train_weights,
- eval_set=eval_set,
- sample_weight_eval_set=eval_weights,
- xgb_model=init_model,
- )
- elif regressor == "lightgbm":
- from lightgbm import LGBMRegressor
-
- model = LGBMRegressor(objective="regression", **model_training_parameters)
- model.fit(
- X=X,
- y=y,
- sample_weight=train_weights,
- eval_set=eval_set,
- eval_sample_weight=eval_weights,
- eval_metric="rmse",
- init_model=init_model,
- callbacks=callbacks,
- )
- else:
- raise ValueError(
- f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
- )
- return model
-
-
-@lru_cache(maxsize=128)
-def calculate_min_extrema(
- size: int, fit_live_predictions_candles: int, min_extrema: int = 4
-) -> int:
- return int(round(size / fit_live_predictions_candles) * min_extrema)
-
-
-def calculate_n_extrema(extrema: pd.Series) -> int:
- return (
- sp.signal.find_peaks(-extrema)[0].size + sp.signal.find_peaks(extrema)[0].size
- )
-
-
def train_objective(
trial: optuna.trial.Trial,
regressor: str,
)
-def get_optuna_study_model_parameters(
- trial: optuna.trial.Trial,
- regressor: str,
- model_training_best_parameters: dict[str, Any],
- expansion_ratio: float,
-) -> dict[str, Any]:
- if regressor not in regressors:
- raise ValueError(
- f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
- )
- default_ranges = {
- "n_estimators": (100, 2000),
- "learning_rate": (1e-3, 0.5),
- "min_child_weight": (1e-8, 100.0),
- "subsample": (0.5, 1.0),
- "colsample_bytree": (0.5, 1.0),
- "reg_alpha": (1e-8, 100.0),
- "reg_lambda": (1e-8, 100.0),
- "max_depth": (3, 13),
- "gamma": (1e-8, 10.0),
- "num_leaves": (8, 256),
- "min_split_gain": (1e-8, 10.0),
- "min_child_samples": (10, 100),
- }
-
- log_scaled_params = {
- "learning_rate",
- "min_child_weight",
- "reg_alpha",
- "reg_lambda",
- "gamma",
- "min_split_gain",
- }
-
- ranges = copy.deepcopy(default_ranges)
- if model_training_best_parameters:
- for param, (default_min, default_max) in default_ranges.items():
- center_value = model_training_best_parameters.get(param)
-
- if (
- center_value is None
- or not isinstance(center_value, (int, float))
- or not np.isfinite(center_value)
- ):
- continue
-
- if param in log_scaled_params:
- new_min = center_value / (1 + expansion_ratio)
- new_max = center_value * (1 + expansion_ratio)
- else:
- margin = (default_max - default_min) * expansion_ratio / 2
- new_min = center_value - margin
- new_max = center_value + margin
-
- param_min = max(default_min, new_min)
- param_max = min(default_max, new_max)
-
- if param_min < param_max:
- ranges[param] = (param_min, param_max)
-
- study_model_parameters = {
- "n_estimators": trial.suggest_int(
- "n_estimators",
- int(ranges["n_estimators"][0]),
- int(ranges["n_estimators"][1]),
- ),
- "learning_rate": trial.suggest_float(
- "learning_rate",
- ranges["learning_rate"][0],
- ranges["learning_rate"][1],
- log=True,
- ),
- "min_child_weight": trial.suggest_float(
- "min_child_weight",
- ranges["min_child_weight"][0],
- ranges["min_child_weight"][1],
- log=True,
- ),
- "subsample": trial.suggest_float(
- "subsample", ranges["subsample"][0], ranges["subsample"][1]
- ),
- "colsample_bytree": trial.suggest_float(
- "colsample_bytree",
- ranges["colsample_bytree"][0],
- ranges["colsample_bytree"][1],
- ),
- "reg_alpha": trial.suggest_float(
- "reg_alpha", ranges["reg_alpha"][0], ranges["reg_alpha"][1], log=True
- ),
- "reg_lambda": trial.suggest_float(
- "reg_lambda", ranges["reg_lambda"][0], ranges["reg_lambda"][1], log=True
- ),
- }
- if regressor == "xgboost":
- study_model_parameters.update(
- {
- "max_depth": trial.suggest_int(
- "max_depth",
- int(ranges["max_depth"][0]),
- int(ranges["max_depth"][1]),
- ),
- "gamma": trial.suggest_float(
- "gamma", ranges["gamma"][0], ranges["gamma"][1], log=True
- ),
- }
- )
- elif regressor == "lightgbm":
- study_model_parameters.update(
- {
- "num_leaves": trial.suggest_int(
- "num_leaves",
- int(ranges["num_leaves"][0]),
- int(ranges["num_leaves"][1]),
- ),
- "min_split_gain": trial.suggest_float(
- "min_split_gain",
- ranges["min_split_gain"][0],
- ranges["min_split_gain"][1],
- log=True,
- ),
- "min_child_samples": trial.suggest_int(
- "min_child_samples",
- int(ranges["min_child_samples"][0]),
- int(ranges["min_child_samples"][1]),
- ),
- }
- )
- return study_model_parameters
-
-
def hp_objective(
trial: optuna.trial.Trial,
regressor: str,
)
-def calculate_quantile(values: np.ndarray, value: float) -> float:
- if values.size == 0:
- return np.nan
-
- first_value = values[0]
- if np.all(np.isclose(values, first_value)):
- return (
- 0.5
- if np.isclose(value, first_value)
- else (0.0 if value < first_value else 1.0)
- )
-
- return np.sum(values <= value) / values.size
-
-
-class TrendDirection(IntEnum):
- NEUTRAL = 0
- UP = 1
- DOWN = -1
-
-
-def zigzag(
- df: pd.DataFrame,
- natr_period: int = 14,
- natr_ratio: float = 6.0,
-) -> tuple[list[int], list[float], list[TrendDirection], list[float]]:
- n = len(df)
- if df.empty or n < natr_period:
- return [], [], [], []
-
- natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy()
-
- indices: list[int] = df.index.tolist()
- thresholds: np.ndarray = natr_values * natr_ratio
- closes = df.get("close").to_numpy()
- highs = df.get("high").to_numpy()
- lows = df.get("low").to_numpy()
-
- state: TrendDirection = TrendDirection.NEUTRAL
-
- pivots_indices: list[int] = []
- pivots_values: list[float] = []
- pivots_directions: list[TrendDirection] = []
- pivots_thresholds: list[float] = []
- last_pivot_pos: int = -1
-
- candidate_pivot_pos: int = -1
- candidate_pivot_value: float = np.nan
-
- volatility_quantile_cache: dict[int, float] = {}
-
- def calculate_volatility_quantile(pos: int) -> float:
- if pos not in volatility_quantile_cache:
- start_pos = max(0, pos + 1 - natr_period)
- end_pos = min(pos + 1, n)
- if start_pos >= end_pos:
- volatility_quantile_cache[pos] = np.nan
- else:
- volatility_quantile_cache[pos] = calculate_quantile(
- natr_values[start_pos:end_pos], natr_values[pos]
- )
-
- return volatility_quantile_cache[pos]
-
- def calculate_slopes_ok_threshold(
- pos: int,
- min_threshold: float = 0.75,
- max_threshold: float = 0.95,
- ) -> float:
- volatility_quantile = calculate_volatility_quantile(pos)
- if np.isnan(volatility_quantile):
- return median([min_threshold, max_threshold])
-
- return max_threshold - (max_threshold - min_threshold) * volatility_quantile
-
- def update_candidate_pivot(pos: int, value: float):
- nonlocal candidate_pivot_pos, candidate_pivot_value
- if 0 <= pos < n:
- candidate_pivot_pos = pos
- candidate_pivot_value = value
-
- def reset_candidate_pivot():
- nonlocal candidate_pivot_pos, candidate_pivot_value
- candidate_pivot_pos = -1
- candidate_pivot_value = np.nan
-
- def add_pivot(pos: int, value: float, direction: TrendDirection):
- nonlocal last_pivot_pos
- if pivots_indices and indices[pos] == pivots_indices[-1]:
- return
- pivots_indices.append(indices[pos])
- pivots_values.append(value)
- pivots_directions.append(direction)
- pivots_thresholds.append(thresholds[pos])
- last_pivot_pos = pos
- reset_candidate_pivot()
-
- slope_ok_cache: dict[tuple[int, int, TrendDirection, float], bool] = {}
-
- def get_slope_ok(
- pos: int,
- candidate_pivot_pos: int,
- direction: TrendDirection,
- min_slope: float,
- ) -> bool:
- cache_key = (
- pos,
- candidate_pivot_pos,
- direction,
- min_slope,
- )
-
- if cache_key in slope_ok_cache:
- return slope_ok_cache[cache_key]
-
- if pos <= candidate_pivot_pos:
- slope_ok_cache[cache_key] = False
- return slope_ok_cache[cache_key]
-
- log_candidate_pivot_close = np.log(closes[candidate_pivot_pos])
- log_current_close = np.log(closes[pos])
-
- log_slope_close = (log_current_close - log_candidate_pivot_close) / (
- pos - candidate_pivot_pos
- )
-
- if direction == TrendDirection.UP:
- slope_ok_cache[cache_key] = log_slope_close > min_slope
- elif direction == TrendDirection.DOWN:
- slope_ok_cache[cache_key] = log_slope_close < -min_slope
- else:
- slope_ok_cache[cache_key] = False
-
- return slope_ok_cache[cache_key]
-
- def is_pivot_confirmed(
- pos: int,
- candidate_pivot_pos: int,
- direction: TrendDirection,
- min_slope: float = np.finfo(float).eps,
- alpha: float = 0.05,
- ) -> bool:
- start_pos = min(candidate_pivot_pos + 1, n)
- end_pos = min(pos + 1, n)
- n_slopes = max(0, end_pos - start_pos)
-
- if n_slopes < 1:
- return False
-
- slopes_ok: list[bool] = []
- for i in range(start_pos, end_pos):
- slopes_ok.append(
- get_slope_ok(
- pos=i,
- candidate_pivot_pos=candidate_pivot_pos,
- direction=direction,
- min_slope=min_slope,
- )
- )
-
- slopes_ok_threshold = calculate_slopes_ok_threshold(candidate_pivot_pos)
- n_slopes_ok = sum(slopes_ok)
- binomtest = sp.stats.binomtest(
- k=n_slopes_ok, n=n_slopes, p=0.5, alternative="greater"
- )
-
- return (
- binomtest.pvalue <= alpha
- and (n_slopes_ok / n_slopes) >= slopes_ok_threshold
- )
-
- start_pos = 0
- initial_high_pos = start_pos
- initial_low_pos = start_pos
- initial_high = highs[initial_high_pos]
- initial_low = lows[initial_low_pos]
- for i in range(start_pos + 1, n):
- current_high = highs[i]
- current_low = lows[i]
- if current_high > initial_high:
- initial_high, initial_high_pos = current_high, i
- if current_low < initial_low:
- initial_low, initial_low_pos = current_low, i
-
- initial_move_from_high = (initial_high - current_low) / initial_high
- initial_move_from_low = (current_high - initial_low) / initial_low
- is_initial_high_move_significant = (
- initial_move_from_high >= thresholds[initial_high_pos]
- )
- is_initial_low_move_significant = (
- initial_move_from_low >= thresholds[initial_low_pos]
- )
- if is_initial_high_move_significant and is_initial_low_move_significant:
- if initial_move_from_high > initial_move_from_low:
- add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
- state = TrendDirection.DOWN
- break
- else:
- add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
- state = TrendDirection.UP
- break
- else:
- if is_initial_high_move_significant:
- add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
- state = TrendDirection.DOWN
- break
- elif is_initial_low_move_significant:
- add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
- state = TrendDirection.UP
- break
- else:
- return [], [], [], []
-
- for i in range(last_pivot_pos + 1, n):
- current_high = highs[i]
- current_low = lows[i]
-
- if state == TrendDirection.UP:
- if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value:
- update_candidate_pivot(i, current_high)
- if (
- candidate_pivot_value - current_low
- ) / candidate_pivot_value >= thresholds[
- candidate_pivot_pos
- ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.DOWN):
- add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
- state = TrendDirection.DOWN
-
- elif state == TrendDirection.DOWN:
- if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value:
- update_candidate_pivot(i, current_low)
- if (
- current_high - candidate_pivot_value
- ) / candidate_pivot_value >= thresholds[
- candidate_pivot_pos
- ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.UP):
- add_pivot(
- candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
- )
- state = TrendDirection.UP
-
- return pivots_indices, pivots_values, pivots_directions, pivots_thresholds
-
-
-@lru_cache(maxsize=8)
-def largest_divisor(integer: int, step: int) -> Optional[int]:
- if not isinstance(integer, int) or integer <= 0:
- raise ValueError("integer must be a positive integer")
- if not isinstance(step, int) or step <= 0:
- raise ValueError("step must be a positive integer")
-
- q_start = math.floor(0.5 * step) + 1
- q_end = math.ceil(1.5 * step) - 1
-
- if q_start > q_end:
- return None
-
- for q in range(q_start, q_end + 1):
- if integer % q == 0:
- return int(integer / q)
-
- return None
-
-
def label_objective(
trial: optuna.trial.Trial,
df: pd.DataFrame,
)
return np.median(pivots_thresholds), len(pivots_values)
-
-
-def soft_extremum(series: pd.Series, alpha: float) -> float:
- np_array = series.to_numpy()
- if np_array.size == 0:
- return np.nan
- if np.isclose(alpha, 0):
- return np.mean(np_array)
- scaled_np_array = alpha * np_array
- max_scaled_np_array = np.max(scaled_np_array)
- if np.isinf(max_scaled_np_array):
- return np_array[np.argmax(scaled_np_array)]
- shifted_exponentials = np.exp(scaled_np_array - max_scaled_np_array)
- numerator = np.sum(np_array * shifted_exponentials)
- denominator = np.sum(shifted_exponentials)
- if denominator == 0:
- return np.max(np_array)
- return numerator / denominator
-
-
-def round_to_nearest_int(value: float, step: int) -> int:
- """
- Round a value to the nearest multiple of a given step.
- :param value: The value to round.
- :param step: The step size to round to (must be non-zero).
- :return: The rounded value.
- :raises ValueError: If step is zero.
- """
- if not isinstance(step, int) or step <= 0:
- raise ValueError("step must be a positive integer")
- return int(round(value / step) * step)
+import copy
+import math
from enum import IntEnum
from functools import lru_cache
from statistics import median
+from typing import Any, Callable, Literal, Optional, TypeVar
+
import numpy as np
+import optuna
import pandas as pd
import scipy as sp
import talib.abstract as ta
-from typing import Callable, Literal, TypeVar
+
from technical import qtpylib
+
T = TypeVar("T", pd.Series, float)
return pd.Series(filtered_values, index=series.index)
-def calculate_n_extrema(extrema: pd.Series) -> int:
- return (
- sp.signal.find_peaks(-extrema)[0].size + sp.signal.find_peaks(extrema)[0].size
- )
+@lru_cache(maxsize=128)
+def calculate_min_extrema(
+ size: int, fit_live_predictions_candles: int, min_extrema: int = 4
+) -> int:
+ return int(round(size / fit_live_predictions_candles) * min_extrema)
+
+
+def calculate_n_extrema(series: pd.Series) -> int:
+ return sp.signal.find_peaks(-series)[0].size + sp.signal.find_peaks(series)[0].size
def top_change_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
state = TrendDirection.UP
return pivots_indices, pivots_values, pivots_directions, pivots_thresholds
+
+
+regressors = {"xgboost", "lightgbm"}
+
+
+def get_callbacks(trial: optuna.trial.Trial, regressor: str) -> list[Callable]:
+ if regressor == "xgboost":
+ callbacks = [
+ optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")
+ ]
+ elif regressor == "lightgbm":
+ callbacks = [optuna.integration.LightGBMPruningCallback(trial, "rmse")]
+ else:
+ raise ValueError(
+ f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
+ )
+ return callbacks
+
+
+def fit_regressor(
+ regressor: str,
+ X: pd.DataFrame,
+ y: pd.DataFrame,
+ train_weights: np.ndarray,
+ eval_set: Optional[list[tuple[pd.DataFrame, pd.DataFrame]]],
+ eval_weights: Optional[list[np.ndarray]],
+ model_training_parameters: dict[str, Any],
+ init_model: Any = None,
+ callbacks: Optional[list[Callable]] = None,
+) -> Any:
+ if regressor == "xgboost":
+ from xgboost import XGBRegressor
+
+ model = XGBRegressor(
+ objective="reg:squarederror",
+ eval_metric="rmse",
+ callbacks=callbacks,
+ **model_training_parameters,
+ )
+ model.fit(
+ X=X,
+ y=y,
+ sample_weight=train_weights,
+ eval_set=eval_set,
+ sample_weight_eval_set=eval_weights,
+ xgb_model=init_model,
+ )
+ elif regressor == "lightgbm":
+ from lightgbm import LGBMRegressor
+
+ model = LGBMRegressor(objective="regression", **model_training_parameters)
+ model.fit(
+ X=X,
+ y=y,
+ sample_weight=train_weights,
+ eval_set=eval_set,
+ eval_sample_weight=eval_weights,
+ eval_metric="rmse",
+ init_model=init_model,
+ callbacks=callbacks,
+ )
+ else:
+ raise ValueError(
+ f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
+ )
+ return model
+
+
+def get_optuna_study_model_parameters(
+ trial: optuna.trial.Trial,
+ regressor: str,
+ model_training_best_parameters: dict[str, Any],
+ expansion_ratio: float,
+) -> dict[str, Any]:
+ if regressor not in regressors:
+ raise ValueError(
+ f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
+ )
+ default_ranges = {
+ "n_estimators": (100, 2000),
+ "learning_rate": (1e-3, 0.5),
+ "min_child_weight": (1e-8, 100.0),
+ "subsample": (0.5, 1.0),
+ "colsample_bytree": (0.5, 1.0),
+ "reg_alpha": (1e-8, 100.0),
+ "reg_lambda": (1e-8, 100.0),
+ "max_depth": (3, 13),
+ "gamma": (1e-8, 10.0),
+ "num_leaves": (8, 256),
+ "min_split_gain": (1e-8, 10.0),
+ "min_child_samples": (10, 100),
+ }
+
+ log_scaled_params = {
+ "learning_rate",
+ "min_child_weight",
+ "reg_alpha",
+ "reg_lambda",
+ "gamma",
+ "min_split_gain",
+ }
+
+ ranges = copy.deepcopy(default_ranges)
+ if model_training_best_parameters:
+ for param, (default_min, default_max) in default_ranges.items():
+ center_value = model_training_best_parameters.get(param)
+
+ if (
+ center_value is None
+ or not isinstance(center_value, (int, float))
+ or not np.isfinite(center_value)
+ ):
+ continue
+
+ if param in log_scaled_params:
+ new_min = center_value / (1 + expansion_ratio)
+ new_max = center_value * (1 + expansion_ratio)
+ else:
+ margin = (default_max - default_min) * expansion_ratio / 2
+ new_min = center_value - margin
+ new_max = center_value + margin
+
+ param_min = max(default_min, new_min)
+ param_max = min(default_max, new_max)
+
+ if param_min < param_max:
+ ranges[param] = (param_min, param_max)
+
+ study_model_parameters = {
+ "n_estimators": trial.suggest_int(
+ "n_estimators",
+ int(ranges["n_estimators"][0]),
+ int(ranges["n_estimators"][1]),
+ ),
+ "learning_rate": trial.suggest_float(
+ "learning_rate",
+ ranges["learning_rate"][0],
+ ranges["learning_rate"][1],
+ log=True,
+ ),
+ "min_child_weight": trial.suggest_float(
+ "min_child_weight",
+ ranges["min_child_weight"][0],
+ ranges["min_child_weight"][1],
+ log=True,
+ ),
+ "subsample": trial.suggest_float(
+ "subsample", ranges["subsample"][0], ranges["subsample"][1]
+ ),
+ "colsample_bytree": trial.suggest_float(
+ "colsample_bytree",
+ ranges["colsample_bytree"][0],
+ ranges["colsample_bytree"][1],
+ ),
+ "reg_alpha": trial.suggest_float(
+ "reg_alpha", ranges["reg_alpha"][0], ranges["reg_alpha"][1], log=True
+ ),
+ "reg_lambda": trial.suggest_float(
+ "reg_lambda", ranges["reg_lambda"][0], ranges["reg_lambda"][1], log=True
+ ),
+ }
+ if regressor == "xgboost":
+ study_model_parameters.update(
+ {
+ "max_depth": trial.suggest_int(
+ "max_depth",
+ int(ranges["max_depth"][0]),
+ int(ranges["max_depth"][1]),
+ ),
+ "gamma": trial.suggest_float(
+ "gamma", ranges["gamma"][0], ranges["gamma"][1], log=True
+ ),
+ }
+ )
+ elif regressor == "lightgbm":
+ study_model_parameters.update(
+ {
+ "num_leaves": trial.suggest_int(
+ "num_leaves",
+ int(ranges["num_leaves"][0]),
+ int(ranges["num_leaves"][1]),
+ ),
+ "min_split_gain": trial.suggest_float(
+ "min_split_gain",
+ ranges["min_split_gain"][0],
+ ranges["min_split_gain"][1],
+ log=True,
+ ),
+ "min_child_samples": trial.suggest_int(
+ "min_child_samples",
+ int(ranges["min_child_samples"][0]),
+ int(ranges["min_child_samples"][1]),
+ ),
+ }
+ )
+ return study_model_parameters
+
+
+@lru_cache(maxsize=8)
+def largest_divisor(integer: int, step: int) -> Optional[int]:
+ if not isinstance(integer, int) or integer <= 0:
+ raise ValueError("integer must be a positive integer")
+ if not isinstance(step, int) or step <= 0:
+ raise ValueError("step must be a positive integer")
+
+ q_start = math.floor(0.5 * step) + 1
+ q_end = math.ceil(1.5 * step) - 1
+
+ if q_start > q_end:
+ return None
+
+ for q in range(q_start, q_end + 1):
+ if integer % q == 0:
+ return int(integer / q)
+
+ return None
+
+
+def soft_extremum(series: pd.Series, alpha: float) -> float:
+ np_array = series.to_numpy()
+ if np_array.size == 0:
+ return np.nan
+ if np.isclose(alpha, 0):
+ return np.mean(np_array)
+ scaled_np_array = alpha * np_array
+ max_scaled_np_array = np.max(scaled_np_array)
+ if np.isinf(max_scaled_np_array):
+ return np_array[np.argmax(scaled_np_array)]
+ shifted_exponentials = np.exp(scaled_np_array - max_scaled_np_array)
+ numerator = np.sum(np_array * shifted_exponentials)
+ denominator = np.sum(shifted_exponentials)
+ if denominator == 0:
+ return np.max(np_array)
+ return numerator / denominator
+
+
+def round_to_nearest_int(value: float, step: int) -> int:
+ """
+ Round a value to the nearest multiple of a given step.
+ :param value: The value to round.
+ :param step: The step size to round to (must be non-zero).
+ :return: The rounded value.
+ :raises ValueError: If step is zero.
+ """
+ if not isinstance(step, int) or step <= 0:
+ raise ValueError("step must be a positive integer")
+ return int(round(value / step) * step)