return sp.signal.find_peaks(-series)[0].size + sp.signal.find_peaks(series)[0].size
-def top_change_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
+def top_log_return(dataframe: pd.DataFrame, period: int) -> pd.Series:
"""
- Percentage change of the current close relative to the top close price in the previous `period` bars.
+ Logarithmic return from rolling maximum: log(close / rolling_max).
- :param dataframe: OHLCV DataFrame
- :param period: The previous period window size to look back (>=1)
- :return: The top change percentage series
+ Measures distance below the highest close in previous `period` bars.
+ Returns ≤ 0 (e.g., -0.10 ≈ -9.5% below peak). Zero when at peak.
+
+ :param dataframe: OHLCV DataFrame with 'close' column
+ :param period: Lookback window (>=1)
+ :return: Log return series (≤ 0)
"""
if period < 1:
raise ValueError(f"Invalid period value {period!r}: must be >= 1")
return np.log(dataframe.get("close") / previous_close_top)
-def bottom_change_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
+def bottom_log_return(dataframe: pd.DataFrame, period: int) -> pd.Series:
"""
- Percentage change of the current close relative to the bottom close price in the previous `period` bars.
+ Logarithmic return from rolling minimum: log(close / rolling_min).
+
+ Measures distance above the lowest close in previous `period` bars.
+ Returns ≥ 0 (e.g., +0.10 ≈ +10.5% above bottom). Zero when at bottom.
- :param dataframe: OHLCV DataFrame
- :param period: The previous period window size to look back (>=1)
- :return: The bottom change percentage series
+ :param dataframe: OHLCV DataFrame with 'close' column
+ :param period: Lookback window (>=1)
+ :return: Log return series (≥ 0)
"""
if period < 1:
raise ValueError(f"Invalid period value {period!r}: must be >= 1")
def price_retracement_percent(dataframe: pd.DataFrame, period: int) -> pd.Series:
"""
- Calculate the percentage retracement of the current close within the high/low close price range
- of the previous `period` bars.
+ Normalized position (0-1) of close within rolling high/low range, using log scale.
+
+ Formula: log(close / low) / log(high / low)
- :param dataframe: OHLCV DataFrame
- :param period: Window size for calculating historical closes high/low (>=1)
- :return: Retracement percentage series
+ Returns 0 at bottom, 1 at top, 0.5 at geometric midpoint (not arithmetic).
+ Example: range [100, 200] → midpoint at ~141, not 150.
+
+ :param dataframe: OHLCV DataFrame with 'close' column
+ :param period: Lookback window (>=1)
+ :return: Normalized position (0 to 1)
"""
if period < 1:
raise ValueError(f"Invalid period value {period!r}: must be >= 1")
closes = df.get("close").to_numpy()
closes_log = np.log(closes)
highs = df.get("high").to_numpy()
+ highs_log = np.log(highs)
lows = df.get("low").to_numpy()
+ lows_log = np.log(lows)
volumes = df.get("volume").to_numpy()
state: TrendDirection = TrendDirection.NEUTRAL
pivots_indices: list[int] = []
- pivots_values: list[float] = []
+ pivots_values_log: list[float] = []
pivots_directions: list[TrendDirection] = []
pivots_amplitudes: list[float] = []
pivots_amplitude_threshold_ratios: list[float] = []
last_pivot_pos: int = -1
candidate_pivot_pos: int = -1
- candidate_pivot_value: float = np.nan
+ candidate_pivot_value_log: float = np.nan
volatility_quantile_cache: dict[int, float] = {}
return max_threshold - (max_threshold - min_threshold) * volatility_quantile
- def update_candidate_pivot(pos: int, value: float):
- nonlocal candidate_pivot_pos, candidate_pivot_value
+ def update_candidate_pivot(pos: int, value_log: float):
+ nonlocal candidate_pivot_pos, candidate_pivot_value_log
if 0 <= pos < n:
candidate_pivot_pos = pos
- candidate_pivot_value = value
+ candidate_pivot_value_log = value_log
def reset_candidate_pivot():
- nonlocal candidate_pivot_pos, candidate_pivot_value
+ nonlocal candidate_pivot_pos, candidate_pivot_value_log
candidate_pivot_pos = -1
- candidate_pivot_value = np.nan
+ candidate_pivot_value_log = np.nan
def calculate_pivot_metrics(
*,
previous_pos: int,
- previous_value: float,
+ previous_value_log: float,
current_pos: int,
- current_value: float,
+ current_value_log: float,
) -> tuple[float, float, float]:
if previous_pos < 0 or current_pos < 0:
return np.nan, np.nan, np.nan
if previous_pos >= n or current_pos >= n:
return np.nan, np.nan, np.nan
- if np.isclose(previous_value, 0.0) or np.isclose(current_value, 0.0):
+ if not np.isfinite(previous_value_log) or not np.isfinite(current_value_log):
return np.nan, np.nan, np.nan
- amplitude = abs(np.log(current_value) - np.log(previous_value))
+ amplitude = abs(current_value_log - previous_value_log)
if not (np.isfinite(amplitude) and amplitude >= 0):
return np.nan, np.nan, np.nan
if (end_pos - start_pos) < 2:
return np.nan
- closes_slice = closes[start_pos:end_pos]
- close_diffs = np.diff(closes_slice)
- path_length = np.nansum(np.abs(close_diffs))
- net_move = abs(closes_slice[-1] - closes_slice[0])
+ path_length = np.nansum(np.abs(np.diff(closes_log[start_pos:end_pos])))
+ net_move = abs(closes_log[end_pos - 1] - closes_log[start_pos])
if not (np.isfinite(path_length) and np.isfinite(net_move)):
return np.nan
total_volume = np.nansum(volumes_slice)
if not np.isfinite(total_volume) or np.isclose(total_volume, 0.0):
return np.nan
- volume_weights = volumes_slice / total_volume
- closes_slice = closes[start_pos:end_pos]
- vw_close_diffs = np.diff(closes_slice) * volume_weights
+ vw_close_diffs = np.diff(closes_log[start_pos:end_pos]) * (
+ volumes_slice / total_volume
+ )
vw_path_length = np.nansum(np.abs(vw_close_diffs))
vw_net_move = abs(np.nansum(vw_close_diffs))
return vw_net_move / vw_path_length
- def add_pivot(pos: int, value: float, direction: TrendDirection):
+ def add_pivot(pos: int, value_log: float, direction: TrendDirection):
nonlocal last_pivot_pos
if pivots_indices and indices[pos] == pivots_indices[-1]:
return
if (
- pivots_values
+ pivots_values_log
and last_pivot_pos >= 0
- and len(pivots_values) == len(pivots_amplitudes)
+ and len(pivots_values_log) == len(pivots_amplitudes)
):
amplitude, amplitude_threshold_ratio, speed = calculate_pivot_metrics(
previous_pos=last_pivot_pos,
- previous_value=pivots_values[-1],
+ previous_value_log=pivots_values_log[-1],
current_pos=pos,
- current_value=value,
+ current_value_log=value_log,
)
volume_rate = calculate_pivot_volume_rate(
previous_pos=last_pivot_pos,
)
pivots_indices.append(indices[pos])
- pivots_values.append(value)
+ pivots_values_log.append(value_log)
pivots_directions.append(direction)
pivots_amplitudes.append(np.nan)
start_pos = 0
initial_high_pos = start_pos
initial_low_pos = start_pos
- initial_high = highs[initial_high_pos]
- initial_low = lows[initial_low_pos]
+ initial_high_log = highs_log[initial_high_pos]
+ initial_low_log = lows_log[initial_low_pos]
for i in range(start_pos + 1, n):
- current_high = highs[i]
- current_low = lows[i]
- if current_high > initial_high:
- initial_high, initial_high_pos = current_high, i
- if current_low < initial_low:
- initial_low, initial_low_pos = current_low, i
-
- initial_move_from_high = abs(np.log(current_low) - np.log(initial_high))
- initial_move_from_low = abs(np.log(current_high) - np.log(initial_low))
+ if highs_log[i] > initial_high_log:
+ initial_high_log, initial_high_pos = highs_log[i], i
+ if lows_log[i] < initial_low_log:
+ initial_low_log, initial_low_pos = lows_log[i], i
+
+ initial_move_from_high = abs(lows_log[i] - highs_log[initial_high_pos])
+ initial_move_from_low = abs(highs_log[i] - lows_log[initial_low_pos])
is_initial_high_move_significant: bool = initial_move_from_high >= np.log1p(
thresholds[initial_high_pos]
)
)
if is_initial_high_move_significant and is_initial_low_move_significant:
if initial_move_from_high > initial_move_from_low:
- add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
+ add_pivot(initial_high_pos, initial_high_log, TrendDirection.UP)
state = TrendDirection.DOWN
break
else:
- add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
+ add_pivot(initial_low_pos, initial_low_log, TrendDirection.DOWN)
state = TrendDirection.UP
break
else:
if is_initial_high_move_significant:
- add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
+ add_pivot(initial_high_pos, initial_high_log, TrendDirection.UP)
state = TrendDirection.DOWN
break
elif is_initial_low_move_significant:
- add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
+ add_pivot(initial_low_pos, initial_low_log, TrendDirection.DOWN)
state = TrendDirection.UP
break
else:
)
for i in range(last_pivot_pos + 1, n):
- current_high = highs[i]
- current_low = lows[i]
-
if state == TrendDirection.UP:
- if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value:
- update_candidate_pivot(i, current_high)
- move_down = abs(np.log(current_low) - np.log(candidate_pivot_value))
+ if (
+ np.isnan(candidate_pivot_value_log)
+ or highs_log[i] > highs_log[candidate_pivot_pos]
+ ):
+ update_candidate_pivot(i, highs_log[i])
+ move_down = abs(lows_log[i] - candidate_pivot_value_log)
if move_down >= np.log1p(
thresholds[candidate_pivot_pos]
) and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.DOWN):
- add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
+ add_pivot(
+ candidate_pivot_pos,
+ highs_log[candidate_pivot_pos],
+ TrendDirection.UP,
+ )
state = TrendDirection.DOWN
elif state == TrendDirection.DOWN:
- if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value:
- update_candidate_pivot(i, current_low)
- move_up = abs(np.log(current_high) - np.log(candidate_pivot_value))
+ if (
+ np.isnan(candidate_pivot_value_log)
+ or lows_log[i] < lows_log[candidate_pivot_pos]
+ ):
+ update_candidate_pivot(i, lows_log[i])
+ move_up = abs(highs_log[i] - candidate_pivot_value_log)
if move_up >= np.log1p(
thresholds[candidate_pivot_pos]
) and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.UP):
add_pivot(
- candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
+ candidate_pivot_pos,
+ lows_log[candidate_pivot_pos],
+ TrendDirection.DOWN,
)
state = TrendDirection.UP
return (
pivots_indices,
- pivots_values,
+ pivots_values_log,
pivots_directions,
pivots_amplitudes,
pivots_amplitude_threshold_ratios,