ensure_datetime_series,
make_test_set_and_weights,
fit_regressor,
+ finite_sample,
format_dict,
format_number,
get_causal_mode,
optuna_load_best_params,
optuna_save_best_params,
sanitize_and_renormalize,
+ safe_distribution_fit,
soft_extremum,
zigzag,
)
version = "3.12.0"
_TEST_SIZE: Final[float] = 0.1
-
+ # Substituted whenever the Weibull DI cutoff (``weibull_min.ppf``) is
+ # non-finite (cold start or degenerate fit). Preserves the prior
+ # pre-warm-up heuristic for the outlier-quantile cutoff scale.
+ _DI_CUTOFF_DEFAULT: Final[float] = 2.0
_SKLEARN_TRAIN_TEST_SPLIT_KEYS: Final[frozenset[str]] = frozenset(
{"test_size", "train_size", "random_state", "shuffle", "stratify"}
)
if not warmed_up:
min_pred, max_pred = -2.0, 2.0
f = [0.0, 0.0, 0.0]
- cutoff = 2.0
+ cutoff = QuickAdapterRegressorV3._DI_CUTOFF_DEFAULT
else:
min_pred, max_pred = self.min_max_pred(
label_col,
pair, QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]
).get("label_period_candles"), # "label"
)
- f = sp.stats.weibull_min.fit(
- pd.to_numeric(di_values, errors="coerce").dropna(), floc=0
+ di_sample = finite_sample(
+ []
+ if di_values is None
+ else pd.to_numeric(di_values, errors="coerce"),
+ positive_only=True,
+ )
+ f = safe_distribution_fit(
+ di_sample,
+ sp.stats.weibull_min.fit,
+ # Intentionally non-ppf-able; the ``weibull_min.ppf``
+ # downstream returns NaN on degenerate scale and the
+ # ``np.isfinite(cutoff)`` guard substitutes
+ # ``_DI_CUTOFF_DEFAULT``.
+ fallback=(0.0, 0.0, 0.0),
+ context=f"di_values_weibull_fit:{pair}",
+ logger=logger,
+ min_count=2,
+ require_variance=True,
+ floc=0,
)
outlier_quantile = col_prediction_config.get(
"outlier_quantile",
DEFAULTS_LABEL_PREDICTION["outlier_quantile"],
)
cutoff = sp.stats.weibull_min.ppf(outlier_quantile, *f)
+ if not np.isfinite(cutoff):
+ logger.warning(
+ "[%s] DI_values Weibull cutoff is invalid "
+ "(params=%r, quantile=%r); using fallback %r",
+ pair,
+ f,
+ outlier_quantile,
+ QuickAdapterRegressorV3._DI_CUTOFF_DEFAULT,
+ )
+ cutoff = QuickAdapterRegressorV3._DI_CUTOFF_DEFAULT
dk.data["extra_returns_per_train"][f"{label_col}_minima_threshold"] = (
min_pred
)
if not warmed_up:
f = [0.0, 0.0]
else:
- f = sp.stats.norm.fit(pred_label)
+ sample = finite_sample(pred_label)
+ if sample.finite_count == 0:
+ fallback = (0.0, 0.0)
+ else:
+ sample_mean = float(np.mean(sample.values))
+ sample_std = float(np.std(sample.values, ddof=0))
+ fallback = (
+ sample_mean if np.isfinite(sample_mean) else 0.0,
+ sample_std if np.isfinite(sample_std) else 0.0,
+ )
+ f = safe_distribution_fit(
+ sample,
+ sp.stats.norm.fit,
+ fallback=fallback,
+ context=f"label_norm_fit:{pair}:{label_col}",
+ logger=logger,
+ min_count=2,
+ require_variance=True,
+ )
dk.data["labels_mean"][label_col], dk.data["labels_std"][label_col] = (
f[0],
f[1],
df,
natr_period=label_period_candles,
natr_multiplier=label_natr_multiplier,
+ logger=logger,
)
median_amplitude = np.nanmedian(np.asarray(pivots_amplitudes, dtype=float))
non_zero_diff,
optuna_load_best_params,
price_retracement_percent,
+ safe_divide,
smooth,
top_log_return,
validate_range,
length=period,
)
# TODO [BREAKING]: Rename %-tcp-period -> %-top_log_return-period
- dataframe["%-tcp-period"] = top_log_return(dataframe, period=period)
+ dataframe["%-tcp-period"] = top_log_return(
+ dataframe, period=period, logger=logger
+ )
# TODO [BREAKING]: Rename %-bcp-period -> %-bottom_log_return-period
- dataframe["%-bcp-period"] = bottom_log_return(dataframe, period=period)
- dataframe["%-prp-period"] = price_retracement_percent(dataframe, period=period)
+ dataframe["%-bcp-period"] = bottom_log_return(
+ dataframe, period=period, logger=logger
+ )
+ dataframe["%-prp-period"] = price_retracement_percent(
+ dataframe, period=period, logger=logger
+ )
dataframe["%-cti-period"] = pta.cti(closes, length=period)
dataframe["%-chop-period"] = pta.chop(
highs,
volumes = dataframe.get("volume")
# TODO [BREAKING]: Rename %-close_pct_change -> %-close_log_return
- dataframe["%-close_pct_change"] = np.log(closes).diff()
+ close_values = closes.to_numpy(dtype=float)
+ invalid_close_count = int(
+ np.count_nonzero(~np.isfinite(close_values) | (close_values <= 0.0))
+ )
+ if invalid_close_count:
+ logger.debug(
+ "feature_engineering_expand_basic: %d close values are non-finite or non-positive; close log return is NaN at those positions",
+ invalid_close_count,
+ )
+ with np.errstate(divide="ignore", invalid="ignore"):
+ dataframe["%-close_pct_change"] = Series(
+ np.where(np.isfinite(close_values) & (close_values > 0.0), np.log(close_values), np.nan),
+ index=dataframe.index,
+ ).diff()
dataframe["%-raw_volume"] = volumes
dataframe["%-obv"] = ta.OBV(dataframe)
label_period_candles = self.get_label_period_candles(str(metadata.get("pair")))
mamode="ema",
zero_lag=True,
normalize=True,
+ logger=logger,
)
dataframe["%-diff_to_psar"] = closes - ta.SAR(
dataframe, acceleration=0.02, maximum=0.2
dataframe["kc_middleband"] = kc["KCBe_14_2.0"]
dataframe["kc_upperband"] = kc["KCUe_14_2.0"]
dataframe["%-kc_width"] = (
- dataframe["kc_upperband"] - dataframe["kc_lowerband"]
- ) / dataframe["kc_middleband"]
+ safe_divide(
+ dataframe["kc_upperband"] - dataframe["kc_lowerband"],
+ dataframe["kc_middleband"],
+ context="feature_engineering_expand_basic:kc_width",
+ logger=logger,
+ )
+ )
(
dataframe["bb_upperband"],
dataframe["bb_middleband"],
nbdevdn=2.2,
)
dataframe["%-bb_width"] = (
- dataframe["bb_upperband"] - dataframe["bb_lowerband"]
- ) / dataframe["bb_middleband"]
+ safe_divide(
+ dataframe["bb_upperband"] - dataframe["bb_lowerband"],
+ dataframe["bb_middleband"],
+ context="feature_engineering_expand_basic:bb_width",
+ logger=logger,
+ )
+ )
dataframe["%-ibs"] = (closes - lows) / non_zero_diff(highs, lows)
dataframe["jaw"], dataframe["teeth"], dataframe["lips"] = alligator(
dataframe, pricemode="median", zero_lag=True
dataframe["vwap_upperband"],
) = vwapb(dataframe, 20, 1.0)
dataframe["%-vwap_width"] = (
- dataframe["vwap_upperband"] - dataframe["vwap_lowerband"]
- ) / dataframe["vwap_middleband"]
+ safe_divide(
+ dataframe["vwap_upperband"] - dataframe["vwap_lowerband"],
+ dataframe["vwap_middleband"],
+ context="feature_engineering_expand_basic:vwap_width",
+ logger=logger,
+ )
+ )
dataframe["%-dist_to_vwap_upperband"] = get_distance(
closes, dataframe["vwap_upperband"]
)
return nan_average(
np.array([entry_natr, current_natr, median_natr]),
weights=np.array([entry_weight, current_weight, median_weight]),
+ logger=logger,
)
def get_trade_quantile_interpolation_natr(
Final,
Literal,
TypeVar,
- Union,
)
import numpy as np
T = TypeVar("T", pd.Series, float)
+@dataclass(frozen=True, slots=True)
+class FiniteSample:
+ """Filtered finite-only sample produced by :func:`finite_sample`.
+
+ ``values`` holds the subset of the input that survives the finite (and
+ optionally positive) mask. ``total_count``, ``finite_count`` and
+ ``dropped_count`` describe the input partition; the invariant
+ ``dropped_count == total_count - finite_count`` always holds. Construct
+ via :func:`finite_sample`; instances bypassing the factory do NOT
+ enforce the finite-only invariant on ``values``.
+ """
+
+ values: NDArray[np.floating]
+ total_count: int
+ finite_count: int
+ dropped_count: int
+
+
def finite_sample(
    values: Any,
    *,
    positive_only: bool = False,
) -> FiniteSample:
    """Build a :class:`FiniteSample` from an arbitrary numeric input.

    The input is coerced to a ``float`` array and flattened to 1-d, then
    every non-finite entry (``NaN``, ``+inf``, ``-inf``) is removed. When
    ``positive_only`` is true, entries ``<= 0.0`` are removed as well, so
    both ``0.0`` and ``-0.0`` are rejected.
    """
    flat = np.asarray(values, dtype=float).ravel()
    keep = np.isfinite(flat)
    if positive_only:
        keep = keep & (flat > 0.0)
    kept = flat[keep]
    total = int(flat.size)
    usable = int(kept.size)
    return FiniteSample(
        values=kept,
        total_count=total,
        finite_count=usable,
        dropped_count=total - usable,
    )
