]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
fix(quickadapter): harden numerical guard paths (#79)
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 21 Jun 2026 19:26:38 +0000 (21:26 +0200)
committerGitHub <noreply@github.com>
Sun, 21 Jun 2026 19:26:38 +0000 (21:26 +0200)
Shared finite-sample, guarded distribution-fit, safe divide/log-ratio,
and sigmoid-domain helpers. Log/division feature paths route through
the helpers; distribution fits guard empty, non-finite, and constant
samples.

- `Utils.py` helpers: `FiniteSample` dataclass with `finite_sample`;
  `safe_distribution_fit` (documented fallback-length contract);
  `safe_divide`; `safe_log_ratio`.
- `nan_average` finite/zero-weight guards; documented divergence from
  `np.nanmean` (strips +/-inf as well as NaN; bounded for current
  callers).
- `_clip_sigmoid_domain` in `LabelTransformer.py` guards
  `sp.special.logit` against values outside the open `(-1, 1)` domain
  during `sigmoid` inverse normalization.
- `feature_engineering_expand_basic` and Utils log/divide sites
  (`top_log_return`, `bottom_log_return`, `price_retracement_percent`,
  `ewo` normalize, `zigzag` log prices, KC/BB/VWAP widths) route
  through the safe helpers.
- DI Weibull and label `norm` fits in `fit_live_predictions` use
  `safe_distribution_fit`; DI cutoff fallback at
  `_DI_CUTOFF_DEFAULT: Final[float] = 2.0`.

quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/LabelTransformer.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

index 233abb4533f953684666f4a700b1cf57165b4df1..ca9a712f3f09109e675eb464268065725ed63f3a 100644 (file)
@@ -54,6 +54,7 @@ from Utils import (
     ensure_datetime_series,
     make_test_set_and_weights,
     fit_regressor,
+    finite_sample,
     format_dict,
     format_number,
     get_causal_mode,
@@ -69,6 +70,7 @@ from Utils import (
     optuna_load_best_params,
     optuna_save_best_params,
     sanitize_and_renormalize,
+    safe_distribution_fit,
     soft_extremum,
     zigzag,
 )
@@ -123,7 +125,10 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     version = "3.12.0"
 
     _TEST_SIZE: Final[float] = 0.1
-
+    # Substituted whenever the Weibull DI cutoff (``weibull_min.ppf``) is
+    # non-finite (cold start or degenerate fit). Preserves the prior
+    # pre-warm-up heuristic for the outlier-quantile cutoff scale.
+    _DI_CUTOFF_DEFAULT: Final[float] = 2.0
     _SKLEARN_TRAIN_TEST_SPLIT_KEYS: Final[frozenset[str]] = frozenset(
         {"test_size", "train_size", "random_state", "shuffle", "stratify"}
     )
@@ -2223,7 +2228,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
                 if not warmed_up:
                     min_pred, max_pred = -2.0, 2.0
                     f = [0.0, 0.0, 0.0]
-                    cutoff = 2.0
+                    cutoff = QuickAdapterRegressorV3._DI_CUTOFF_DEFAULT
                 else:
                     min_pred, max_pred = self.min_max_pred(
                         label_col,
@@ -2234,14 +2239,41 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
                             pair, QuickAdapterRegressorV3._OPTUNA_NAMESPACES[1]
                         ).get("label_period_candles"),  # "label"
                     )
-                    f = sp.stats.weibull_min.fit(
-                        pd.to_numeric(di_values, errors="coerce").dropna(), floc=0
+                    di_sample = finite_sample(
+                        []
+                        if di_values is None
+                        else pd.to_numeric(di_values, errors="coerce"),
+                        positive_only=True,
+                    )
+                    f = safe_distribution_fit(
+                        di_sample,
+                        sp.stats.weibull_min.fit,
+                        # Intentionally non-ppf-able; the ``weibull_min.ppf``
+                        # downstream returns NaN on degenerate scale and the
+                        # ``np.isfinite(cutoff)`` guard substitutes
+                        # ``_DI_CUTOFF_DEFAULT``.
+                        fallback=(0.0, 0.0, 0.0),
+                        context=f"di_values_weibull_fit:{pair}",
+                        logger=logger,
+                        min_count=2,
+                        require_variance=True,
+                        floc=0,
                     )
                     outlier_quantile = col_prediction_config.get(
                         "outlier_quantile",
                         DEFAULTS_LABEL_PREDICTION["outlier_quantile"],
                     )
                     cutoff = sp.stats.weibull_min.ppf(outlier_quantile, *f)
+                    if not np.isfinite(cutoff):
+                        logger.warning(
+                            "[%s] DI_values Weibull cutoff is invalid "
+                            "(params=%r, quantile=%r); using fallback %r",
+                            pair,
+                            f,
+                            outlier_quantile,
+                            QuickAdapterRegressorV3._DI_CUTOFF_DEFAULT,
+                        )
+                        cutoff = QuickAdapterRegressorV3._DI_CUTOFF_DEFAULT
                 dk.data["extra_returns_per_train"][f"{label_col}_minima_threshold"] = (
                     min_pred
                 )
@@ -2261,7 +2293,25 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
             if not warmed_up:
                 f = [0.0, 0.0]
             else:
-                f = sp.stats.norm.fit(pred_label)
+                sample = finite_sample(pred_label)
+                if sample.finite_count == 0:
+                    fallback = (0.0, 0.0)
+                else:
+                    sample_mean = float(np.mean(sample.values))
+                    sample_std = float(np.std(sample.values, ddof=0))
+                    fallback = (
+                        sample_mean if np.isfinite(sample_mean) else 0.0,
+                        sample_std if np.isfinite(sample_std) else 0.0,
+                    )
+                f = safe_distribution_fit(
+                    sample,
+                    sp.stats.norm.fit,
+                    fallback=fallback,
+                    context=f"label_norm_fit:{pair}:{label_col}",
+                    logger=logger,
+                    min_count=2,
+                    require_variance=True,
+                )
             dk.data["labels_mean"][label_col], dk.data["labels_std"][label_col] = (
                 f[0],
                 f[1],
@@ -4019,6 +4069,7 @@ def label_objective(
         df,
         natr_period=label_period_candles,
         natr_multiplier=label_natr_multiplier,
+        logger=logger,
     )
 
     median_amplitude = np.nanmedian(np.asarray(pivots_amplitudes, dtype=float))
index e2673a717074f40d66fed7fcfc9c1a214a736aa7..2a34253b0258ff3e8b4e0a142806fe293bce4397 100644 (file)
@@ -22,6 +22,17 @@ from sklearn.preprocessing import (
 
 logger = logging.getLogger(__name__)
 
+
+def _clip_sigmoid_domain(values: NDArray[np.floating]) -> NDArray[np.floating]:
+    """Clip ``values`` to the open interval ``(-1, 1)``.
+
+    The clip bounds are ``[-1 + eps, 1 - eps]`` with
+    ``eps = np.finfo(float).eps`` so that downstream ``logit((x + 1) / 2)``
+    stays finite at the boundary. NaN values pass through unchanged.
+    """
+    eps = np.finfo(float).eps
+    return np.clip(values, -1.0 + eps, 1.0 - eps)
+
 CombinedMetric = Literal[
     "amplitude",
     "amplitude_threshold_ratio",
@@ -342,7 +353,18 @@ class LabelTransformer(BaseTransform):
             return values
         out = values.copy()
         if inverse:
-            out[mask] = sp.special.logit((values[mask] + 1.0) / 2.0) / scale
+            clipped = _clip_sigmoid_domain(values[mask])
+            clipped_count = int(
+                np.count_nonzero(
+                    (clipped != values[mask]) & ~np.isnan(values[mask])
+                )
+            )
+            if clipped_count:
+                logger.warning(
+                    "sigmoid_inverse_normalize: clipped %d value(s) outside the open (-1, 1) domain",
+                    clipped_count,
+                )
+            out[mask] = sp.special.logit((clipped + 1.0) / 2.0) / scale
         else:
             out[mask] = 2.0 * sp.special.expit(scale * values[mask]) - 1.0
         return out
index 3f01dd3e5321432dd8e7730b35a5b3fbe4c57d01..2beefe6ce526951d1508bcfe3e8b5e677be7827c 100644 (file)
@@ -64,6 +64,7 @@ from Utils import (
     non_zero_diff,
     optuna_load_best_params,
     price_retracement_percent,
+    safe_divide,
     smooth,
     top_log_return,
     validate_range,
@@ -654,10 +655,16 @@ class QuickAdapterV3(IStrategy):
             length=period,
         )
         # TODO [BREAKING]: Rename %-tcp-period -> %-top_log_return-period
-        dataframe["%-tcp-period"] = top_log_return(dataframe, period=period)
+        dataframe["%-tcp-period"] = top_log_return(
+            dataframe, period=period, logger=logger
+        )
         # TODO [BREAKING]: Rename %-bcp-period -> %-bottom_log_return-period
-        dataframe["%-bcp-period"] = bottom_log_return(dataframe, period=period)
-        dataframe["%-prp-period"] = price_retracement_percent(dataframe, period=period)
+        dataframe["%-bcp-period"] = bottom_log_return(
+            dataframe, period=period, logger=logger
+        )
+        dataframe["%-prp-period"] = price_retracement_percent(
+            dataframe, period=period, logger=logger
+        )
         dataframe["%-cti-period"] = pta.cti(closes, length=period)
         dataframe["%-chop-period"] = pta.chop(
             highs,
@@ -682,7 +689,20 @@ class QuickAdapterV3(IStrategy):
         volumes = dataframe.get("volume")
 
         # TODO [BREAKING]: Rename %-close_pct_change -> %-close_log_return
-        dataframe["%-close_pct_change"] = np.log(closes).diff()
+        close_values = closes.to_numpy(dtype=float)
+        invalid_close_count = int(
+            np.count_nonzero(~np.isfinite(close_values) | (close_values <= 0.0))
+        )
+        if invalid_close_count:
+            logger.debug(
+                "feature_engineering_expand_basic: %d close values are non-finite or non-positive; close log return is NaN at those positions",
+                invalid_close_count,
+            )
+        with np.errstate(divide="ignore", invalid="ignore"):
+            dataframe["%-close_pct_change"] = Series(
+                np.where(np.isfinite(close_values) & (close_values > 0.0), np.log(close_values), np.nan),
+                index=dataframe.index,
+            ).diff()
         dataframe["%-raw_volume"] = volumes
         dataframe["%-obv"] = ta.OBV(dataframe)
         label_period_candles = self.get_label_period_candles(str(metadata.get("pair")))
@@ -698,6 +718,7 @@ class QuickAdapterV3(IStrategy):
             mamode="ema",
             zero_lag=True,
             normalize=True,
+            logger=logger,
         )
         dataframe["%-diff_to_psar"] = closes - ta.SAR(
             dataframe, acceleration=0.02, maximum=0.2
@@ -713,8 +734,13 @@ class QuickAdapterV3(IStrategy):
         dataframe["kc_middleband"] = kc["KCBe_14_2.0"]
         dataframe["kc_upperband"] = kc["KCUe_14_2.0"]
         dataframe["%-kc_width"] = (
-            dataframe["kc_upperband"] - dataframe["kc_lowerband"]
-        ) / dataframe["kc_middleband"]
+            safe_divide(
+                dataframe["kc_upperband"] - dataframe["kc_lowerband"],
+                dataframe["kc_middleband"],
+                context="feature_engineering_expand_basic:kc_width",
+                logger=logger,
+            )
+        )
         (
             dataframe["bb_upperband"],
             dataframe["bb_middleband"],
@@ -726,8 +752,13 @@ class QuickAdapterV3(IStrategy):
             nbdevdn=2.2,
         )
         dataframe["%-bb_width"] = (
-            dataframe["bb_upperband"] - dataframe["bb_lowerband"]
-        ) / dataframe["bb_middleband"]
+            safe_divide(
+                dataframe["bb_upperband"] - dataframe["bb_lowerband"],
+                dataframe["bb_middleband"],
+                context="feature_engineering_expand_basic:bb_width",
+                logger=logger,
+            )
+        )
         dataframe["%-ibs"] = (closes - lows) / non_zero_diff(highs, lows)
         dataframe["jaw"], dataframe["teeth"], dataframe["lips"] = alligator(
             dataframe, pricemode="median", zero_lag=True
@@ -758,8 +789,13 @@ class QuickAdapterV3(IStrategy):
             dataframe["vwap_upperband"],
         ) = vwapb(dataframe, 20, 1.0)
         dataframe["%-vwap_width"] = (
-            dataframe["vwap_upperband"] - dataframe["vwap_lowerband"]
-        ) / dataframe["vwap_middleband"]
+            safe_divide(
+                dataframe["vwap_upperband"] - dataframe["vwap_lowerband"],
+                dataframe["vwap_middleband"],
+                context="feature_engineering_expand_basic:vwap_width",
+                logger=logger,
+            )
+        )
         dataframe["%-dist_to_vwap_upperband"] = get_distance(
             closes, dataframe["vwap_upperband"]
         )
@@ -1113,6 +1149,7 @@ class QuickAdapterV3(IStrategy):
         return nan_average(
             np.array([entry_natr, current_natr, median_natr]),
             weights=np.array([entry_weight, current_weight, median_weight]),
+            logger=logger,
         )
 
     def get_trade_quantile_interpolation_natr(
index 33e579d618e44db68efe196de27fcddebf6f598c..82e213007b88d3cce886aa99ffe008d96b83875d 100644 (file)
@@ -17,7 +17,6 @@ from typing import (
     Final,
     Literal,
     TypeVar,
-    Union,
 )
 
 import numpy as np
@@ -62,6 +61,246 @@ else:
 T = TypeVar("T", pd.Series, float)
 
 
+@dataclass(frozen=True, slots=True)
+class FiniteSample:
+    """Filtered finite-only sample produced by :func:`finite_sample`.
+
+    ``values`` holds the subset of the input that survives the finite (and
+    optionally positive) mask. ``total_count``, ``finite_count`` and
+    ``dropped_count`` describe the input partition; the invariant
+    ``dropped_count == total_count - finite_count`` always holds. Construct
+    via :func:`finite_sample`; instances bypassing the factory do NOT
+    enforce the finite-only invariant on ``values``.
+    """
+
+    values: NDArray[np.floating]
+    total_count: int
+    finite_count: int
+    dropped_count: int
+
+
+def finite_sample(
+    values: Any,
+    *,
+    positive_only: bool = False,
+) -> FiniteSample:
+    """Return a :class:`FiniteSample` from ``values``.
+
+    Flattens ``values`` to 1-d, coerces to ``float64``, strips non-finite
+    entries. With ``positive_only=True`` also strips entries ``<= 0.0``
+    (strict; signed zero is rejected).
+    """
+    arr = np.asarray(values, dtype=float).reshape(-1)
+    mask = np.isfinite(arr)
+    if positive_only:
+        mask &= arr > 0.0
+    sample = arr[mask]
+    return FiniteSample(
+        values=sample,
+        total_count=int(arr.size),
+        finite_count=int(sample.size),
+        dropped_count=int(arr.size - sample.size),
+    )
+
+
+def safe_distribution_fit(
+    sample: FiniteSample,
+    fit_fn: Callable[..., Any],
+    *,
+    fallback: Sequence[float],
+    context: str,
+    logger: Logger | None = None,
+    min_count: int = 2,
+    require_variance: bool = True,
+    **fit_kwargs: Any,
+) -> tuple[float, ...]:
+    """Fit a scipy distribution with finite/variance/error guards.
+
+    Caller is responsible for constructing ``sample`` via
+    :func:`finite_sample` (with ``positive_only=True`` for strictly
+    positive distributions like ``weibull_min``). The ``fallback`` length
+    must match the parameter count returned by ``fit_fn`` (e.g. 3 for
+    ``weibull_min`` with ``floc=0``, 2 for ``norm``); a length mismatch
+    is treated as a fit failure and ``fallback`` is returned.
+    """
+    fallback_tuple = tuple(float(v) for v in fallback)
+
+    if sample.finite_count < min_count:
+        if logger is not None:
+            logger.warning(
+                "%s: insufficient finite sample for distribution fit "
+                "(usable=%d, total=%d, dropped=%d); using fallback %r",
+                context,
+                sample.finite_count,
+                sample.total_count,
+                sample.dropped_count,
+                fallback_tuple,
+            )
+        return fallback_tuple
+
+    sample_range = float(np.max(sample.values) - np.min(sample.values))
+    if require_variance and np.isclose(sample_range, 0.0):
+        if logger is not None:
+            logger.warning(
+                "%s: constant finite sample for distribution fit "
+                "(usable=%d, dropped=%d); using fallback %r",
+                context,
+                sample.finite_count,
+                sample.dropped_count,
+                fallback_tuple,
+            )
+        return fallback_tuple
+
+    try:
+        params = tuple(float(v) for v in fit_fn(sample.values, **fit_kwargs))
+    except (RuntimeError, ValueError, FloatingPointError, np.linalg.LinAlgError) as exc:
+        if logger is not None:
+            logger.warning(
+                "%s: distribution fit failed (%s); using fallback %r",
+                context,
+                exc,
+                fallback_tuple,
+            )
+        return fallback_tuple
+
+    if len(params) != len(fallback_tuple) or not all(np.isfinite(params)):
+        if logger is not None:
+            logger.warning(
+                "%s: distribution fit returned invalid params %r; using fallback %r",
+                context,
+                params,
+                fallback_tuple,
+            )
+        return fallback_tuple
+
+    if sample.dropped_count and logger is not None:
+        logger.debug(
+            "%s: dropped %d/%d non-finite values before distribution fit",
+            context,
+            sample.dropped_count,
+            sample.total_count,
+        )
+    return params
+
+
+def _result_index(*values: Any) -> pd.Index | None:
+    for value in values:
+        if isinstance(value, pd.Series):
+            return value.index
+    return None
+
+
+def _safe_numeric_result(result: NDArray[np.floating], *values: Any) -> Any:
+    """Attach the first input Series's index to a numeric result (positional).
+
+    The result is positionally aligned with the first input Series found in
+    ``values``; pandas index alignment is NOT performed. Callers passing
+    multiple Series must ensure they share a common index.
+    """
+    index = _result_index(*values)
+    if index is not None and result.ndim == 1 and result.size == len(index):
+        return pd.Series(result, index=index)
+    if result.ndim == 0:
+        return float(result)
+    return result
+
+
+def safe_divide(
+    numerator: Any,
+    denominator: Any,
+    *,
+    fallback: float = np.nan,
+    context: str = "safe_divide",
+    logger: Logger | None = None,
+) -> Any:
+    """Element-wise division with non-finite and near-zero denominator guards.
+
+    Replaces results from divisions whose numerator or denominator is non-finite,
+    or whose denominator satisfies ``np.isclose(denom, 0.0)`` (default
+    ``atol=1e-8``), with ``fallback``. The fallback is also substituted for
+    any non-finite division output (e.g. ``inf`` from a subnormal denominator
+    that escapes the ``np.isclose`` gate).
+
+    Returns a ``pd.Series`` indexed on the first Series among the inputs when
+    shapes align, a Python ``float`` for 0-d results, otherwise an ``ndarray``.
+    """
+    numerator_arr = np.asarray(numerator, dtype=float)
+    denominator_arr = np.asarray(denominator, dtype=float)
+    valid_mask = (
+        np.isfinite(numerator_arr)
+        & np.isfinite(denominator_arr)
+        & ~np.isclose(denominator_arr, 0.0)
+    )
+    with np.errstate(divide="ignore", invalid="ignore"):
+        result = np.divide(
+            numerator_arr,
+            denominator_arr,
+            out=np.full(
+                np.broadcast_shapes(numerator_arr.shape, denominator_arr.shape),
+                fallback,
+                dtype=float,
+            ),
+            where=valid_mask,
+        )
+    finite_mask = np.isfinite(result)
+    invalid_count = int(np.size(result) - np.count_nonzero(finite_mask))
+    result = np.where(finite_mask, result, fallback)
+    if invalid_count and logger is not None:
+        logger.debug(
+            "%s: replaced %d invalid division result(s) with %r",
+            context,
+            invalid_count,
+            fallback,
+        )
+    return _safe_numeric_result(np.asarray(result, dtype=float), numerator, denominator)
+
+
+def safe_log_ratio(
+    numerator: Any,
+    denominator: Any,
+    *,
+    fallback: float = np.nan,
+    context: str = "safe_log_ratio",
+    logger: Logger | None = None,
+) -> Any:
+    """Element-wise ``log(numerator / denominator)`` with positivity guards.
+
+    Requires both operands to be finite and strictly positive; otherwise the
+    output position is set to ``fallback``. Any non-finite log output is also
+    coerced to ``fallback``.
+
+    Returns a ``pd.Series`` indexed on the first Series among the inputs when
+    shapes align, a Python ``float`` for 0-d results, otherwise an ``ndarray``.
+    """
+    numerator_arr = np.asarray(numerator, dtype=float)
+    denominator_arr = np.asarray(denominator, dtype=float)
+    valid_mask = (
+        np.isfinite(numerator_arr)
+        & np.isfinite(denominator_arr)
+        & (numerator_arr > 0.0)
+        & (denominator_arr > 0.0)
+    )
+    with np.errstate(divide="ignore", invalid="ignore"):
+        log_num = np.log(
+            np.where(valid_mask, numerator_arr, 1.0),
+        )
+        log_den = np.log(
+            np.where(valid_mask, denominator_arr, 1.0),
+        )
+        result = np.where(valid_mask, log_num - log_den, fallback)
+    finite_mask = np.isfinite(result)
+    invalid_count = int(np.size(result) - np.count_nonzero(finite_mask))
+    result = np.where(finite_mask, result, fallback)
+    if invalid_count and logger is not None:
+        logger.debug(
+            "%s: replaced %d invalid log-ratio result(s) with %r",
+            context,
+            invalid_count,
+            fallback,
+        )
+    return _safe_numeric_result(np.asarray(result, dtype=float), numerator, denominator)
+
+
 @dataclass(frozen=True, slots=True)
 class _EnumValidator:
     valid_values: tuple[str, ...]
@@ -948,19 +1187,50 @@ def compose_sample_weights(
 def nan_average(
     values: NDArray[np.floating],
     weights: NDArray[np.floating] | None = None,
+    *,
+    logger: Logger | None = None,
 ) -> float:
+    """Weighted nan-aware mean with finite/zero-weight guards.
+
+    Returns ``np.nan`` when no finite (value, weight) pair survives, when
+    ``weights.shape != values.shape``, or when the finite-weights subset
+    sums to zero. Diverges from ``np.nanmean`` by stripping ``+/-inf``
+    along with ``NaN``; current call sites feed bounded quantities so the
+    ``+/-inf`` strip is a no-op in practice.
+    """
     values = np.asarray(values, dtype=float)
     if values.size == 0:
         return np.nan
 
     if weights is None:
-        return float(np.nanmean(values))
+        finite_values = values[np.isfinite(values)]
+        if finite_values.size == 0:
+            return np.nan
+        return float(np.mean(finite_values))
 
     weights = np.asarray(weights, dtype=float)
+    if weights.shape != values.shape:
+        if logger is not None:
+            logger.warning(
+                "nan_average: values/weights shape mismatch (%r != %r); using fallback NaN",
+                values.shape,
+                weights.shape,
+            )
+        return np.nan
+
     mask = np.isfinite(values) & np.isfinite(weights)
     if not mask.any():
         return np.nan
 
+    weight_sum = float(np.sum(weights[mask]))
+    if not np.isfinite(weight_sum) or np.isclose(weight_sum, 0.0):
+        if logger is not None:
+            logger.warning(
+                "nan_average: finite weights sum to %g; using fallback NaN",
+                weight_sum,
+            )
+        return np.nan
+
     return float(np.average(values[mask], weights=weights[mask]))
 
 
@@ -1858,7 +2128,12 @@ def calculate_n_extrema(series: pd.Series) -> int:
     return sp.signal.find_peaks(-series)[0].size + sp.signal.find_peaks(series)[0].size
 
 
-def top_log_return(dataframe: pd.DataFrame, period: int) -> pd.Series:
+def top_log_return(
+    dataframe: pd.DataFrame,
+    period: int,
+    *,
+    logger: Logger | None = None,
+) -> pd.Series:
     """Logarithmic return from rolling maximum: ``log(close / rolling_max)``.
 
     Measures distance below the highest close in previous ``period`` bars.
@@ -1871,10 +2146,20 @@ def top_log_return(dataframe: pd.DataFrame, period: int) -> pd.Series:
         dataframe.get("close").rolling(period, min_periods=period).max().shift(1)
     )
 
-    return np.log(dataframe.get("close") / previous_close_top)
+    return safe_log_ratio(
+        dataframe.get("close"),
+        previous_close_top,
+        context="top_log_return",
+        logger=logger,
+    )
 
 
-def bottom_log_return(dataframe: pd.DataFrame, period: int) -> pd.Series:
+def bottom_log_return(
+    dataframe: pd.DataFrame,
+    period: int,
+    *,
+    logger: Logger | None = None,
+) -> pd.Series:
     """Logarithmic return from rolling minimum: ``log(close / rolling_min)``.
 
     Measures distance above the lowest close in previous ``period`` bars.
@@ -1887,10 +2172,20 @@ def bottom_log_return(dataframe: pd.DataFrame, period: int) -> pd.Series:
         dataframe.get("close").rolling(period, min_periods=period).min().shift(1)
     )
 
-    return np.log(dataframe.get("close") / previous_close_bottom)
+    return safe_log_ratio(
+        dataframe.get("close"),
+        previous_close_bottom,
+        context="bottom_log_return",
+        logger=logger,
+    )
 
 
-def price_retracement_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
+def price_retracement_percent(
+    dataframe: pd.DataFrame,
+    period: int,
+    *,
+    logger: Logger | None = None,
+) -> pd.Series:
     """Normalized log-scale position of close within rolling high/low range.
 
     Formula: ``log(close / low) / log(high / low)``. Returns 0 at bottom, 1
@@ -1906,10 +2201,26 @@ def price_retracement_percent(dataframe: pd.DataFrame, period: int) -> pd.Series
     previous_close_high = (
         dataframe.get("close").rolling(period, min_periods=period).max().shift(1)
     )
-    denominator = np.log(previous_close_high / previous_close_low)
-    return (np.log(dataframe.get("close") / previous_close_low) / denominator).where(
-        ~np.isclose(denominator, 0.0), 0.0
+    denominator = safe_log_ratio(
+        previous_close_high,
+        previous_close_low,
+        context="price_retracement_percent:denominator",
+        logger=logger,
+    )
+    numerator = safe_log_ratio(
+        dataframe.get("close"),
+        previous_close_low,
+        context="price_retracement_percent:numerator",
+        logger=logger,
     )
+    result = safe_divide(
+        numerator,
+        denominator,
+        fallback=np.nan,
+        context="price_retracement_percent",
+        logger=logger,
+    )
+    return result.where(~np.isclose(denominator, 0.0), 0.0)
 
 
 # VWAP bands
@@ -2092,6 +2403,8 @@ def ewo(
     mamode: str = "sma",
     zero_lag: bool = False,
     normalize: bool = False,
+    *,
+    logger: Logger | None = None,
 ) -> pd.Series:
     """
     Calculate the Elliott Wave Oscillator (EWO) using two moving averages.
@@ -2112,7 +2425,12 @@ def ewo(
     ma2 = ma_fn(prices, timeperiod=ma2_length)
     madiff = ma1 - ma2
     if normalize:
-        madiff = (madiff / prices) * 100.0
+        madiff = safe_divide(
+            madiff,
+            prices,
+            context="ewo:normalize",
+            logger=logger,
+        ) * 100.0
     return madiff
 
 
@@ -2193,6 +2511,8 @@ def zigzag(
     natr_period: int = 14,
     natr_multiplier: float = 9.0,
     normalize: bool = False,
+    *,
+    logger: Logger | None = None,
 ) -> tuple[
     list[int],
     list[float],
@@ -2222,9 +2542,28 @@ def zigzag(
 
     indices: list[int] = df.index.tolist()
     thresholds: NDArray[np.floating] = natr_values * natr_multiplier
-    closes_log = np.log(df.get("close").to_numpy())
-    highs_log = np.log(df.get("high").to_numpy())
-    lows_log = np.log(df.get("low").to_numpy())
+    closes = df.get("close").to_numpy(dtype=float)
+    highs = df.get("high").to_numpy(dtype=float)
+    lows = df.get("low").to_numpy(dtype=float)
+    invalid_price_count = int(
+        np.count_nonzero(
+            ~np.isfinite(closes)
+            | ~np.isfinite(highs)
+            | ~np.isfinite(lows)
+            | (closes <= 0.0)
+            | (highs <= 0.0)
+            | (lows <= 0.0)
+        )
+    )
+    if invalid_price_count and logger is not None:
+        logger.debug(
+            "zigzag: %d rows have non-finite or non-positive OHLC values; derived pivot metrics are NaN at those positions",
+            invalid_price_count,
+        )
+    with np.errstate(divide="ignore", invalid="ignore"):
+        closes_log = np.where(np.isfinite(closes) & (closes > 0.0), np.log(closes), np.nan)
+        highs_log = np.where(np.isfinite(highs) & (highs > 0.0), np.log(highs), np.nan)
+        lows_log = np.where(np.isfinite(lows) & (lows > 0.0), np.log(lows), np.nan)
     volumes = df.get("volume").to_numpy()
 
     state: TrendDirection = TrendDirection.NEUTRAL
@@ -2693,7 +3032,7 @@ REGRESSORS: Final[tuple[Regressor, ...]] = (
     "catboost",
 )
 
-RegressorCallback = Union[Callable[..., Any], XGBoostTrainingCallback]
+RegressorCallback = Callable[..., Any] | XGBoostTrainingCallback
 
 _EARLY_STOPPING_ROUNDS_DEFAULT: Final[int] = 50