ListOrNone,
)
from numpy.typing import ArrayLike, NDArray
+from sklearn.preprocessing import (
+ MaxAbsScaler,
+ MinMaxScaler,
+ PowerTransformer,
+ RobustScaler,
+ StandardScaler,
+)
WeightStrategy = Literal[
"none",
"volume_weighted_efficiency_ratio",
)
-StandardizationType = Literal["none", "zscore", "robust", "mmad"]
+StandardizationType = Literal["none", "zscore", "robust", "mmad", "power_yj"]
STANDARDIZATION_TYPES: Final[tuple[StandardizationType, ...]] = (
- "none", # 0 - w (identity)
+ "none", # 0 - w
"zscore", # 1 - (w - μ) / σ
"robust", # 2 - (w - median) / IQR
- "mmad", # 3 - (w - median) / MAD
+ "mmad", # 3 - (w - median) / (MAD · k)
+ "power_yj", # 4 - YJ(w) (standardized)
)
-NormalizationType = Literal["minmax", "sigmoid", "none"]
+NormalizationType = Literal["maxabs", "minmax", "sigmoid", "none"]
NORMALIZATION_TYPES: Final[tuple[NormalizationType, ...]] = (
- "minmax", # 0 - (w - min) / (max - min)
- "sigmoid", # 1 - 1 / (1 + exp(-scale × w))
- "none", # 2 - w (identity)
+ "maxabs", # 0 - w / max(|w|)
+ "minmax", # 1 - low + (w - min) / (max - min) · (high - low)
+ "sigmoid", # 2 - 2·σ(scale · w) - 1
+ "none", # 3 - w
)
DEFAULTS_EXTREMA_WEIGHTING: Final[dict[str, Any]] = {
"robust_quantiles": (0.25, 0.75),
"mmad_scaling_factor": 1.4826,
# Phase 2: Normalization
- "normalization": NORMALIZATION_TYPES[0], # "minmax"
+ "normalization": NORMALIZATION_TYPES[0], # "maxabs"
"minmax_range": (-1.0, 1.0),
"sigmoid_scale": 1.0,
# Phase 3: Post-processing
super().__init__(name="ExtremaWeightingTransformer")
self.extrema_weighting = {**DEFAULTS_EXTREMA_WEIGHTING, **extrema_weighting}
self._fitted = False
- self._mean = 0.0
- self._std = 1.0
- self._min = 0.0
- self._max = 1.0
+ # Phase 1: Standardization
+ self._standard_scaler: StandardScaler | None = None
+ self._robust_scaler: RobustScaler | None = None
+ self._power_transformer: PowerTransformer | None = None
self._median = 0.0
- self._iqr = 1.0
self._mad = 1.0
+ # Phase 2: Normalization
+ self._minmax_scaler: MinMaxScaler | None = None
+ self._maxabs_scaler: MaxAbsScaler | None = None
def _standardize(
self,
return values
out = values.copy()
if method == STANDARDIZATION_TYPES[1]: # "zscore"
- out[mask] = (values[mask] - self._mean) / self._std
+ if self._standard_scaler is None:
+ raise RuntimeError("StandardScaler not fitted")
+ if values[mask].size > 0:
+ out[mask] = self._standard_scaler.transform(
+ values[mask].reshape(-1, 1)
+ ).flatten()
elif method == STANDARDIZATION_TYPES[2]: # "robust"
- out[mask] = (values[mask] - self._median) / self._iqr
+ if self._robust_scaler is None:
+ raise RuntimeError("RobustScaler not fitted")
+ if values[mask].size > 0:
+ out[mask] = self._robust_scaler.transform(
+ values[mask].reshape(-1, 1)
+ ).flatten()
elif method == STANDARDIZATION_TYPES[3]: # "mmad"
- mmad_scaling_factor = self.extrema_weighting["mmad_scaling_factor"]
- out[mask] = (values[mask] - self._median) / (
- self._mad * mmad_scaling_factor
- )
+ if values[mask].size > 0:
+ mmad_scaling_factor = self.extrema_weighting["mmad_scaling_factor"]
+ out[mask] = (values[mask] - self._median) / (
+ self._mad * mmad_scaling_factor
+ )
+ elif method == STANDARDIZATION_TYPES[4]: # "power_yj"
+ if self._power_transformer is None:
+ raise RuntimeError("PowerTransformer not fitted")
+ if values[mask].size > 0:
+ out[mask] = self._power_transformer.transform(
+ values[mask].reshape(-1, 1)
+ ).flatten()
else:
raise ValueError(
f"Invalid standardization {method!r}. "
mask: NDArray[np.bool_],
) -> NDArray[np.floating]:
method = self.extrema_weighting["normalization"]
- if method == NORMALIZATION_TYPES[2]: # "none"
+ if method == NORMALIZATION_TYPES[3]: # "none"
return values
out = values.copy()
- if method == NORMALIZATION_TYPES[0]: # "minmax"
- minmax_range = self.extrema_weighting["minmax_range"]
- value_range = self._max - self._min
- low, high = minmax_range
- scale_range = high - low
-
- if (
- not np.isfinite(value_range)
- or np.isclose(value_range, 0.0)
- or not np.isfinite(scale_range)
- or np.isclose(scale_range, 0.0)
- ):
- return out
-
- out[mask] = low + (values[mask] - self._min) / value_range * scale_range
- elif method == NORMALIZATION_TYPES[1]: # "sigmoid"
+ if method == NORMALIZATION_TYPES[0]: # "maxabs"
+ if self._maxabs_scaler is None:
+ raise RuntimeError("MaxAbsScaler not fitted")
+ if values[mask].size > 0:
+ out[mask] = self._maxabs_scaler.transform(
+ values[mask].reshape(-1, 1)
+ ).flatten()
+ elif method == NORMALIZATION_TYPES[1]: # "minmax"
+ if self._minmax_scaler is None:
+ raise RuntimeError("MinMaxScaler not fitted")
+ if values[mask].size > 0:
+ out[mask] = self._minmax_scaler.transform(
+ values[mask].reshape(-1, 1)
+ ).flatten()
+ elif method == NORMALIZATION_TYPES[2]: # "sigmoid"
sigmoid_scale = self.extrema_weighting["sigmoid_scale"]
- out[mask] = sp.special.expit(sigmoid_scale * values[mask])
+ out[mask] = 2.0 * sp.special.expit(sigmoid_scale * values[mask]) - 1.0
else:
raise ValueError(
f"Invalid normalization {method!r}. "
return values
out = values.copy()
if method == STANDARDIZATION_TYPES[1]: # "zscore"
- out[mask] = values[mask] * self._std + self._mean
+ if self._standard_scaler is None:
+ raise RuntimeError("StandardScaler not fitted")
+ if values[mask].size > 0:
+ out[mask] = self._standard_scaler.inverse_transform(
+ values[mask].reshape(-1, 1)
+ ).flatten()
elif method == STANDARDIZATION_TYPES[2]: # "robust"
- out[mask] = values[mask] * self._iqr + self._median
+ if self._robust_scaler is None:
+ raise RuntimeError("RobustScaler not fitted")
+ if values[mask].size > 0:
+ out[mask] = self._robust_scaler.inverse_transform(
+ values[mask].reshape(-1, 1)
+ ).flatten()
elif method == STANDARDIZATION_TYPES[3]: # "mmad"
- mmad_scaling_factor = self.extrema_weighting["mmad_scaling_factor"]
- out[mask] = values[mask] * (self._mad * mmad_scaling_factor) + self._median
+ if values[mask].size > 0:
+ mmad_scaling_factor = self.extrema_weighting["mmad_scaling_factor"]
+ out[mask] = (
+ values[mask] * (self._mad * mmad_scaling_factor) + self._median
+ )
+ elif method == STANDARDIZATION_TYPES[4]: # "power_yj"
+ if self._power_transformer is None:
+ raise RuntimeError("PowerTransformer not fitted")
+ if values[mask].size > 0:
+ out[mask] = self._power_transformer.inverse_transform(
+ values[mask].reshape(-1, 1)
+ ).flatten()
else:
raise ValueError(
f"Invalid standardization {method!r}. "
mask: NDArray[np.bool_],
) -> NDArray[np.floating]:
method = self.extrema_weighting["normalization"]
- if method == NORMALIZATION_TYPES[2]: # "none"
+ if method == NORMALIZATION_TYPES[3]: # "none"
return values
out = values.copy()
- if method == NORMALIZATION_TYPES[0]: # "minmax"
- minmax_range = self.extrema_weighting["minmax_range"]
- low, high = minmax_range
- value_range = self._max - self._min
- scale_range = high - low
-
- if (
- not np.isfinite(value_range)
- or np.isclose(value_range, 0.0)
- or not np.isfinite(scale_range)
- or np.isclose(scale_range, 0.0)
- ):
- return out
-
- out[mask] = self._min + (values[mask] - low) / scale_range * value_range
- elif method == NORMALIZATION_TYPES[1]: # "sigmoid"
+ if method == NORMALIZATION_TYPES[0]: # "maxabs"
+ if self._maxabs_scaler is None:
+ raise RuntimeError("MaxAbsScaler not fitted")
+ if values[mask].size > 0:
+ out[mask] = self._maxabs_scaler.inverse_transform(
+ values[mask].reshape(-1, 1)
+ ).flatten()
+ elif method == NORMALIZATION_TYPES[1]: # "minmax"
+ if self._minmax_scaler is None:
+ raise RuntimeError("MinMaxScaler not fitted")
+ if values[mask].size > 0:
+ out[mask] = self._minmax_scaler.inverse_transform(
+ values[mask].reshape(-1, 1)
+ ).flatten()
+ elif method == NORMALIZATION_TYPES[2]: # "sigmoid"
sigmoid_scale = self.extrema_weighting["sigmoid_scale"]
- out[mask] = sp.special.logit(values[mask]) / sigmoid_scale
+ out[mask] = sp.special.logit((values[mask] + 1.0) / 2.0) / sigmoid_scale
else:
raise ValueError(
f"Invalid normalization {method!r}. "
**kwargs,
) -> tuple[ArrayLike, ArrayOrNone, ArrayOrNone, ListOrNone]:
values = np.asarray(X, dtype=float)
- non_zero_finite_values = values[np.isfinite(values) & ~np.isclose(values, 0.0)]
+ finite_values = values[np.isfinite(values)]
+
+ standardization = self.extrema_weighting["standardization"]
+ normalization = self.extrema_weighting["normalization"]
+
+ if finite_values.size == 0:
+ if standardization == STANDARDIZATION_TYPES[1]: # "zscore"
+ self._standard_scaler = StandardScaler()
+ self._standard_scaler.fit([[0.0], [1.0]])
+ elif standardization == STANDARDIZATION_TYPES[2]: # "robust"
+ self._robust_scaler = RobustScaler(
+ quantile_range=(
+ self.extrema_weighting["robust_quantiles"][0] * 100,
+ self.extrema_weighting["robust_quantiles"][1] * 100,
+ )
+ )
+ self._robust_scaler.fit([[0.0], [1.0]])
+ elif standardization == STANDARDIZATION_TYPES[3]: # "mmad"
+ self._median = 0.0
+ self._mad = 1.0
+ elif standardization == STANDARDIZATION_TYPES[4]: # "power_yj"
+ self._power_transformer = PowerTransformer(
+ method="yeo-johnson", standardize=True
+ )
+ self._power_transformer.fit(np.array([[0.0], [1.0]]))
+
+ if normalization == NORMALIZATION_TYPES[0]: # "maxabs"
+ self._maxabs_scaler = MaxAbsScaler()
+ self._maxabs_scaler.fit([[0.0], [1.0]])
+ elif normalization == NORMALIZATION_TYPES[1]: # "minmax"
+ self._minmax_scaler = MinMaxScaler(
+ feature_range=self.extrema_weighting["minmax_range"]
+ )
+ self._minmax_scaler.fit(np.array([[0.0], [1.0]]))
- if non_zero_finite_values.size == 0:
- self._mean = 0.0
- self._std = 1.0
- self._min = 0.0
- self._max = 1.0
- self._median = 0.0
- self._iqr = 1.0
- self._mad = 1.0
self._fitted = True
return X, y, sample_weight, feature_list
- robust_quantiles = self.extrema_weighting["robust_quantiles"]
+ if standardization == STANDARDIZATION_TYPES[1]: # "zscore"
+ self._standard_scaler = StandardScaler()
+ self._standard_scaler.fit(finite_values.reshape(-1, 1))
+ elif standardization == STANDARDIZATION_TYPES[2]: # "robust"
+ self._robust_scaler = RobustScaler(
+ quantile_range=(
+ self.extrema_weighting["robust_quantiles"][0] * 100,
+ self.extrema_weighting["robust_quantiles"][1] * 100,
+ )
+ )
+ self._robust_scaler.fit(finite_values.reshape(-1, 1))
+ elif standardization == STANDARDIZATION_TYPES[3]: # "mmad"
+ self._median = np.median(finite_values)
+ mad = np.median(np.abs(finite_values - self._median))
+ self._mad = mad if np.isfinite(mad) and not np.isclose(mad, 0.0) else 1.0
+ elif standardization == STANDARDIZATION_TYPES[4]: # "power_yj"
+ self._power_transformer = PowerTransformer(
+ method="yeo-johnson", standardize=True
+ )
+ self._power_transformer.fit(finite_values.reshape(-1, 1))
+
+ finite_mask = np.ones(len(finite_values), dtype=bool)
+ standardized_values = self._standardize(finite_values, finite_mask)
- self._mean = np.mean(non_zero_finite_values)
- std = np.std(non_zero_finite_values, ddof=1)
- self._std = std if np.isfinite(std) and not np.isclose(std, 0.0) else 1.0
- self._min = np.min(non_zero_finite_values)
- self._max = np.max(non_zero_finite_values)
- if np.isclose(self._max, self._min):
- self._max = self._min + 1.0
- self._median = np.median(non_zero_finite_values)
- q1, q3 = (
- np.quantile(non_zero_finite_values, robust_quantiles[0]),
- np.quantile(non_zero_finite_values, robust_quantiles[1]),
- )
- iqr = q3 - q1
- self._iqr = iqr if np.isfinite(iqr) and not np.isclose(iqr, 0.0) else 1.0
- mad = np.median(np.abs(non_zero_finite_values - self._median))
- self._mad = mad if np.isfinite(mad) and not np.isclose(mad, 0.0) else 1.0
+ if normalization == NORMALIZATION_TYPES[0]: # "maxabs"
+ self._maxabs_scaler = MaxAbsScaler()
+ self._maxabs_scaler.fit(standardized_values.reshape(-1, 1))
+ elif normalization == NORMALIZATION_TYPES[1]: # "minmax"
+ self._minmax_scaler = MinMaxScaler(
+ feature_range=self.extrema_weighting["minmax_range"]
+ )
+ self._minmax_scaler.fit(standardized_values.reshape(-1, 1))
self._fitted = True
return X, y, sample_weight, feature_list
)
arr = np.asarray(X, dtype=float)
- # Exclude non-finite values and zeros (NEUTRAL extrema label) from transformation
- mask = np.isfinite(arr) & ~np.isclose(arr, 0.0)
+ mask = np.isfinite(arr)
standardized = self._standardize(arr, mask)
normalized = self._normalize(standardized, mask)
)
arr = np.asarray(X, dtype=float)
- # Exclude non-finite values and zeros (NEUTRAL extrema label) from transformation
- mask = np.isfinite(arr) & ~np.isclose(arr, 0.0)
+ mask = np.isfinite(arr)
degammaized = self._inverse_gamma(arr, mask)
denormalized = self._inverse_normalize(degammaized, mask)