+
+
def safe_distribution_fit(
    sample: FiniteSample,
    fit_fn: Callable[..., Any],
    *,
    fallback: Sequence[float],
    context: str,
    logger: Logger | None = None,
    min_count: int = 2,
    require_variance: bool = True,
    **fit_kwargs: Any,
) -> tuple[float, ...]:
    """Fit a scipy distribution with finite/variance/error guards.

    Caller is responsible for constructing ``sample`` via
    :func:`finite_sample` (with ``positive_only=True`` for strictly
    positive distributions like ``weibull_min``). The ``fallback`` length
    must match the parameter count returned by ``fit_fn`` (e.g. 3 for
    ``weibull_min`` with ``floc=0``, 2 for ``norm``); a length mismatch
    is treated as a fit failure and ``fallback`` is returned.

    Args:
        sample: Pre-filtered sample; ``values``, ``finite_count``,
            ``total_count`` and ``dropped_count`` are read.
        fit_fn: Callable invoked as ``fit_fn(sample.values, **fit_kwargs)``
            and expected to return an iterable of numeric parameters.
        fallback: Parameters returned whenever a guard trips.
        context: Prefix used in every log message.
        logger: Optional logger; nothing is logged when ``None``.
        min_count: Minimum ``sample.finite_count`` required to attempt a fit.
        require_variance: When true, a sample whose range is numerically
            zero (``np.isclose``) is not fitted.
        **fit_kwargs: Forwarded verbatim to ``fit_fn``.

    Returns:
        The fitted parameters as a tuple of ``float``, or ``fallback``
        converted to a tuple of ``float``.
    """
    fallback_tuple = tuple(float(v) for v in fallback)

    # An empty sample can never be fitted, whatever ``min_count`` says.
    # Rejecting it here also keeps the max/min reduction below from raising
    # ``ValueError`` on a zero-size array.
    if sample.finite_count < min_count or np.size(sample.values) == 0:
        if logger is not None:
            logger.warning(
                "%s: insufficient finite sample for distribution fit "
                "(usable=%d, total=%d, dropped=%d); using fallback %r",
                context,
                sample.finite_count,
                sample.total_count,
                sample.dropped_count,
                fallback_tuple,
            )
        return fallback_tuple

    # The range is only needed for the variance guard, so skip the two
    # reductions when the caller opted out of it.
    if require_variance:
        sample_range = float(np.max(sample.values) - np.min(sample.values))
        if np.isclose(sample_range, 0.0):
            if logger is not None:
                logger.warning(
                    "%s: constant finite sample for distribution fit "
                    "(usable=%d, dropped=%d); using fallback %r",
                    context,
                    sample.finite_count,
                    sample.dropped_count,
                    fallback_tuple,
                )
            return fallback_tuple

    try:
        params = tuple(float(v) for v in fit_fn(sample.values, **fit_kwargs))
    except (RuntimeError, ValueError, FloatingPointError, np.linalg.LinAlgError) as exc:
        if logger is not None:
            logger.warning(
                "%s: distribution fit failed (%s); using fallback %r",
                context,
                exc,
                fallback_tuple,
            )
        return fallback_tuple

    # A wrong parameter count or any NaN/inf parameter is unusable downstream.
    if len(params) != len(fallback_tuple) or not all(np.isfinite(params)):
        if logger is not None:
            logger.warning(
                "%s: distribution fit returned invalid params %r; using fallback %r",
                context,
                params,
                fallback_tuple,
            )
        return fallback_tuple

    if sample.dropped_count and logger is not None:
        logger.debug(
            "%s: dropped %d/%d non-finite values before distribution fit",
            context,
            sample.dropped_count,
            sample.total_count,
        )
    return params
