From 00710b8508cd6fabc677c358dfa1ae7f062c5b15 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 11 Dec 2025 16:30:01 +0100 Subject: [PATCH] feat(qav3): Add volume-weighted amplitude extrema weighting strategy MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- README.md | 6 +- quickadapter/user_data/config-template.json | 4 +- .../freqaimodels/QuickAdapterRegressorV3.py | 54 +++-- .../user_data/strategies/QuickAdapterV3.py | 46 +++-- quickadapter/user_data/strategies/Utils.py | 193 +++++++++++++++--- 5 files changed, 232 insertions(+), 71 deletions(-) diff --git a/README.md b/README.md index e0bb105..6315e04 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,7 @@ docker compose up -d --build | freqai.extrema_smoothing.mode | `mirror` | enum {`mirror`,`constant`,`nearest`,`wrap`,`interp`} | Boundary mode for `savgol` and `nadaraya_watson`. | | freqai.extrema_smoothing.bandwidth | 1.0 | float > 0 | Gaussian bandwidth for `nadaraya_watson`. | | _Extrema weighting_ | | | | -| freqai.extrema_weighting.strategy | `none` | enum {`none`,`amplitude`,`amplitude_threshold_ratio`} | Extrema weighting source: unweighted (`none`), swing amplitude (`amplitude`), or volatility-threshold ratio adjusted swing amplitude (`amplitude_threshold_ratio`). | +| freqai.extrema_weighting.strategy | `none` | enum {`none`,`amplitude`,`amplitude_threshold_ratio`,`volume_weighted_amplitude`} | Extrema weighting source: unweighted (`none`), swing amplitude (`amplitude`), volatility-threshold ratio adjusted swing amplitude (`amplitude_threshold_ratio`), or volume-weighted amplitude (`volume_weighted_amplitude`). | | freqai.extrema_weighting.standardization | `none` | enum {`none`,`zscore`,`robust`,`mmad`} | Standardization method applied before normalization. `none`=no standardization, `zscore`=(w-μ)/σ, `robust`=(w-median)/IQR, `mmad`=(w-median)/MAD. | | freqai.extrema_weighting.robust_quantiles | [0.25, 0.75] | list[float] where 0 <= Q1 < Q3 <= 1 | Quantile range for robust standardization, Q1 and Q3. | | freqai.extrema_weighting.mmad_scaling_factor | 1.4826 | float > 0 | Scaling factor for MMAD standardization. | @@ -85,14 +85,14 @@ docker compose up -d --build | freqai.feature_parameters.label_frequency_candles | `auto` | int >= 2 \| `auto` | Reversals labeling frequency. `auto` = max(2, 2 \* number of whitelisted pairs). | | freqai.feature_parameters.label_metric | `euclidean` | string (supported: `euclidean`,`minkowski`,`cityblock`,`chebyshev`,`mahalanobis`,`seuclidean`,`jensenshannon`,`sqeuclidean`,...) | Metric used in distance calculations to ideal point. | | freqai.feature_parameters.label_weights | [1/3,1/3,1/3] | list[float] | Per-objective weights used in distance calculations to ideal point. First objective is the number of detected reversals. Second objective is the median swing amplitude of Zigzag reversals (reversals quality). Third objective is the median volatility-threshold ratio adjusted swing amplitude. | -| freqai.feature_parameters.label_p_order | `None` | float | p-order used by Minkowski / power-mean calculations (optional). | +| freqai.feature_parameters.label_p_order | `None` | float \| None | p-order used by `minkowski` / `power_mean` (optional). | | freqai.feature_parameters.label_medoid_metric | `euclidean` | string | Metric used with `medoid`. | | freqai.feature_parameters.label_kmeans_metric | `euclidean` | string | Metric used for k-means clustering. 
| | freqai.feature_parameters.label_kmeans_selection | `min` | enum {`min`,`medoid`} | Strategy to select trial in the best kmeans cluster. | | freqai.feature_parameters.label_kmedoids_metric | `euclidean` | string | Metric used for k-medoids clustering. | | freqai.feature_parameters.label_kmedoids_selection | `min` | enum {`min`,`medoid`} | Strategy to select trial in the best k-medoids cluster. | | freqai.feature_parameters.label_knn_metric | `minkowski` | string | Distance metric for KNN. | -| freqai.feature_parameters.label_knn_p_order | `None` | float | p-order for KNN Minkowski metric distance. (optional) | +| freqai.feature_parameters.label_knn_p_order | `None` | float \| None | Tunable for KNN neighbor distances aggregation methods: p-order (`knn_power_mean`, default: 1.0) or quantile (`knn_quantile`, default: 0.5). (optional) | | freqai.feature_parameters.label_knn_n_neighbors | 5 | int >= 1 | Number of neighbors for KNN. | | _Predictions extrema_ | | | | | freqai.predictions_extrema.selection_method | `rank` | enum {`rank`,`values`,`partition`} | Extrema selection method. `values` uses reversal values, `rank` uses ranked extrema values, `partition` uses sign-based partitioning. | diff --git a/quickadapter/user_data/config-template.json b/quickadapter/user_data/config-template.json index 0bda39e..3834791 100644 --- a/quickadapter/user_data/config-template.json +++ b/quickadapter/user_data/config-template.json @@ -125,8 +125,8 @@ "data_kitchen_thread_count": 6, // set to number of CPU threads / 4 "track_performance": false, "extrema_weighting": { - "strategy": "amplitude_threshold_ratio", - "gamma": 1.5 + "strategy": "volume_weighted_amplitude", + "gamma": 1.75 }, "extrema_smoothing": { "method": "kaiser", diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index b3f4248..92075be 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -142,7 +142,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): "kmeans2", "kmedoids", "knn_power_mean", - "knn_percentile", + "knn_quantile", "knn_min", "knn_max", "medoid", @@ -1078,14 +1078,14 @@ class QuickAdapterRegressorV3(BaseRegressionModel): if pred_minima.empty: min_val = np.nan else: - min_val = np.median(pred_minima.to_numpy()) + min_val = np.nanmedian(pred_minima.to_numpy()) if not np.isfinite(min_val): min_val = QuickAdapterRegressorV3.safe_min_pred(pred_extrema) if pred_maxima.empty: max_val = np.nan else: - max_val = np.median(pred_maxima.to_numpy()) + max_val = np.nanmedian(pred_maxima.to_numpy()) if not np.isfinite(max_val): max_val = QuickAdapterRegressorV3.safe_max_pred(pred_extrema) @@ -1133,7 +1133,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): or np.unique(values).size < 3 or np.allclose(values, values[0]) ): - return np.median(values) + return np.nanmedian(values) try: return threshold_func(values) except Exception as e: @@ -1141,7 +1141,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): f"Failed to apply skimage threshold function {threshold_func.__name__} on series {series.name}: {repr(e)}. 
Falling back to median", exc_info=True, ) - return np.median(values) + return np.nanmedian(values) @staticmethod def _pairwise_distance_sums( @@ -1375,7 +1375,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): QuickAdapterRegressorV3._CUSTOM_METRICS[10], # "kmeans2" QuickAdapterRegressorV3._CUSTOM_METRICS[11], # "kmedoids" QuickAdapterRegressorV3._CUSTOM_METRICS[12], # "knn_power_mean" - QuickAdapterRegressorV3._CUSTOM_METRICS[13], # "knn_percentile" + QuickAdapterRegressorV3._CUSTOM_METRICS[13], # "knn_quantile" QuickAdapterRegressorV3._CUSTOM_METRICS[14], # "knn_min" QuickAdapterRegressorV3._CUSTOM_METRICS[15], # "knn_max" }: @@ -1407,7 +1407,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): }: np_sqrt_normalized_matrix = np.sqrt(normalized_matrix) if metric == QuickAdapterRegressorV3._CUSTOM_METRICS[1]: # "shellinger" - variances = np.var(np_sqrt_normalized_matrix, axis=0, ddof=1) + variances = np.nanvar(np_sqrt_normalized_matrix, axis=0, ddof=1) if np.any(variances <= 0): raise ValueError( "shellinger metric requires non-zero variance for all objectives" @@ -1656,7 +1656,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): return trial_distances elif metric in { QuickAdapterRegressorV3._CUSTOM_METRICS[12], # "knn_power_mean" - QuickAdapterRegressorV3._CUSTOM_METRICS[13], # "knn_percentile" + QuickAdapterRegressorV3._CUSTOM_METRICS[13], # "knn_quantile" QuickAdapterRegressorV3._CUSTOM_METRICS[14], # "knn_min" QuickAdapterRegressorV3._CUSTOM_METRICS[15], # "knn_max" }: @@ -1701,17 +1701,17 @@ class QuickAdapterRegressorV3(BaseRegressionModel): return sp.stats.pmean(neighbor_distances, p=label_knn_p_order, axis=1) elif ( metric == QuickAdapterRegressorV3._CUSTOM_METRICS[13] - ): # "knn_percentile" + ): # "knn_quantile" label_knn_p_order = ( label_knn_p_order if label_knn_p_order is not None and np.isfinite(label_knn_p_order) - else 50.0 + else 0.5 ) - return np.percentile(neighbor_distances, label_knn_p_order, axis=1) + return np.nanquantile(neighbor_distances, label_knn_p_order, axis=1) elif metric == QuickAdapterRegressorV3._CUSTOM_METRICS[14]: # "knn_min" - return np.min(neighbor_distances, axis=1) + return np.nanmin(neighbor_distances, axis=1) elif metric == QuickAdapterRegressorV3._CUSTOM_METRICS[15]: # "knn_max" - return np.max(neighbor_distances, axis=1) + return np.nanmax(neighbor_distances, axis=1) else: raise ValueError( f"Unsupported label metric: {metric}. 
Supported metrics are {', '.join(metrics)}" @@ -2120,7 +2120,7 @@ def train_objective( test_length, fit_live_predictions_candles ) logger.info(f"{test_length=}, {n_test_extrema=}, {min_test_extrema=}") - min_test_period_candles: int = fit_live_predictions_candles * 2 + min_test_period_candles: int = fit_live_predictions_candles * 4 if test_length < min_test_period_candles: logger.warning( f"Insufficient test data: {test_length} < {min_test_period_candles}" @@ -2290,20 +2290,34 @@ def label_objective( if df.empty: return 0, 0.0, 0.0 - _, pivots_values, _, pivots_amplitudes, pivots_amplitude_threshold_ratios = zigzag( + ( + _, + pivots_values, + _, + _, + pivots_amplitude_threshold_ratios, + _, + _, + pivots_volume_weighted_amplitudes, + ) = zigzag( df, natr_period=label_period_candles, natr_ratio=label_natr_ratio, ) - median_amplitude = np.nanmedian(np.asarray(pivots_amplitudes, dtype=float)) - if not np.isfinite(median_amplitude): - median_amplitude = 0.0 - + median_volume_weighted_amplitude = np.nanmedian( + np.asarray(pivots_volume_weighted_amplitudes, dtype=float) + ) + if not np.isfinite(median_volume_weighted_amplitude): + median_volume_weighted_amplitude = 0.0 median_amplitude_threshold_ratio = np.nanmedian( np.asarray(pivots_amplitude_threshold_ratios, dtype=float) ) if not np.isfinite(median_amplitude_threshold_ratio): median_amplitude_threshold_ratio = 0.0 - return len(pivots_values), median_amplitude, median_amplitude_threshold_ratio + return ( + len(pivots_values), + median_volume_weighted_amplitude, + median_amplitude_threshold_ratio, + ) diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index c6fea7c..2cf39d6 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -22,6 +22,7 @@ from freqtrade.exchange import timeframe_to_minutes, timeframe_to_prev_date from freqtrade.persistence import Trade from freqtrade.strategy import stoploss_from_absolute from freqtrade.strategy.interface import IStrategy +from numpy.typing import NDArray from pandas import DataFrame, Series, isna from scipy.stats import t from technical.pivots_points import pivots_points @@ -34,8 +35,8 @@ from Utils import ( MINIMA_THRESHOLD_COLUMN, NORMALIZATION_TYPES, RANK_METHODS, - SMOOTHING_MODES, SMOOTHING_METHODS, + SMOOTHING_MODES, STANDARDIZATION_TYPES, WEIGHT_STRATEGIES, TrendDirection, @@ -885,17 +886,24 @@ class QuickAdapterV3(IStrategy): def _get_weights( strategy: WeightStrategy, amplitudes: list[float], + volume_weighted_amplitudes: list[float], amplitude_threshold_ratios: list[float], - ) -> list[float]: + ) -> NDArray[np.floating]: if strategy == WEIGHT_STRATEGIES[1]: # "amplitude" - return amplitudes + return np.array(amplitudes) if strategy == WEIGHT_STRATEGIES[2]: # "amplitude_threshold_ratio" return ( - amplitude_threshold_ratios + np.array(amplitude_threshold_ratios) if len(amplitude_threshold_ratios) == len(amplitudes) - else amplitudes + else np.array(amplitudes) + ) + if strategy == WEIGHT_STRATEGIES[3]: # "volume_weighted_amplitude" + return ( + np.array(volume_weighted_amplitudes) + if len(volume_weighted_amplitudes) == len(amplitudes) + else np.array(amplitudes) ) - return [] + return np.array([]) def set_freqai_targets( self, dataframe: DataFrame, metadata: dict[str, Any], **kwargs @@ -909,6 +917,9 @@ class QuickAdapterV3(IStrategy): pivots_directions, pivots_amplitudes, pivots_amplitude_threshold_ratios, + _, + _, + pivots_volume_weighted_amplitude, ) = 
zigzag( dataframe, natr_period=label_period_candles, @@ -937,12 +948,13 @@ class QuickAdapterV3(IStrategy): pivot_weights = QuickAdapterV3._get_weights( self.extrema_weighting["strategy"], pivots_amplitudes, + pivots_volume_weighted_amplitude, pivots_amplitude_threshold_ratios, ) weighted_extrema, _ = get_weighted_extrema( extrema=dataframe[EXTREMA_COLUMN], indices=pivots_indices, - weights=np.array(pivot_weights), + weights=pivot_weights, strategy=self.extrema_weighting["strategy"], standardization=self.extrema_weighting["standardization"], robust_quantiles=self.extrema_weighting["robust_quantiles"], @@ -1106,7 +1118,7 @@ class QuickAdapterV3(IStrategy): total_weight = entry_weight + current_weight + median_weight if np.isclose(total_weight, 0.0): - return np.mean([entry_natr, current_natr, median_natr]) + return np.nanmean([entry_natr, current_natr, median_natr]) entry_weight /= total_weight current_weight /= total_weight median_weight /= total_weight @@ -1849,14 +1861,14 @@ class QuickAdapterV3(IStrategy): unrealized_pnl_history = np.asarray(unrealized_pnl_history) velocity = np.diff(unrealized_pnl_history) - velocity_std = np.std(velocity, ddof=1) if velocity.size > 1 else 0.0 + velocity_std = np.nanstd(velocity, ddof=1) if velocity.size > 1 else 0.0 acceleration = np.diff(velocity) acceleration_std = ( - np.std(acceleration, ddof=1) if acceleration.size > 1 else 0.0 + np.nanstd(acceleration, ddof=1) if acceleration.size > 1 else 0.0 ) - mean_velocity = np.mean(velocity) if velocity.size > 0 else 0.0 - mean_acceleration = np.mean(acceleration) if acceleration.size > 0 else 0.0 + mean_velocity = np.nanmean(velocity) if velocity.size > 0 else 0.0 + mean_acceleration = np.nanmean(acceleration) if acceleration.size > 0 else 0.0 if window_size > 0 and len(unrealized_pnl_history) > window_size: recent_unrealized_pnl_history = unrealized_pnl_history[-window_size:] @@ -1865,18 +1877,20 @@ class QuickAdapterV3(IStrategy): recent_velocity = np.diff(recent_unrealized_pnl_history) recent_velocity_std = ( - np.std(recent_velocity, ddof=1) if recent_velocity.size > 1 else 0.0 + np.nanstd(recent_velocity, ddof=1) if recent_velocity.size > 1 else 0.0 ) recent_acceleration = np.diff(recent_velocity) recent_acceleration_std = ( - np.std(recent_acceleration, ddof=1) if recent_acceleration.size > 1 else 0.0 + np.nanstd(recent_acceleration, ddof=1) + if recent_acceleration.size > 1 + else 0.0 ) recent_mean_velocity = ( - np.mean(recent_velocity) if recent_velocity.size > 0 else 0.0 + np.nanmean(recent_velocity) if recent_velocity.size > 0 else 0.0 ) recent_mean_acceleration = ( - np.mean(recent_acceleration) if recent_acceleration.size > 0 else 0.0 + np.nanmean(recent_acceleration) if recent_acceleration.size > 0 else 0.0 ) return ( diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index 616f2f6..ddea71e 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -19,11 +19,17 @@ from technical import qtpylib T = TypeVar("T", pd.Series, float) -WeightStrategy = Literal["none", "amplitude", "amplitude_threshold_ratio"] +WeightStrategy = Literal[ + "none", + "amplitude", + "amplitude_threshold_ratio", + "volume_weighted_amplitude", +] WEIGHT_STRATEGIES: Final[tuple[WeightStrategy, ...]] = ( "none", "amplitude", "amplitude_threshold_ratio", + "volume_weighted_amplitude", ) EXTREMA_COLUMN: Final = "&s-extrema" @@ -178,7 +184,7 @@ def _calculate_coeffs( return coeffs / np.sum(coeffs) -def zero_phase( +def 
zero_phase_filter( series: pd.Series, window: int, win_type: SmoothingKernel, @@ -219,7 +225,7 @@ def smooth_extrema( std = get_gaussian_std(odd_window) if method == SMOOTHING_METHODS[0]: # "gaussian" - return zero_phase( + return zero_phase_filter( series=series, window=odd_window, win_type=SMOOTHING_METHODS[0], @@ -227,7 +233,7 @@ def smooth_extrema( beta=beta, ) elif method == SMOOTHING_METHODS[1]: # "kaiser" - return zero_phase( + return zero_phase_filter( series=series, window=odd_window, win_type=SMOOTHING_METHODS[1], @@ -235,7 +241,7 @@ def smooth_extrema( beta=beta, ) elif method == SMOOTHING_METHODS[2]: # "triang" - return zero_phase( + return zero_phase_filter( series=series, window=odd_window, win_type=SMOOTHING_METHODS[2], @@ -262,7 +268,7 @@ def smooth_extrema( elif method == SMOOTHING_METHODS[6]: # "nadaraya_watson" return nadaraya_watson(series, bandwidth, mode) else: - return zero_phase( + return zero_phase_filter( series=series, window=odd_window, win_type=SMOOTHING_METHODS[0], @@ -310,8 +316,8 @@ def _standardize_robust( if np.isnan(weights).any(): return np.zeros_like(weights, dtype=float) - median = np.median(weights) - q1, q3 = np.quantile(weights, quantiles) + median = np.nanmedian(weights) + q1, q3 = np.nanquantile(weights, quantiles) iqr = q3 - q1 if np.isclose(iqr, 0.0): @@ -332,8 +338,8 @@ def _standardize_mmad( if np.isnan(weights).any(): return np.zeros_like(weights, dtype=float) - median = np.median(weights) - mad = np.median(np.abs(weights - median)) + median = np.nanmedian(weights) + mad = np.nanmedian(np.abs(weights - median)) if np.isclose(mad, 0.0): return np.zeros_like(weights, dtype=float) @@ -658,7 +664,8 @@ def get_weighted_extrema( if strategy in { WEIGHT_STRATEGIES[1], WEIGHT_STRATEGIES[2], - }: # "amplitude" or "amplitude_threshold_ratio" + WEIGHT_STRATEGIES[3], + }: # "amplitude" or "amplitude_threshold_ratio" or "volume_weighted_amplitude" extrema_weights = calculate_extrema_weights( series=extrema, indices=indices, @@ -1089,10 +1096,28 @@ def zigzag( df: pd.DataFrame, natr_period: int = 14, natr_ratio: float = 9.0, -) -> tuple[list[int], list[float], list[TrendDirection], list[float], list[float]]: +) -> tuple[ + list[int], + list[float], + list[TrendDirection], + list[float], + list[float], + list[float], + list[float], + list[float], +]: n = len(df) if df.empty or n < natr_period: - return [], [], [], [], [] + return ( + [], + [], + [], + [], + [], + [], + [], + [], + ) natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy() @@ -1102,6 +1127,7 @@ def zigzag( log_closes = np.log(closes) highs = df.get("high").to_numpy() lows = df.get("low").to_numpy() + volumes = df.get("volume").to_numpy() state: TrendDirection = TrendDirection.NEUTRAL @@ -1110,6 +1136,9 @@ def zigzag( pivots_directions: list[TrendDirection] = [] pivots_amplitudes: list[float] = [] pivots_amplitude_threshold_ratios: list[float] = [] + pivots_volume_spike_ratios: list[float] = [] + pivots_volume_quantiles: list[float] = [] + pivots_volume_weighted_amplitudes: list[float] = [] last_pivot_pos: int = -1 candidate_pivot_pos: int = -1 @@ -1119,8 +1148,9 @@ def zigzag( def calculate_volatility_quantile(pos: int) -> float: if pos not in volatility_quantile_cache: - start_pos = max(0, pos + 1 - natr_period) - end_pos = min(pos + 1, n) + pos_plus_1 = pos + 1 + start_pos = max(0, pos_plus_1 - natr_period) + end_pos = min(pos_plus_1, n) if start_pos >= end_pos: volatility_quantile_cache[pos] = np.nan else: @@ -1130,6 +1160,22 @@ def zigzag( return 
volatility_quantile_cache[pos] + volume_quantile_cache: dict[int, float] = {} + + def calculate_volume_quantile(pos: int) -> float: + if pos not in volume_quantile_cache: + pos_plus_1 = pos + 1 + start_pos = max(0, pos_plus_1 - natr_period) + end_pos = min(pos_plus_1, n) + if start_pos >= end_pos: + volume_quantile_cache[pos] = np.nan + else: + volume_quantile_cache[pos] = calculate_quantile( + volumes[start_pos:end_pos], volumes[pos] + ) + + return volume_quantile_cache[pos] + def calculate_slopes_ok_threshold( pos: int, min_threshold: float = 0.75, @@ -1152,6 +1198,80 @@ def zigzag( candidate_pivot_pos = -1 candidate_pivot_value = np.nan + def calculate_pivot_amplitude(current_value: float, previous_value: float) -> float: + if np.isclose(previous_value, 0.0): + return np.nan + return abs(current_value - previous_value) / abs(previous_value) + + def calculate_pivot_amplitude_threshold_ratio( + amplitude: float, threshold: float + ) -> float: + if np.isfinite(threshold) and threshold > 0 and np.isfinite(amplitude): + return amplitude / threshold + return np.nan + + def apply_weight_transform(weight: float, transform_type: str = "log1p") -> float: + if not np.isfinite(weight): + return np.nan + + if transform_type == "log1p": + if weight < 0: + return np.nan + return np.log1p(weight) + + elif transform_type == "sqrt": + if weight < 0: + return np.nan + return np.sqrt(weight) + + elif transform_type == "identity": + return weight + + elif transform_type == "rational": + return weight / (1 + weight) + + elif transform_type == "log10p": + if weight < 0: + return np.nan + return np.log10(1 + weight) + + else: + return weight + + def calculate_pivot_volume_metrics( + pos: int, amplitude: float + ) -> tuple[float, float, float]: + if pos < 0 or pos >= n: + return np.nan, np.nan, np.nan + + pivot_volume = volumes[pos] + + start_pos = max(0, pos - natr_period) + if start_pos >= pos: + volume_spike_ratio = np.nan + else: + volumes_slice = volumes[start_pos:pos] + if volumes_slice.size == 0 or np.all(np.isnan(volumes_slice)): + volume_spike_ratio = np.nan + else: + mean_volume = np.nanmean(volumes_slice) + if mean_volume > 0 and np.isfinite(mean_volume): + volume_spike_ratio = pivot_volume / mean_volume + else: + volume_spike_ratio = np.nan + + volume_quantile = calculate_volume_quantile(pos) + + transformed_volume_spike_ratio = apply_weight_transform( + volume_spike_ratio, "log1p" + ) + if np.isfinite(transformed_volume_spike_ratio) and np.isfinite(amplitude): + volume_weighted_amplitude = amplitude * transformed_volume_spike_ratio + else: + volume_weighted_amplitude = np.nan + + return volume_spike_ratio, volume_quantile, volume_weighted_amplitude + def add_pivot(pos: int, value: float, direction: TrendDirection): nonlocal last_pivot_pos if pivots_indices and indices[pos] == pivots_indices[-1]: @@ -1159,26 +1279,27 @@ def zigzag( pivots_indices.append(indices[pos]) pivots_values.append(value) pivots_directions.append(direction) + if len(pivots_values) > 1: prev_pivot_value = pivots_values[-2] - if np.isclose(prev_pivot_value, 0.0): - amplitude = np.nan - else: - amplitude = abs(value - prev_pivot_value) / abs(prev_pivot_value) - current_threshold = thresholds[pos] - if ( - np.isfinite(current_threshold) - and current_threshold > 0 - and np.isfinite(amplitude) - ): - amplitude_threshold_ratio = amplitude / current_threshold - else: - amplitude_threshold_ratio = np.nan + amplitude = calculate_pivot_amplitude(value, prev_pivot_value) + amplitude_threshold_ratio = 
calculate_pivot_amplitude_threshold_ratio( + amplitude, thresholds[pos] + ) else: amplitude = np.nan amplitude_threshold_ratio = np.nan + + volume_spike_ratio, volume_quantile, volume_weighted_amplitude = ( + calculate_pivot_volume_metrics(pos, amplitude) + ) + pivots_amplitudes.append(amplitude) pivots_amplitude_threshold_ratios.append(amplitude_threshold_ratio) + pivots_volume_spike_ratios.append(volume_spike_ratio) + pivots_volume_quantiles.append(volume_quantile) + pivots_volume_weighted_amplitudes.append(volume_weighted_amplitude) + last_pivot_pos = pos reset_candidate_pivot() @@ -1296,7 +1417,16 @@ def zigzag( state = TrendDirection.UP break else: - return [], [], [], [], [] + return ( + [], + [], + [], + [], + [], + [], + [], + [], + ) for i in range(last_pivot_pos + 1, n): current_high = highs[i] @@ -1332,6 +1462,9 @@ def zigzag( pivots_directions, pivots_amplitudes, pivots_amplitude_threshold_ratios, + pivots_volume_spike_ratios, + pivots_volume_quantiles, + pivots_volume_weighted_amplitudes, ) @@ -1602,7 +1735,7 @@ def soft_extremum(series: pd.Series, alpha: float) -> float: if np_array.size == 0: return np.nan if np.isclose(alpha, 0.0): - return np.mean(np_array) + return np.nanmean(np_array) scaled_np_array = alpha * np_array max_scaled_np_array = np.max(scaled_np_array) if np.isinf(max_scaled_np_array): -- 2.43.0
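
Illustrative sketch (not part of the patch): the new `volume_weighted_amplitude` weight added in `zigzag()` combines the relative swing amplitude between consecutive pivots with a log1p-damped volume spike ratio measured against the trailing mean volume (the patched helper uses the `natr_period` candles preceding the pivot, pivot excluded). The standalone function below reproduces that composition under those assumptions; its name, signature, and the sample values are hypothetical, and it omits the caching and boundary handling that `calculate_pivot_volume_metrics()` and `apply_weight_transform()` perform.

```python
import numpy as np


def volume_weighted_amplitude(
    pivot_value: float,
    prev_pivot_value: float,
    pivot_volume: float,
    trailing_volumes: np.ndarray,
) -> float:
    """Hypothetical standalone re-derivation of the per-pivot weight."""
    # Relative swing amplitude between consecutive pivots.
    if np.isclose(prev_pivot_value, 0.0):
        return np.nan
    amplitude = abs(pivot_value - prev_pivot_value) / abs(prev_pivot_value)

    # Volume spike ratio: pivot volume vs the trailing mean volume.
    if trailing_volumes.size == 0 or np.all(np.isnan(trailing_volumes)):
        return np.nan
    mean_volume = np.nanmean(trailing_volumes)
    if not np.isfinite(mean_volume) or mean_volume <= 0:
        return np.nan
    volume_spike_ratio = pivot_volume / mean_volume
    if not np.isfinite(volume_spike_ratio) or volume_spike_ratio < 0:
        return np.nan

    # log1p damping keeps extreme volume spikes from dominating the weight.
    return amplitude * np.log1p(volume_spike_ratio)


# Example: a 2% swing on a pivot carrying 3x the trailing average volume.
print(volume_weighted_amplitude(102.0, 100.0, 300.0, np.array([80.0, 100.0, 120.0])))
# ~0.0277 (= 0.02 * log1p(3.0))
```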
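
A second sketch for the `knn_percentile` → `knn_quantile` rename: `label_knn_p_order` is now interpreted as a quantile in [0, 1] (falling back to 0.5, the median neighbor distance) and aggregated with the NaN-aware `np.nanquantile` instead of `np.percentile` with a default of 50.0. The snippet below is a minimal standalone illustration; the `neighbor_distances` matrix and its values are made up.

```python
import numpy as np

# Hypothetical per-trial neighbor distances (rows: trials, columns: k nearest neighbors).
neighbor_distances = np.array([
    [0.20, 0.50, 0.90],
    [0.10, 0.40, np.nan],
])

# With the rename, label_knn_p_order is a quantile in [0, 1]; None falls back to 0.5 (median).
label_knn_p_order = None
quantile = (
    label_knn_p_order
    if label_knn_p_order is not None and np.isfinite(label_knn_p_order)
    else 0.5
)

# NaN-aware aggregation, matching the switch from np.percentile to np.nanquantile.
print(np.nanquantile(neighbor_distances, quantile, axis=1))  # -> [0.5  0.25]
```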