class ExtremaWeightingTransformer(BaseTransform):
+ _STANDARDIZATION_SCALERS: dict[str, str] = {
+ "zscore": "_standard_scaler",
+ "robust": "_robust_scaler",
+ "power_yj": "_power_transformer",
+ }
+ _NORMALIZATION_SCALERS: dict[str, str] = {
+ "maxabs": "_maxabs_scaler",
+ "minmax": "_minmax_scaler",
+ }
+
def __init__(self, *, extrema_weighting: dict[str, Any]) -> None:
super().__init__(name="ExtremaWeightingTransformer")
self.extrema_weighting = {**DEFAULTS_EXTREMA_WEIGHTING, **extrema_weighting}
self._minmax_scaler: MinMaxScaler | None = None
self._maxabs_scaler: MaxAbsScaler | None = None
+ def _apply_scaler(
+ self,
+ values: NDArray[np.floating],
+ mask: NDArray[np.bool_],
+ scaler: Any,
+ inverse: bool = False,
+ ) -> NDArray[np.floating]:
+ if values[mask].size == 0:
+ return values
+ out = values.copy()
+ method = scaler.inverse_transform if inverse else scaler.transform
+ out[mask] = method(values[mask].reshape(-1, 1)).flatten()
+ return out
+
+ def _apply_mmad(
+ self,
+ values: NDArray[np.floating],
+ mask: NDArray[np.bool_],
+ inverse: bool = False,
+ ) -> NDArray[np.floating]:
+ if values[mask].size == 0:
+ return values
+ out = values.copy()
+ k = self.extrema_weighting["mmad_scaling_factor"]
+ if inverse:
+ out[mask] = values[mask] * (self._mad * k) + self._median
+ else:
+ out[mask] = (values[mask] - self._median) / (self._mad * k)
+ return out
+
+ def _apply_sigmoid(
+ self,
+ values: NDArray[np.floating],
+ mask: NDArray[np.bool_],
+ inverse: bool = False,
+ ) -> NDArray[np.floating]:
+ if values[mask].size == 0:
+ return values
+ scale = self.extrema_weighting["sigmoid_scale"]
+ if not np.isfinite(scale) or np.isclose(scale, 0.0):
+ return values
+ out = values.copy()
+ if inverse:
+ out[mask] = sp.special.logit((values[mask] + 1.0) / 2.0) / scale
+ else:
+ out[mask] = 2.0 * sp.special.expit(scale * values[mask]) - 1.0
+ return out
+
def _standardize(
self,
values: NDArray[np.floating],
method = self.extrema_weighting["standardization"]
if method == STANDARDIZATION_TYPES[0]: # "none"
return values
- out = values.copy()
- if method == STANDARDIZATION_TYPES[1]: # "zscore"
- if self._standard_scaler is None:
- raise RuntimeError("StandardScaler not fitted")
- if values[mask].size > 0:
- out[mask] = self._standard_scaler.transform(
- values[mask].reshape(-1, 1)
- ).flatten()
- elif method == STANDARDIZATION_TYPES[2]: # "robust"
- if self._robust_scaler is None:
- raise RuntimeError("RobustScaler not fitted")
- if values[mask].size > 0:
- out[mask] = self._robust_scaler.transform(
- values[mask].reshape(-1, 1)
- ).flatten()
- elif method == STANDARDIZATION_TYPES[3]: # "mmad"
- if values[mask].size > 0:
- mmad_scaling_factor = self.extrema_weighting["mmad_scaling_factor"]
- out[mask] = (values[mask] - self._median) / (
- self._mad * mmad_scaling_factor
- )
- elif method == STANDARDIZATION_TYPES[4]: # "power_yj"
- if self._power_transformer is None:
- raise RuntimeError("PowerTransformer not fitted")
- if values[mask].size > 0:
- out[mask] = self._power_transformer.transform(
- values[mask].reshape(-1, 1)
- ).flatten()
- else:
+ if method == STANDARDIZATION_TYPES[3]: # "mmad"
+ return self._apply_mmad(values, mask, inverse=False)
+
+ scaler_attr = self._STANDARDIZATION_SCALERS.get(method)
+ if scaler_attr is None:
raise ValueError(
f"Invalid standardization {method!r}. "
f"Supported: {', '.join(STANDARDIZATION_TYPES)}"
)
- return out
+ scaler = getattr(self, scaler_attr)
+ if scaler is None:
+ raise RuntimeError(f"{scaler_attr[1:]} not fitted")
+ return self._apply_scaler(values, mask, scaler, inverse=False)
def _normalize(
self,
mask: NDArray[np.bool_],
) -> NDArray[np.floating]:
method = self.extrema_weighting["normalization"]
+ if method == NORMALIZATION_TYPES[2]: # "sigmoid"
+ return self._apply_sigmoid(values, mask, inverse=False)
if method == NORMALIZATION_TYPES[3]: # "none"
return values
- out = values.copy()
- if method == NORMALIZATION_TYPES[0]: # "maxabs"
- if self._maxabs_scaler is None:
- raise RuntimeError("MaxAbsScaler not fitted")
- if values[mask].size > 0:
- out[mask] = self._maxabs_scaler.transform(
- values[mask].reshape(-1, 1)
- ).flatten()
- elif method == NORMALIZATION_TYPES[1]: # "minmax"
- if self._minmax_scaler is None:
- raise RuntimeError("MinMaxScaler not fitted")
- if values[mask].size > 0:
- out[mask] = self._minmax_scaler.transform(
- values[mask].reshape(-1, 1)
- ).flatten()
- elif method == NORMALIZATION_TYPES[2]: # "sigmoid"
- sigmoid_scale = self.extrema_weighting["sigmoid_scale"]
- out[mask] = 2.0 * sp.special.expit(sigmoid_scale * values[mask]) - 1.0
- else:
+
+ scaler_attr = self._NORMALIZATION_SCALERS.get(method)
+ if scaler_attr is None:
raise ValueError(
f"Invalid normalization {method!r}. "
f"Supported: {', '.join(NORMALIZATION_TYPES)}"
)
- return out
+ scaler = getattr(self, scaler_attr)
+ if scaler is None:
+ raise RuntimeError(f"{scaler_attr[1:]} not fitted")
+ return self._apply_scaler(values, mask, scaler, inverse=False)
def _apply_gamma(
self,
method = self.extrema_weighting["standardization"]
if method == STANDARDIZATION_TYPES[0]: # "none"
return values
- out = values.copy()
- if method == STANDARDIZATION_TYPES[1]: # "zscore"
- if self._standard_scaler is None:
- raise RuntimeError("StandardScaler not fitted")
- if values[mask].size > 0:
- out[mask] = self._standard_scaler.inverse_transform(
- values[mask].reshape(-1, 1)
- ).flatten()
- elif method == STANDARDIZATION_TYPES[2]: # "robust"
- if self._robust_scaler is None:
- raise RuntimeError("RobustScaler not fitted")
- if values[mask].size > 0:
- out[mask] = self._robust_scaler.inverse_transform(
- values[mask].reshape(-1, 1)
- ).flatten()
- elif method == STANDARDIZATION_TYPES[3]: # "mmad"
- if values[mask].size > 0:
- mmad_scaling_factor = self.extrema_weighting["mmad_scaling_factor"]
- out[mask] = (
- values[mask] * (self._mad * mmad_scaling_factor) + self._median
- )
- elif method == STANDARDIZATION_TYPES[4]: # "power_yj"
- if self._power_transformer is None:
- raise RuntimeError("PowerTransformer not fitted")
- if values[mask].size > 0:
- out[mask] = self._power_transformer.inverse_transform(
- values[mask].reshape(-1, 1)
- ).flatten()
- else:
+ if method == STANDARDIZATION_TYPES[3]: # "mmad"
+ return self._apply_mmad(values, mask, inverse=True)
+
+ scaler_attr = self._STANDARDIZATION_SCALERS.get(method)
+ if scaler_attr is None:
raise ValueError(
f"Invalid standardization {method!r}. "
f"Supported: {', '.join(STANDARDIZATION_TYPES)}"
)
- return out
+ scaler = getattr(self, scaler_attr)
+ if scaler is None:
+ raise RuntimeError(f"{scaler_attr[1:]} not fitted")
+ return self._apply_scaler(values, mask, scaler, inverse=True)
def _inverse_normalize(
self,
mask: NDArray[np.bool_],
) -> NDArray[np.floating]:
method = self.extrema_weighting["normalization"]
+ if method == NORMALIZATION_TYPES[2]: # "sigmoid"
+ return self._apply_sigmoid(values, mask, inverse=True)
if method == NORMALIZATION_TYPES[3]: # "none"
return values
- out = values.copy()
- if method == NORMALIZATION_TYPES[0]: # "maxabs"
- if self._maxabs_scaler is None:
- raise RuntimeError("MaxAbsScaler not fitted")
- if values[mask].size > 0:
- out[mask] = self._maxabs_scaler.inverse_transform(
- values[mask].reshape(-1, 1)
- ).flatten()
- elif method == NORMALIZATION_TYPES[1]: # "minmax"
- if self._minmax_scaler is None:
- raise RuntimeError("MinMaxScaler not fitted")
- if values[mask].size > 0:
- out[mask] = self._minmax_scaler.inverse_transform(
- values[mask].reshape(-1, 1)
- ).flatten()
- elif method == NORMALIZATION_TYPES[2]: # "sigmoid"
- sigmoid_scale = self.extrema_weighting["sigmoid_scale"]
- out[mask] = sp.special.logit((values[mask] + 1.0) / 2.0) / sigmoid_scale
- else:
+
+ scaler_attr = self._NORMALIZATION_SCALERS.get(method)
+ if scaler_attr is None:
raise ValueError(
f"Invalid normalization {method!r}. "
f"Supported: {', '.join(NORMALIZATION_TYPES)}"
)
- return out
+ scaler = getattr(self, scaler_attr)
+ if scaler is None:
+ raise RuntimeError(f"{scaler_attr[1:]} not fitted")
+ return self._apply_scaler(values, mask, scaler, inverse=True)
def _inverse_gamma(
self,
out[mask] = np.sign(values[mask]) * np.power(np.abs(values[mask]), 1.0 / gamma)
return out
def _fit_standardization(self, values: NDArray[np.floating]) -> None:
    """Fit the configured standardization step on ``values``.

    Depending on ``extrema_weighting["standardization"]`` this fits a
    StandardScaler ("zscore"), RobustScaler ("robust") or PowerTransformer
    ("power_yj"), or caches median/MAD statistics ("mmad").  "none" fits
    nothing.

    Args:
        values: 1-D array of finite training values.

    Raises:
        ValueError: if the configured method is not a known
            standardization type.
    """
    method = self.extrema_weighting["standardization"]
    if method == STANDARDIZATION_TYPES[0]:  # "none": nothing to fit
        return

    if method == STANDARDIZATION_TYPES[3]:  # "mmad": cache robust stats
        self._median = float(np.median(values))
        mad = float(np.median(np.abs(values - self._median)))
        # Guard a zero / non-finite MAD (e.g. constant input) so the later
        # division stays well-defined.
        self._mad = mad if np.isfinite(mad) and not np.isclose(mad, 0.0) else 1.0
        return

    column = values.reshape(-1, 1)  # sklearn expects a single-feature column
    if method == STANDARDIZATION_TYPES[1]:  # "zscore"
        self._standard_scaler = StandardScaler().fit(column)
        return
    if method == STANDARDIZATION_TYPES[2]:  # "robust"
        quantiles = self.extrema_weighting["robust_quantiles"]
        # Config stores fractions; RobustScaler wants percentages.
        self._robust_scaler = RobustScaler(
            quantile_range=(quantiles[0] * 100, quantiles[1] * 100)
        ).fit(column)
        return
    if method == STANDARDIZATION_TYPES[4]:  # "power_yj"
        self._power_transformer = PowerTransformer(
            method="yeo-johnson", standardize=True
        ).fit(column)
        return

    raise ValueError(
        f"Invalid standardization {method!r}. Supported: {', '.join(STANDARDIZATION_TYPES)}"
    )
+
def _fit_normalization(self, values: NDArray[np.floating]) -> None:
    """Fit the configured normalization step on (already standardized) values.

    "maxabs" and "minmax" fit their sklearn scalers; "sigmoid" and "none"
    are parameter-free and need no fitting.

    Args:
        values: 1-D array of standardized training values.

    Raises:
        ValueError: if the configured method is not a known
            normalization type.
    """
    method = self.extrema_weighting["normalization"]
    # "sigmoid" and "none" carry no fitted state.
    if method in (NORMALIZATION_TYPES[2], NORMALIZATION_TYPES[3]):
        return

    column = values.reshape(-1, 1)  # sklearn expects a single-feature column
    if method == NORMALIZATION_TYPES[0]:  # "maxabs"
        self._maxabs_scaler = MaxAbsScaler().fit(column)
        return
    if method == NORMALIZATION_TYPES[1]:  # "minmax"
        self._minmax_scaler = MinMaxScaler(
            feature_range=self.extrema_weighting["minmax_range"]
        ).fit(column)
        return

    raise ValueError(
        f"Invalid normalization {method!r}. Supported: {', '.join(NORMALIZATION_TYPES)}"
    )
+
def fit(
self,
X: ArrayLike,
values = np.asarray(X, dtype=float)
finite_values = values[np.isfinite(values)]
- standardization = self.extrema_weighting["standardization"]
- normalization = self.extrema_weighting["normalization"]
-
- if finite_values.size == 0:
- if standardization == STANDARDIZATION_TYPES[1]: # "zscore"
- self._standard_scaler = StandardScaler()
- self._standard_scaler.fit([[0.0], [1.0]])
- elif standardization == STANDARDIZATION_TYPES[2]: # "robust"
- self._robust_scaler = RobustScaler(
- quantile_range=(
- self.extrema_weighting["robust_quantiles"][0] * 100,
- self.extrema_weighting["robust_quantiles"][1] * 100,
- )
- )
- self._robust_scaler.fit([[0.0], [1.0]])
- elif standardization == STANDARDIZATION_TYPES[3]: # "mmad"
- self._median = 0.0
- self._mad = 1.0
- elif standardization == STANDARDIZATION_TYPES[4]: # "power_yj"
- self._power_transformer = PowerTransformer(
- method="yeo-johnson", standardize=True
- )
- self._power_transformer.fit(np.array([[0.0], [1.0]]))
-
- if normalization == NORMALIZATION_TYPES[0]: # "maxabs"
- self._maxabs_scaler = MaxAbsScaler()
- self._maxabs_scaler.fit([[0.0], [1.0]])
- elif normalization == NORMALIZATION_TYPES[1]: # "minmax"
- self._minmax_scaler = MinMaxScaler(
- feature_range=self.extrema_weighting["minmax_range"]
- )
- self._minmax_scaler.fit(np.array([[0.0], [1.0]]))
-
- self._fitted = True
- return X, y, sample_weight, feature_list
-
- if standardization == STANDARDIZATION_TYPES[1]: # "zscore"
- self._standard_scaler = StandardScaler()
- self._standard_scaler.fit(finite_values.reshape(-1, 1))
- elif standardization == STANDARDIZATION_TYPES[2]: # "robust"
- self._robust_scaler = RobustScaler(
- quantile_range=(
- self.extrema_weighting["robust_quantiles"][0] * 100,
- self.extrema_weighting["robust_quantiles"][1] * 100,
- )
- )
- self._robust_scaler.fit(finite_values.reshape(-1, 1))
- elif standardization == STANDARDIZATION_TYPES[3]: # "mmad"
- self._median = np.median(finite_values)
- mad = np.median(np.abs(finite_values - self._median))
- self._mad = mad if np.isfinite(mad) and not np.isclose(mad, 0.0) else 1.0
- elif standardization == STANDARDIZATION_TYPES[4]: # "power_yj"
- self._power_transformer = PowerTransformer(
- method="yeo-johnson", standardize=True
- )
- self._power_transformer.fit(finite_values.reshape(-1, 1))
+ fit_values = finite_values if finite_values.size > 0 else np.array([0.0, 1.0])
- finite_mask = np.ones(len(finite_values), dtype=bool)
- standardized_values = self._standardize(finite_values, finite_mask)
+ self._fit_standardization(fit_values)
- if normalization == NORMALIZATION_TYPES[0]: # "maxabs"
- self._maxabs_scaler = MaxAbsScaler()
- self._maxabs_scaler.fit(standardized_values.reshape(-1, 1))
- elif normalization == NORMALIZATION_TYPES[1]: # "minmax"
- self._minmax_scaler = MinMaxScaler(
- feature_range=self.extrema_weighting["minmax_range"]
- )
- self._minmax_scaler.fit(standardized_values.reshape(-1, 1))
+ finite_mask = np.ones(len(fit_values), dtype=bool)
+ standardized_fit_values = self._standardize(fit_values, finite_mask)
+ self._fit_normalization(standardized_fit_values)
self._fitted = True
return X, y, sample_weight, feature_list