+
+
def _result_index(*values: Any) -> pd.Index | None:
    """Return the index of the first ``pd.Series`` in ``values``, else ``None``."""
    return next(
        (candidate.index for candidate in values if isinstance(candidate, pd.Series)),
        None,
    )
+
+
def _safe_numeric_result(result: NDArray[np.floating], *values: Any) -> Any:
    """Shape a raw numeric result to match the caller's inputs.

    A 0-d ``result`` becomes a Python ``float``. A 1-d ``result`` whose
    length equals the index of the first ``pd.Series`` found in ``values``
    is wrapped in a ``pd.Series`` carrying that index. Anything else is
    returned unchanged.

    The wrapping is purely positional: no pandas index alignment happens,
    so callers passing several Series must make sure they share an index.
    """
    if result.ndim == 0:
        return float(result)
    if result.ndim == 1:
        index = _result_index(*values)
        if index is not None and len(index) == result.size:
            return pd.Series(result, index=index)
    return result
+
+
def safe_divide(
    numerator: Any,
    denominator: Any,
    *,
    fallback: float = np.nan,
    context: str = "safe_divide",
    logger: Logger | None = None,
) -> Any:
    """Element-wise division with non-finite and near-zero denominator guards.

    A position is replaced by ``fallback`` when its numerator or denominator
    is non-finite, when its denominator satisfies
    ``np.isclose(denom, 0.0)`` (default ``atol=1e-8``), or when the quotient
    itself is non-finite (e.g. ``inf`` from an overflowing division).

    When ``logger`` is given, the number of replaced positions is reported
    at debug level. The count covers every replaced position, including
    the case where ``fallback`` is itself a finite number.

    Returns a ``pd.Series`` indexed on the first Series among the inputs when
    shapes align, a Python ``float`` for 0-d results, otherwise an ``ndarray``.
    """
    numerator_arr = np.asarray(numerator, dtype=float)
    denominator_arr = np.asarray(denominator, dtype=float)
    valid_mask = (
        np.isfinite(numerator_arr)
        & np.isfinite(denominator_arr)
        & ~np.isclose(denominator_arr, 0.0)
    )
    # Overflow is silenced as well: an overflowing quotient is replaced just
    # below, so the RuntimeWarning would only be noise.
    with np.errstate(divide="ignore", invalid="ignore", over="ignore"):
        quotient = np.divide(
            numerator_arr,
            denominator_arr,
            # Positions excluded by ``where`` keep the pre-filled fallback.
            out=np.full(
                np.broadcast_shapes(numerator_arr.shape, denominator_arr.shape),
                fallback,
                dtype=float,
            ),
            where=valid_mask,
        )
    # Count from the masks rather than from non-finite outputs, otherwise a
    # finite ``fallback`` would hide every substitution from the log.
    replaced_mask = ~valid_mask | ~np.isfinite(quotient)
    invalid_count = int(np.count_nonzero(replaced_mask))
    result = np.where(replaced_mask, fallback, quotient)
    if invalid_count and logger is not None:
        logger.debug(
            "%s: replaced %d invalid division result(s) with %r",
            context,
            invalid_count,
            fallback,
        )
    return _safe_numeric_result(np.asarray(result, dtype=float), numerator, denominator)
