_LABEL_SELECTION_DISTANCE_METRICS_SET: Final[frozenset[str]] = (
_DISTANCE_METRICS_SET - _PROBABILITY_DISTANCE_METRICS_SET
)
- # Aggregate metrics: distance metrics computed by reduction over
- # objective coordinates rather than SciPy/sklearn pairwise routines
- # (``harmonic_mean``, ``geometric_mean``, ``arithmetic_mean``,
- # ``quadratic_mean``, ``cubic_mean``, ``power_mean``, ``weighted_sum``).
- # Accepted by ``compromise_programming``/``topsis`` via
- # ``_calculate_trial_distance_to_ideal``; rejected by cluster/density
- # categories that route to ``pairwise_distances``/``KMeans``/
- # ``KMedoids``/``NearestNeighbors``.
- _AGGREGATE_DISTANCE_METRICS_SET: Final[frozenset[str]] = (
- _DISTANCE_METRICS_SET - _SCIPY_METRICS_SET - _PROBABILITY_DISTANCE_METRICS_SET
- )
# SciPy-compatible non-probability metrics. Accepted by cluster/density
# categories that route to ``pairwise_distances``/``KMeans``/
- # ``KMedoids``/``NearestNeighbors``; rejected by the aggregate set and
- # by probability metrics.
+ # ``KMedoids``/``NearestNeighbors``; rejected by aggregate metrics
+ # (``harmonic_mean``, ``geometric_mean``, ``arithmetic_mean``,
+ # ``quadratic_mean``, ``cubic_mean``, ``power_mean``, ``weighted_sum``)
+ # and by probability metrics.
_CLUSTER_DENSITY_DISTANCE_METRICS_SET: Final[frozenset[str]] = (
_SCIPY_METRICS_SET - _PROBABILITY_DISTANCE_METRICS_SET
)
if not series_list:
return None
if len(series_list) == 1:
- return series_list[0]
+ return series_list[0].astype(np.int64)
return pd.concat(series_list, axis=1).max(axis=1).astype(np.int64)
@staticmethod
else:
# Cluster/density paths route the metric to SciPy/sklearn APIs
# (pairwise_distances, KMeans, KMedoids, NearestNeighbors) which
- # reject `_AGGREGATE_DISTANCE_METRICS_SET`; restrict the
+ # reject aggregate metrics computed by reduction; restrict the
# valid set to SciPy-compatible non-probability metrics.
valid_metrics = (
QuickAdapterRegressorV3._CLUSTER_DENSITY_DISTANCE_METRICS_SET
)
if feat_dict.get("reverse_train_test_order", False):
- (
- train_features,
- test_features,
- train_labels,
- test_labels,
- train_base_weights,
- test_base_weights,
- train_label_weights,
- test_label_weights,
- ) = (
- test_features,
- train_features,
- test_labels,
- train_labels,
+ train_features, test_features = test_features, train_features
+ train_labels, test_labels = test_labels, train_labels
+ train_base_weights, test_base_weights = (
test_base_weights,
train_base_weights,
+ )
+ train_label_weights, test_label_weights = (
test_label_weights,
train_label_weights,
)
dd["train_weights"] = sanitize_and_renormalize(
dd["train_weights"],
logger=logger,
- context="post_feature_pipeline:train",
+ context=f"[{pair}] post_feature_pipeline:train",
)
dd["train_labels"], _, _ = dk.label_pipeline.fit_transform(dd["train_labels"])
dd["test_weights"] = sanitize_and_renormalize(
dd["test_weights"],
logger=logger,
- context="post_feature_pipeline:test",
+ context=f"[{pair}] post_feature_pipeline:test",
)
dd["test_labels"], _, _ = dk.label_pipeline.transform(dd["test_labels"])
_log_known_at_none_once(dk.pair, "timeseries_split causal guard")
if feat_dict.get("reverse_train_test_order", False):
- (
- train_features,
- test_features,
- train_labels,
- test_labels,
- train_base_weights,
- test_base_weights,
- train_label_weights,
- test_label_weights,
- ) = (
- test_features,
- train_features,
- test_labels,
- train_labels,
+ train_features, test_features = test_features, train_features
+ train_labels, test_labels = test_labels, train_labels
+ train_base_weights, test_base_weights = (
test_base_weights,
train_base_weights,
+ )
+ train_label_weights, test_label_weights = (
test_label_weights,
train_label_weights,
)
version_repr = (
"none"
if existing_schema_version is None
- else f"v{existing_schema_version}"
+ else f"v{existing_schema_version!r}"
)
logger.warning(
f"[{pair}] Optuna {namespace} study {study_name}: "
INTERFACE_VERSION = 3
_TRADE_DIRECTIONS: Final[tuple[TradeDirection, ...]] = ("long", "short")
+ _TRADE_DIRECTIONS_SET: Final[frozenset[TradeDirection]] = frozenset(
+ _TRADE_DIRECTIONS
+ )
_INTERPOLATION_DIRECTIONS: Final[tuple[InterpolationDirection, ...]] = (
"direct",
"inverse",
)
_ORDER_TYPES: Final[tuple[OrderType, ...]] = ("entry", "exit")
+ _ORDER_TYPES_SET: Final[frozenset[OrderType]] = frozenset(_ORDER_TYPES)
_TRADING_MODES: Final[tuple[TradingMode, ...]] = ("spot", "margin", "futures")
_CUSTOM_STOPLOSS_NATR_MULTIPLIER_FRACTION: Final[float] = 0.7860
def timeframe_minutes(self) -> int:
return timeframe_to_minutes(self.config.get("timeframe"))
- @staticmethod
- @lru_cache(maxsize=None)
- def _trade_directions_set() -> set[TradeDirection]:
- return set(QuickAdapterV3._TRADE_DIRECTIONS)
-
- @staticmethod
- @lru_cache(maxsize=None)
- def _order_types_set() -> set[OrderType]:
- return set(QuickAdapterV3._ORDER_TYPES)
-
@property
def can_short(self) -> bool:
return self.is_short_allowed()
"""
if df.empty:
return False
- if side not in QuickAdapterV3._trade_directions_set():
+ if side not in QuickAdapterV3._TRADE_DIRECTIONS_SET:
return False
- if order not in QuickAdapterV3._order_types_set():
+ if order not in QuickAdapterV3._ORDER_TYPES_SET:
return False
if not isinstance(rate, (int, float)) or not np.isfinite(rate):
return False
side: str,
**kwargs,
) -> bool:
- if side not in QuickAdapterV3._trade_directions_set():
+ if side not in QuickAdapterV3._TRADE_DIRECTIONS_SET:
return False
if (
side == QuickAdapterV3._TRADE_DIRECTIONS[1] and not self.can_short