From b0ffbb6e89ada91826bc1e5d723a448b7b1462cf Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 27 Apr 2025 13:42:54 +0200 Subject: [PATCH] fix(qav3): fix zigzag initial pivot identification MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../freqaimodels/QuickAdapterRegressorV3.py | 50 +++++++++++-------- .../user_data/strategies/QuickAdapterV3.py | 2 +- quickadapter/user_data/strategies/Utils.py | 48 ++++++++++-------- 3 files changed, 58 insertions(+), 42 deletions(-) diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index 0e9c109..35d1e36 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -1,3 +1,4 @@ +from enum import IntEnum import logging import json import time @@ -44,7 +45,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): https://github.com/sponsors/robcaulk """ - version = "3.7.18" + version = "3.7.19" @cached_property def _optuna_config(self) -> dict: @@ -859,18 +860,23 @@ def zigzag( highs = df["high"].values lows = df["low"].values + class TrendDirection(IntEnum): + NEUTRAL = 0 + UP = 1 + DOWN = -1 + + state: TrendDirection = TrendDirection.NEUTRAL pivots_indices, pivots_values, pivots_directions = [], [], [] - state = 0 # 0=neutral, 1=up, -1=down last_pivot_pos = -depth - def add_pivot(pos: int, value: float, direction: int): + def add_pivot(pos: int, value: float, direction: TrendDirection): nonlocal last_pivot_pos pivots_indices.append(indices[pos]) pivots_values.append(value) pivots_directions.append(direction) last_pivot_pos = pos - def update_last_pivot(pos: int, value: float, direction: int): + def update_last_pivot(pos: int, value: float, direction: TrendDirection): if pivots_indices: pivots_indices[-1] = indices[pos] pivots_values[-1] = value @@ -886,42 +892,42 @@ def zigzag( if lows[i] < initial_low: initial_low, initial_low_pos = lows[i], i - if (highs[i] - initial_low) / initial_low > thresholds[i]: - add_pivot(initial_low_pos, initial_low, -1) - state = 1 + if (initial_high - lows[i]) / initial_high >= thresholds[i]: + add_pivot(initial_high_pos, initial_high, TrendDirection.UP) + state = TrendDirection.DOWN break - elif (initial_high - lows[i]) / initial_high > thresholds[i]: - add_pivot(initial_high_pos, initial_high, 1) - state = -1 + elif (highs[i] - initial_low) / initial_low >= thresholds[i]: + add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN) + state = TrendDirection.UP break else: return [], [], [] for i in range(i + 1, len(df)): last_pivot_val = pivots_values[-1] - if state == 1: + if state == TrendDirection.UP: if highs[i] > last_pivot_val: - update_last_pivot(i, highs[i], 1) + update_last_pivot(i, highs[i], TrendDirection.UP) elif (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i] and ( i - last_pivot_pos ) >= depth: - add_pivot(i, lows[i], -1) - state = -1 - elif state == -1: + add_pivot(i, lows[i], TrendDirection.DOWN) + state = TrendDirection.DOWN + elif state == TrendDirection.DOWN: if lows[i] < last_pivot_val: - update_last_pivot(i, lows[i], -1) + update_last_pivot(i, lows[i], TrendDirection.DOWN) elif (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i] and ( i - last_pivot_pos ) >= depth: - add_pivot(i, highs[i], 1) - state = 1 + add_pivot(i, highs[i], TrendDirection.UP) + state = TrendDirection.UP - if state != 0 and (len(df) - 1 - last_pivot_pos) >= depth: + if state != TrendDirection.NEUTRAL and (len(df) - 1 - last_pivot_pos) >= depth: final_pos = len(df) - 1 last_pivot_val = pivots_values[-1] price_move = ( (highs[final_pos] - last_pivot_val) / last_pivot_val - if state == 1 + if state == TrendDirection.UP else (last_pivot_val - lows[final_pos]) / last_pivot_val ) @@ -930,7 +936,9 @@ def zigzag( and indices[final_pos] != pivots_indices[-1] ): add_pivot( - final_pos, highs[final_pos] if state == 1 else lows[final_pos], state + final_pos, + highs[final_pos] if state == TrendDirection.UP else lows[final_pos], + state, ) return pivots_indices, pivots_values, pivots_directions diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index 42b26d1..8a50f59 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -58,7 +58,7 @@ class QuickAdapterV3(IStrategy): INTERFACE_VERSION = 3 def version(self) -> str: - return "3.3.12" + return "3.3.13" timeframe = "5m" diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index 7d13607..0f15c17 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -1,3 +1,4 @@ +from enum import IntEnum import numpy as np import pandas as pd import talib.abstract as ta @@ -314,18 +315,23 @@ def zigzag( highs = df["high"].values lows = df["low"].values + class TrendDirection(IntEnum): + NEUTRAL = 0 + UP = 1 + DOWN = -1 + + state: TrendDirection = TrendDirection.NEUTRAL pivots_indices, pivots_values, pivots_directions = [], [], [] - state = 0 # 0=neutral, 1=up, -1=down last_pivot_pos = -depth - def add_pivot(pos: int, value: float, direction: int): + def add_pivot(pos: int, value: float, direction: TrendDirection): nonlocal last_pivot_pos pivots_indices.append(indices[pos]) pivots_values.append(value) pivots_directions.append(direction) last_pivot_pos = pos - def update_last_pivot(pos: int, value: float, direction: int): + def update_last_pivot(pos: int, value: float, direction: TrendDirection): if pivots_indices: pivots_indices[-1] = indices[pos] pivots_values[-1] = value @@ -341,42 +347,42 @@ def zigzag( if lows[i] < initial_low: initial_low, initial_low_pos = lows[i], i - if (highs[i] - initial_low) / initial_low > thresholds[i]: - add_pivot(initial_low_pos, initial_low, -1) - state = 1 + if (initial_high - lows[i]) / initial_high >= thresholds[i]: + add_pivot(initial_high_pos, initial_high, TrendDirection.UP) + state = TrendDirection.DOWN break - elif (initial_high - lows[i]) / initial_high > thresholds[i]: - add_pivot(initial_high_pos, initial_high, 1) - state = -1 + elif (highs[i] - initial_low) / initial_low >= thresholds[i]: + add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN) + state = TrendDirection.UP break else: return [], [], [] for i in range(i + 1, len(df)): last_pivot_val = pivots_values[-1] - if state == 1: + if state == TrendDirection.UP: if highs[i] > last_pivot_val: - update_last_pivot(i, highs[i], 1) + update_last_pivot(i, highs[i], TrendDirection.UP) elif (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i] and ( i - last_pivot_pos ) >= depth: - add_pivot(i, lows[i], -1) - state = -1 - elif state == -1: + add_pivot(i, lows[i], TrendDirection.DOWN) + state = TrendDirection.DOWN + elif state == TrendDirection.DOWN: if lows[i] < last_pivot_val: - update_last_pivot(i, lows[i], -1) + update_last_pivot(i, lows[i], TrendDirection.DOWN) elif (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i] and ( i - last_pivot_pos ) >= depth: - add_pivot(i, highs[i], 1) - state = 1 + add_pivot(i, highs[i], TrendDirection.UP) + state = TrendDirection.UP - if state != 0 and (len(df) - 1 - last_pivot_pos) >= depth: + if state != TrendDirection.NEUTRAL and (len(df) - 1 - last_pivot_pos) >= depth: final_pos = len(df) - 1 last_pivot_val = pivots_values[-1] price_move = ( (highs[final_pos] - last_pivot_val) / last_pivot_val - if state == 1 + if state == TrendDirection.UP else (last_pivot_val - lows[final_pos]) / last_pivot_val ) @@ -385,7 +391,9 @@ def zigzag( and indices[final_pos] != pivots_indices[-1] ): add_pivot( - final_pos, highs[final_pos] if state == 1 else lows[final_pos], state + final_pos, + highs[final_pos] if state == TrendDirection.UP else lows[final_pos], + state, ) return pivots_indices, pivots_values, pivots_directions -- 2.43.0