+
+
def safe_log_ratio(
    numerator: Any,
    denominator: Any,
    *,
    fallback: float = np.nan,
    context: str = "safe_log_ratio",
    logger: Logger | None = None,
) -> Any:
    """Element-wise ``log(numerator / denominator)`` with positivity guards.

    Requires both operands to be finite and strictly positive; otherwise the
    output position is set to ``fallback``. Any non-finite log output is also
    coerced to ``fallback``. The ratio is evaluated as
    ``log(numerator) - log(denominator)``.

    When ``logger`` is given, the number of replaced positions is reported
    at debug level. The count covers every replaced position, including
    the case where ``fallback`` is itself a finite number.

    Returns a ``pd.Series`` indexed on the first Series among the inputs when
    shapes align, a Python ``float`` for 0-d results, otherwise an ``ndarray``.
    """
    numerator_arr = np.asarray(numerator, dtype=float)
    denominator_arr = np.asarray(denominator, dtype=float)
    valid_mask = (
        np.isfinite(numerator_arr)
        & np.isfinite(denominator_arr)
        & (numerator_arr > 0.0)
        & (denominator_arr > 0.0)
    )
    with np.errstate(divide="ignore", invalid="ignore"):
        # Invalid positions are swapped for 1.0 so ``np.log`` never sees a
        # non-positive or non-finite operand; they are overwritten below.
        log_num = np.log(np.where(valid_mask, numerator_arr, 1.0))
        log_den = np.log(np.where(valid_mask, denominator_arr, 1.0))
        log_ratio = log_num - log_den
    # Count from the masks rather than from non-finite outputs, otherwise a
    # finite ``fallback`` would hide every substitution from the log.
    replaced_mask = ~valid_mask | ~np.isfinite(log_ratio)
    invalid_count = int(np.count_nonzero(replaced_mask))
    result = np.where(replaced_mask, fallback, log_ratio)
    if invalid_count and logger is not None:
        logger.debug(
            "%s: replaced %d invalid log-ratio result(s) with %r",
            context,
            invalid_count,
            fallback,
        )
    return _safe_numeric_result(np.asarray(result, dtype=float), numerator, denominator)
