https://github.com/sponsors/robcaulk
"""
- version = "3.7.131"
+ version = "3.7.132"
_SQRT_2: Final[float] = np.sqrt(2.0)
raise ValueError("label_weights must contain only finite values")
if np.any(np_weights < 0):
raise ValueError("label_weights values must be non-negative")
- label_weights_sum = np.sum(np.abs(np_weights))
+ label_weights_sum = np.nansum(np.abs(np_weights))
if np.isclose(label_weights_sum, 0.0):
raise ValueError("label_weights sum cannot be zero")
np_weights = np_weights / label_weights_sum
np_weights = 1 / variances
return (
np.sqrt(
- np.sum(
+ np.nansum(
np_weights
* (np_sqrt_normalized_matrix - np.sqrt(ideal_point)) ** 2,
axis=1,
ideal_point, p=p, weights=np_weights
) - sp.stats.pmean(normalized_matrix, p=p, weights=np_weights, axis=1)
elif metric == QuickAdapterRegressorV3._CUSTOM_METRICS[8]: # "weighted_sum"
- return np.sum(np_weights * (ideal_point - normalized_matrix), axis=1)
+ return (ideal_point - normalized_matrix) @ np_weights
elif metric == QuickAdapterRegressorV3._CUSTOM_METRICS[16]: # "medoid"
label_medoid_metric = self.ft_params.get(
"label_medoid_metric",
if label_p_order is not None and np.isfinite(label_p_order)
else 2.0
)
- best_medoid_position = np.argmin(
+ best_medoid_position = np.nanargmin(
self._pairwise_distance_sums(
normalized_matrix[best_cluster_indices],
label_kmeans_metric,
metric=label_kmeans_metric,
**cdist_kwargs,
).flatten()
- min_distance_position = np.argmin(best_cluster_distances)
+ min_distance_position = np.nanargmin(best_cluster_distances)
best_trial_index = best_cluster_indices[min_distance_position]
trial_distances[best_trial_index] = best_cluster_distances[
min_distance_position
label_kmedoids_selection = self.ft_params.get(
"label_kmedoids_selection", "min"
)
- best_medoid_distance_position = np.argmin(medoid_distances_to_ideal)
+ best_medoid_distance_position = np.nanargmin(medoid_distances_to_ideal)
best_medoid_index = medoid_indices[best_medoid_distance_position]
cluster_index = cluster_labels[best_medoid_index]
best_cluster_indices = np.flatnonzero(cluster_labels == cluster_index)
metric=label_kmedoids_metric,
**cdist_kwargs,
).flatten()
- min_distance_position = np.argmin(best_cluster_distances)
+ min_distance_position = np.nanargmin(best_cluster_distances)
best_trial_index = best_cluster_indices[min_distance_position]
trial_distances[best_trial_index] = best_cluster_distances[
min_distance_position
normalized_matrix, metric=label_metric, metrics=metrics
)
- return best_trials[np.argmin(trial_distances)]
+ return best_trials[np.nanargmin(trial_distances)]
def optuna_optimize(
self,
get_label_defaults,
get_weighted_extrema,
get_zl_ma_fn,
+ nan_average,
non_zero_diff,
price_retracement_percent,
smooth_extrema,
_TRADING_MODES: Final[tuple[TradingMode, ...]] = ("spot", "margin", "futures")
def version(self) -> str:
- return "3.3.181"
+ return "3.3.182"
timeframe = "5m"
"aggregation_normalization"
]
+ if weighting_aggregation == HYBRID_AGGREGATIONS[
+ 1
+ ] and weighting_normalization in {
+ NORMALIZATION_TYPES[0], # "minmax"
+ NORMALIZATION_TYPES[5], # "rank"
+ }:
+ logger.warning(
+ f"extrema_weighting aggregation='{weighting_aggregation}' with normalization='{weighting_normalization}' "
+ "can produce zero weights (gmean collapses to 0 when any source has min value). "
+ f"Consider using normalization='{NORMALIZATION_TYPES[1]}' (sigmoid) or aggregation='{HYBRID_AGGREGATIONS[0]}' (weighted_sum)."
+ )
+
return {
"strategy": weighting_strategy,
"source_weights": weighting_source_weights,
total_weight = entry_weight + current_weight + median_weight
if np.isclose(total_weight, 0.0):
return np.nanmean([entry_natr, current_natr, median_natr])
- entry_weight /= total_weight
- current_weight /= total_weight
- median_weight /= total_weight
-
- return (
- entry_natr * entry_weight
- + current_natr * current_weight
- + median_natr * median_weight
+ return nan_average(
+ np.array([entry_natr, current_natr, median_natr]),
+ weights=np.array([entry_weight, current_weight, median_weight]),
)
def get_trade_interpolation_natr(
import talib.abstract as ta
from numpy.typing import NDArray
from scipy.ndimage import gaussian_filter1d
+from scipy.stats import gmean
from technical import qtpylib
T = TypeVar("T", pd.Series, float)
return (value1 + value2) / 2
+def nan_average(
+ values: NDArray[np.floating],
+ weights: NDArray[np.floating] | None = None,
+) -> float:
+ values = np.asarray(values, dtype=float)
+ if values.size == 0:
+ return np.nan
+
+ if weights is None:
+ return np.nanmean(values)
+
+ weights = np.asarray(weights, dtype=float)
+ mask = np.isfinite(values) & np.isfinite(weights)
+ if not mask.any():
+ return np.nan
+
+ return np.average(values[mask], weights=weights[mask])
+
+
def non_zero_diff(s1: pd.Series, s2: pd.Series) -> pd.Series:
"""Returns the difference of two series and replaces zeros with epsilon."""
diff = s1 - s2
gamma: float = DEFAULTS_EXTREMA_WEIGHTING["gamma"],
) -> NDArray[np.floating]:
"""
- 3-phase weight normalization:
+ 3-phase weights normalization:
1. Standardization: zscore (w-μ)/σ | robust (w-median)/IQR | mmad (w-median)/MAD | none
2. Normalization: minmax, sigmoid, softmax, l1, l2, rank, none
3. Post-processing: gamma correction w^γ
return weights
weights_finite_mask = np.isfinite(weights)
- if not weights_finite_mask.any():
- return np.full_like(weights, DEFAULT_EXTREMA_WEIGHT, dtype=float)
-
weights = _impute_weights(
weights,
finite_mask=weights_finite_mask,
)
if aggregation == HYBRID_AGGREGATIONS[0]: # "weighted_sum"
- combined_source_weights = np.zeros(n, dtype=float)
- for source_weight, values in zip(np_source_weights, normalized_source_weights):
- combined_source_weights = combined_source_weights + source_weight * values
+ combined_source_weights = np.average(
+ np.vstack(normalized_source_weights), axis=0, weights=np_source_weights
+ )
elif aggregation == HYBRID_AGGREGATIONS[1]: # "geometric_mean"
- combined_source_weights = sp.stats.gmean(
+ combined_source_weights = gmean(
np.vstack([np.abs(values) for values in normalized_source_weights]),
axis=0,
- weights=np_source_weights,
+ weights=np_source_weights[:, np.newaxis],
)
else:
raise ValueError(f"Unknown hybrid aggregation method: {aggregation}")
if len(indices) == 0 or len(weights) == 0:
return pd.Series(DEFAULT_EXTREMA_WEIGHT, index=extrema.index)
- if len(indices) != len(weights):
- raise ValueError(
- f"Length mismatch: {len(indices)} indices but {len(weights)} weights"
- )
-
normalized_weights = normalize_weights(
weights,
standardization=standardization,
gamma=gamma,
)
- if normalized_weights.size == 0 or np.allclose(
- normalized_weights, normalized_weights[0]
- ):
- normalized_weights = np.full_like(normalized_weights, DEFAULT_EXTREMA_WEIGHT)
-
return _weights_array_to_series(
index=extrema.index,
indices=indices,
def _apply_weights(extrema: pd.Series, weights: pd.Series) -> pd.Series:
- if weights.empty:
+ weights_values = weights.to_numpy(dtype=float)
+ if weights_values.size == 0:
+ return extrema
+
+ if not np.isfinite(weights_values).all() or np.allclose(
+ weights_values, weights_values[0]
+ ):
return extrema
- if np.allclose(weights.to_numpy(dtype=float), DEFAULT_EXTREMA_WEIGHT):
+
+ if np.allclose(weights_values, DEFAULT_EXTREMA_WEIGHT):
return extrema
+
return extrema * weights
np_array = series.to_numpy()
if np_array.size == 0:
return np.nan
+ finite_mask = np.isfinite(np_array)
+ if not finite_mask.any():
+ return np.nan
if np.isclose(alpha, 0.0):
return np.nanmean(np_array)
scaled_np_array = alpha * np_array
- max_scaled_np_array = np.max(scaled_np_array)
- if np.isinf(max_scaled_np_array):
- return np_array[np.argmax(scaled_np_array)]
+ max_scaled_np_array = np.nanmax(scaled_np_array)
+ if np.isinf(max_scaled_np_array) or np.isnan(max_scaled_np_array):
+ return np_array[np.nanargmax(scaled_np_array)]
shifted_exponentials = np.exp(scaled_np_array - max_scaled_np_array)
- numerator = np.sum(np_array * shifted_exponentials)
- denominator = np.sum(shifted_exponentials)
- if denominator == 0:
- return np.max(np_array)
- return numerator / denominator
+ sum_exponentials = np.nansum(shifted_exponentials)
+ if sum_exponentials == 0:
+ return np.nanmax(np_array)
+ return nan_average(np_array, weights=shifted_exponentials)
@lru_cache(maxsize=8)