https://github.com/sponsors/robcaulk
"""
- version = "3.7.28"
+ version = "3.7.29"
@cached_property
def _optuna_config(self) -> dict:
df: pd.DataFrame,
natr_period: int = 14,
natr_ratio: float = 1.0,
- confirmation_window: int = 3,
+ confirmation_window: int = 2,
depth: int = 12,
) -> tuple[list[int], list[float], list[int]]:
- if df.empty or len(df) < natr_period + confirmation_window:
+ if df.empty or len(df) < max(natr_period, 2 * confirmation_window + 1):
return [], [], []
indices = df.index.tolist()
lows = df["low"].values
state: TrendDirection = TrendDirection.NEUTRAL
- pivots_indices, pivots_values, pivots_directions = [], [], []
+
last_pivot_pos = -depth - 1
+ pivots_indices, pivots_values, pivots_directions = [], [], []
+
+ candidate_pivot_pos = -1
+ candidate_pivot_value = np.nan
+ candidate_pivot_direction: TrendDirection = TrendDirection.NEUTRAL
+
+ def update_candidate_pivot(pos: int, value: float, direction: TrendDirection):
+ nonlocal candidate_pivot_pos, candidate_pivot_value, candidate_pivot_direction
+ candidate_pivot_pos = pos
+ candidate_pivot_value = value
+ candidate_pivot_direction = direction
def add_pivot(pos: int, value: float, direction: TrendDirection):
nonlocal last_pivot_pos
pivots_values.append(value)
pivots_directions.append(direction)
last_pivot_pos = pos
-
- def update_last_pivot(pos: int, value: float, direction: TrendDirection):
- if pivots_indices and indices[pos] != pivots_indices[-1]:
- pivots_indices[-1] = indices[pos]
- pivots_values[-1] = value
- pivots_directions[-1] = direction
+ update_candidate_pivot(
+ pos,
+ value,
+ TrendDirection.UP
+ if direction == TrendDirection.DOWN
+ else TrendDirection.DOWN,
+ )
def is_reversal_confirmed(pos: int, direction: TrendDirection) -> bool:
- if pos + confirmation_window >= len(df):
+ if pos - confirmation_window < 0 or pos + confirmation_window >= len(df):
return False
- next_closes = closes[pos + 1 : pos + confirmation_window + 1]
+ next_slice = slice(pos + 1, pos + confirmation_window + 1)
+ next_closes = closes[next_slice]
+ next_highs = highs[next_slice]
+ next_lows = lows[next_slice]
+ previous_slice = slice(pos - confirmation_window, pos)
+ previous_closes = closes[previous_slice]
+ previous_highs = highs[previous_slice]
+ previous_lows = lows[previous_slice]
+
if direction == TrendDirection.DOWN:
- return np.all(next_closes < highs[pos])
+ return (
+ np.all(next_closes < highs[pos])
+ and np.all(previous_closes < highs[pos])
+ and np.all(next_highs <= highs[pos])
+ and np.all(previous_highs <= highs[pos])
+ )
elif direction == TrendDirection.UP:
- return np.all(next_closes > lows[pos])
+ return (
+ np.all(next_closes > lows[pos])
+ and np.all(previous_closes > lows[pos])
+ and np.all(next_lows >= lows[pos])
+ and np.all(previous_lows >= lows[pos])
+ )
return False
start_pos = 0
initial_high = highs[initial_high_pos]
initial_low = lows[initial_low_pos]
for i in range(start_pos + 1, len(df)):
- if highs[i] > initial_high:
- initial_high, initial_high_pos = highs[i], i
- if lows[i] < initial_low:
- initial_low, initial_low_pos = lows[i], i
- if (
- i - initial_high_pos < confirmation_window
- or i - initial_low_pos < confirmation_window
- ):
- continue
-
- initial_move_from_high = (initial_high - lows[i]) / initial_high
- initial_move_from_low = (highs[i] - initial_low) / initial_low
- if initial_move_from_high >= thresholds[i] and is_reversal_confirmed(
- initial_high_pos, TrendDirection.DOWN
- ):
+ current_high = highs[i]
+ current_low = lows[i]
+ if current_high > initial_high:
+ initial_high, initial_high_pos = current_high, i
+ if current_low < initial_low:
+ initial_low, initial_low_pos = current_low, i
+
+ initial_move_from_high = (initial_high - current_low) / initial_high
+ initial_move_from_low = (current_high - initial_low) / initial_low
+ if initial_move_from_high >= thresholds[
+ initial_high_pos
+ ] and is_reversal_confirmed(initial_high_pos, TrendDirection.DOWN):
add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
state = TrendDirection.DOWN
break
- elif initial_move_from_low >= thresholds[i] and is_reversal_confirmed(
- initial_low_pos, TrendDirection.UP
- ):
+ elif initial_move_from_low >= thresholds[
+ initial_low_pos
+ ] and is_reversal_confirmed(initial_low_pos, TrendDirection.UP):
add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
state = TrendDirection.UP
break
for i in range(last_pivot_pos + 1, len(df)):
current_high = highs[i]
current_low = lows[i]
- last_pivot_val = pivots_values[-1]
+
if state == TrendDirection.UP:
- if current_high > last_pivot_val:
- update_last_pivot(i, current_high, TrendDirection.UP)
- elif (
- (last_pivot_val - current_low) / last_pivot_val >= thresholds[i]
- and (i - last_pivot_pos) > depth
- and is_reversal_confirmed(i, TrendDirection.DOWN)
+ if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value:
+ update_candidate_pivot(i, current_high, TrendDirection.UP)
+ if (
+ (candidate_pivot_value - current_low) / candidate_pivot_value
+ >= thresholds[candidate_pivot_pos]
+ and (candidate_pivot_pos - last_pivot_pos) >= depth
+ and is_reversal_confirmed(candidate_pivot_pos, TrendDirection.DOWN)
):
- add_pivot(i, current_low, TrendDirection.DOWN)
+ add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
state = TrendDirection.DOWN
elif state == TrendDirection.DOWN:
- if current_low < last_pivot_val:
- update_last_pivot(i, current_low, TrendDirection.DOWN)
- elif (
- (current_high - last_pivot_val) / last_pivot_val >= thresholds[i]
- and (i - last_pivot_pos) > depth
- and is_reversal_confirmed(i, TrendDirection.UP)
+ if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value:
+ update_candidate_pivot(i, current_low, TrendDirection.DOWN)
+ if (
+ (current_high - candidate_pivot_value) / candidate_pivot_value
+ >= thresholds[candidate_pivot_pos]
+ and (candidate_pivot_pos - last_pivot_pos) >= depth
+ and is_reversal_confirmed(candidate_pivot_pos, TrendDirection.UP)
):
- add_pivot(i, current_high, TrendDirection.UP)
+ add_pivot(
+ candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
+ )
state = TrendDirection.UP
return pivots_indices, pivots_values, pivots_directions
INTERFACE_VERSION = 3
def version(self) -> str:
- return "3.3.22"
+ return "3.3.23"
timeframe = "5m"
last_candle_natr = last_candle["natr_label_period_candles"]
if isna(last_candle_natr):
return False
- entry_price_fluctuation = (
- last_candle_close * last_candle_natr * self.get_entry_natr_ratio(pair)
- )
+ entry_natr_ratio = self.get_entry_natr_ratio(pair)
if side == "long":
- lower_bound = last_candle_low - entry_price_fluctuation
- upper_bound = last_candle_close + entry_price_fluctuation
+ lower_bound = (
+ last_candle_low - last_candle_low * last_candle_natr * entry_natr_ratio
+ )
+ upper_bound = (
+ last_candle_close
+ + last_candle_close * last_candle_natr * entry_natr_ratio
+ )
if lower_bound < 0:
logger.info(
f"User denied {side} entry for {pair}: calculated lower bound {lower_bound} is below zero"
f"User denied {side} entry for {pair}: rate {rate} outside bounds [{lower_bound}, {upper_bound}]"
)
elif side == "short":
- lower_bound = last_candle_close - entry_price_fluctuation
- upper_bound = last_candle_high + entry_price_fluctuation
+ lower_bound = (
+ last_candle_close
+ - last_candle_close * last_candle_natr * entry_natr_ratio
+ )
+ upper_bound = (
+ last_candle_high
+ + last_candle_high * last_candle_natr * entry_natr_ratio
+ )
if lower_bound < 0:
logger.info(
f"User denied {side} entry for {pair}: calculated lower bound {lower_bound} is below zero"
df: pd.DataFrame,
natr_period: int = 14,
natr_ratio: float = 1.0,
- confirmation_window: int = 3,
+ confirmation_window: int = 2,
depth: int = 12,
) -> tuple[list[int], list[float], list[int]]:
- if df.empty or len(df) < natr_period + confirmation_window:
+ if df.empty or len(df) < max(natr_period, 2 * confirmation_window + 1):
return [], [], []
indices = df.index.tolist()
lows = df["low"].values
state: TrendDirection = TrendDirection.NEUTRAL
- pivots_indices, pivots_values, pivots_directions = [], [], []
+
last_pivot_pos = -depth - 1
+ pivots_indices, pivots_values, pivots_directions = [], [], []
+
+ candidate_pivot_pos = -1
+ candidate_pivot_value = np.nan
+ candidate_pivot_direction: TrendDirection = TrendDirection.NEUTRAL
+
+ def update_candidate_pivot(pos: int, value: float, direction: TrendDirection):
+ nonlocal candidate_pivot_pos, candidate_pivot_value, candidate_pivot_direction
+ candidate_pivot_pos = pos
+ candidate_pivot_value = value
+ candidate_pivot_direction = direction
def add_pivot(pos: int, value: float, direction: TrendDirection):
nonlocal last_pivot_pos
pivots_values.append(value)
pivots_directions.append(direction)
last_pivot_pos = pos
-
- def update_last_pivot(pos: int, value: float, direction: TrendDirection):
- if pivots_indices and indices[pos] != pivots_indices[-1]:
- pivots_indices[-1] = indices[pos]
- pivots_values[-1] = value
- pivots_directions[-1] = direction
+ update_candidate_pivot(
+ pos,
+ value,
+ TrendDirection.UP
+ if direction == TrendDirection.DOWN
+ else TrendDirection.DOWN,
+ )
def is_reversal_confirmed(pos: int, direction: TrendDirection) -> bool:
- if pos + confirmation_window >= len(df):
+ if pos - confirmation_window < 0 or pos + confirmation_window >= len(df):
return False
- next_closes = closes[pos + 1 : pos + confirmation_window + 1]
+ next_slice = slice(pos + 1, pos + confirmation_window + 1)
+ next_closes = closes[next_slice]
+ next_highs = highs[next_slice]
+ next_lows = lows[next_slice]
+ previous_slice = slice(pos - confirmation_window, pos)
+ previous_closes = closes[previous_slice]
+ previous_highs = highs[previous_slice]
+ previous_lows = lows[previous_slice]
+
if direction == TrendDirection.DOWN:
- return np.all(next_closes < highs[pos])
+ return (
+ np.all(next_closes < highs[pos])
+ and np.all(previous_closes < highs[pos])
+ and np.all(next_highs <= highs[pos])
+ and np.all(previous_highs <= highs[pos])
+ )
elif direction == TrendDirection.UP:
- return np.all(next_closes > lows[pos])
+ return (
+ np.all(next_closes > lows[pos])
+ and np.all(previous_closes > lows[pos])
+ and np.all(next_lows >= lows[pos])
+ and np.all(previous_lows >= lows[pos])
+ )
return False
start_pos = 0
initial_high = highs[initial_high_pos]
initial_low = lows[initial_low_pos]
for i in range(start_pos + 1, len(df)):
- if highs[i] > initial_high:
- initial_high, initial_high_pos = highs[i], i
- if lows[i] < initial_low:
- initial_low, initial_low_pos = lows[i], i
- if (
- i - initial_high_pos < confirmation_window
- or i - initial_low_pos < confirmation_window
- ):
- continue
-
- initial_move_from_high = (initial_high - lows[i]) / initial_high
- initial_move_from_low = (highs[i] - initial_low) / initial_low
- if initial_move_from_high >= thresholds[i] and is_reversal_confirmed(
- initial_high_pos, TrendDirection.DOWN
- ):
+ current_high = highs[i]
+ current_low = lows[i]
+ if current_high > initial_high:
+ initial_high, initial_high_pos = current_high, i
+ if current_low < initial_low:
+ initial_low, initial_low_pos = current_low, i
+
+ initial_move_from_high = (initial_high - current_low) / initial_high
+ initial_move_from_low = (current_high - initial_low) / initial_low
+ if initial_move_from_high >= thresholds[
+ initial_high_pos
+ ] and is_reversal_confirmed(initial_high_pos, TrendDirection.DOWN):
add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
state = TrendDirection.DOWN
break
- elif initial_move_from_low >= thresholds[i] and is_reversal_confirmed(
- initial_low_pos, TrendDirection.UP
- ):
+ elif initial_move_from_low >= thresholds[
+ initial_low_pos
+ ] and is_reversal_confirmed(initial_low_pos, TrendDirection.UP):
add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
state = TrendDirection.UP
break
for i in range(last_pivot_pos + 1, len(df)):
current_high = highs[i]
current_low = lows[i]
- last_pivot_val = pivots_values[-1]
+
if state == TrendDirection.UP:
- if current_high > last_pivot_val:
- update_last_pivot(i, current_high, TrendDirection.UP)
- elif (
- (last_pivot_val - current_low) / last_pivot_val >= thresholds[i]
- and (i - last_pivot_pos) > depth
- and is_reversal_confirmed(i, TrendDirection.DOWN)
+ if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value:
+ update_candidate_pivot(i, current_high, TrendDirection.UP)
+ if (
+ (candidate_pivot_value - current_low) / candidate_pivot_value
+ >= thresholds[candidate_pivot_pos]
+ and (candidate_pivot_pos - last_pivot_pos) >= depth
+ and is_reversal_confirmed(candidate_pivot_pos, TrendDirection.DOWN)
):
- add_pivot(i, current_low, TrendDirection.DOWN)
+ add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
state = TrendDirection.DOWN
elif state == TrendDirection.DOWN:
- if current_low < last_pivot_val:
- update_last_pivot(i, current_low, TrendDirection.DOWN)
- elif (
- (current_high - last_pivot_val) / last_pivot_val >= thresholds[i]
- and (i - last_pivot_pos) > depth
- and is_reversal_confirmed(i, TrendDirection.UP)
+ if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value:
+ update_candidate_pivot(i, current_low, TrendDirection.DOWN)
+ if (
+ (current_high - candidate_pivot_value) / candidate_pivot_value
+ >= thresholds[candidate_pivot_pos]
+ and (candidate_pivot_pos - last_pivot_pos) >= depth
+ and is_reversal_confirmed(candidate_pivot_pos, TrendDirection.UP)
):
- add_pivot(i, current_high, TrendDirection.UP)
+ add_pivot(
+ candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
+ )
state = TrendDirection.UP
return pivots_indices, pivots_values, pivots_directions