+
+
@dataclass(frozen=True, slots=True)
class _EnumValidator:
valid_values: tuple[str, ...]
def nan_average(
    values: NDArray[np.floating],
    weights: NDArray[np.floating] | None = None,
    *,
    logger: Logger | None = None,
) -> float:
    """Average ``values`` while ignoring non-finite entries.

    Without ``weights`` this is the plain mean of the finite entries. With
    ``weights`` only positions where both the value and the weight are
    finite take part in the weighted average.

    ``np.nan`` is returned when ``values`` is empty, when no finite entry
    (or finite value/weight pair) remains, when ``weights`` and ``values``
    differ in shape, or when the surviving weights sum to a non-finite or
    numerically zero total (``np.isclose``). The last two cases are logged
    as warnings when ``logger`` is given.

    Unlike ``np.nanmean``, ``+/-inf`` entries are discarded along with ``NaN``.
    """
    data = np.asarray(values, dtype=float)
    if data.size == 0:
        return np.nan

    if weights is None:
        usable = data[np.isfinite(data)]
        return float(np.mean(usable)) if usable.size else np.nan

    weight_arr = np.asarray(weights, dtype=float)
    if weight_arr.shape != data.shape:
        if logger is not None:
            logger.warning(
                "nan_average: values/weights shape mismatch (%r != %r); using fallback NaN",
                data.shape,
                weight_arr.shape,
            )
        return np.nan

    keep = np.isfinite(data) & np.isfinite(weight_arr)
    if not keep.any():
        return np.nan

    kept_weights = weight_arr[keep]
    weight_sum = float(np.sum(kept_weights))
    # ``np.average`` cannot normalise by a zero (or non-finite) weight total.
    if not np.isfinite(weight_sum) or np.isclose(weight_sum, 0.0):
        if logger is not None:
            logger.warning(
                "nan_average: finite weights sum to %g; using fallback NaN",
                weight_sum,
            )
        return np.nan

    return float(np.average(data[keep], weights=kept_weights))
return sp.signal.find_peaks(-series)[0].size + sp.signal.find_peaks(series)[0].size
-def top_log_return(dataframe: pd.DataFrame, period: int) -> pd.Series:
+def top_log_return(
+ dataframe: pd.DataFrame,
+ period: int,
+ *,
+ logger: Logger | None = None,
+) -> pd.Series:
"""Logarithmic return from rolling maximum: ``log(close / rolling_max)``.
Measures distance below the highest close in previous ``period`` bars.
dataframe.get("close").rolling(period, min_periods=period).max().shift(1)
)
- return np.log(dataframe.get("close") / previous_close_top)
+ return safe_log_ratio(
+ dataframe.get("close"),
+ previous_close_top,
+ context="top_log_return",
+ logger=logger,
+ )
-def bottom_log_return(dataframe: pd.DataFrame, period: int) -> pd.Series:
+def bottom_log_return(
+ dataframe: pd.DataFrame,
+ period: int,
+ *,
+ logger: Logger | None = None,
+) -> pd.Series:
"""Logarithmic return from rolling minimum: ``log(close / rolling_min)``.
Measures distance above the lowest close in previous ``period`` bars.
dataframe.get("close").rolling(period, min_periods=period).min().shift(1)
)
- return np.log(dataframe.get("close") / previous_close_bottom)
+ return safe_log_ratio(
+ dataframe.get("close"),
+ previous_close_bottom,
+ context="bottom_log_return",
+ logger=logger,
+ )
-def price_retracement_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
+def price_retracement_percent(
+ dataframe: pd.DataFrame,
+ period: int,
+ *,
+ logger: Logger | None = None,
+) -> pd.Series:
"""Normalized log-scale position of close within rolling high/low range.
Formula: ``log(close / low) / log(high / low)``. Returns 0 at bottom, 1
previous_close_high = (
dataframe.get("close").rolling(period, min_periods=period).max().shift(1)
)
- denominator = np.log(previous_close_high / previous_close_low)
- return (np.log(dataframe.get("close") / previous_close_low) / denominator).where(
- ~np.isclose(denominator, 0.0), 0.0
+ denominator = safe_log_ratio(
+ previous_close_high,
+ previous_close_low,
+ context="price_retracement_percent:denominator",
+ logger=logger,
+ )
+ numerator = safe_log_ratio(
+ dataframe.get("close"),
+ previous_close_low,
+ context="price_retracement_percent:numerator",
+ logger=logger,
)
+ result = safe_divide(
+ numerator,
+ denominator,
+ fallback=np.nan,
+ context="price_retracement_percent",
+ logger=logger,
+ )
+ return result.where(~np.isclose(denominator, 0.0), 0.0)
# VWAP bands
mamode: str = "sma",
zero_lag: bool = False,
normalize: bool = False,
+ *,
+ logger: Logger | None = None,
) -> pd.Series:
"""
Calculate the Elliott Wave Oscillator (EWO) using two moving averages.
ma2 = ma_fn(prices, timeperiod=ma2_length)
madiff = ma1 - ma2
if normalize:
- madiff = (madiff / prices) * 100.0
+ madiff = safe_divide(
+ madiff,
+ prices,
+ context="ewo:normalize",
+ logger=logger,
+ ) * 100.0
return madiff
natr_period: int = 14,
natr_multiplier: float = 9.0,
normalize: bool = False,
+ *,
+ logger: Logger | None = None,
) -> tuple[
list[int],
list[float],
indices: list[int] = df.index.tolist()
thresholds: NDArray[np.floating] = natr_values * natr_multiplier
- closes_log = np.log(df.get("close").to_numpy())
- highs_log = np.log(df.get("high").to_numpy())
- lows_log = np.log(df.get("low").to_numpy())
+ closes = df.get("close").to_numpy(dtype=float)
+ highs = df.get("high").to_numpy(dtype=float)
+ lows = df.get("low").to_numpy(dtype=float)
+ invalid_price_count = int(
+ np.count_nonzero(
+ ~np.isfinite(closes)
+ | ~np.isfinite(highs)
+ | ~np.isfinite(lows)
+ | (closes <= 0.0)
+ | (highs <= 0.0)
+ | (lows <= 0.0)
+ )
+ )
+ if invalid_price_count and logger is not None:
+ logger.debug(
+ "zigzag: %d rows have non-finite or non-positive OHLC values; derived pivot metrics are NaN at those positions",
+ invalid_price_count,
+ )
+ with np.errstate(divide="ignore", invalid="ignore"):
+ closes_log = np.where(np.isfinite(closes) & (closes > 0.0), np.log(closes), np.nan)
+ highs_log = np.where(np.isfinite(highs) & (highs > 0.0), np.log(highs), np.nan)
+ lows_log = np.where(np.isfinite(lows) & (lows > 0.0), np.log(lows), np.nan)
volumes = df.get("volume").to_numpy()
state: TrendDirection = TrendDirection.NEUTRAL
"catboost",
)
-RegressorCallback = Union[Callable[..., Any], XGBoostTrainingCallback]
+RegressorCallback = Callable[..., Any] | XGBoostTrainingCallback
_EARLY_STOPPING_ROUNDS_DEFAULT: Final[int] = 50