]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
refactor(quickadapter): add format_dict helper and improve numeric formatting
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 26 Jan 2026 12:23:51 +0000 (13:23 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 26 Jan 2026 12:23:51 +0000 (13:23 +0100)
- Add format_dict() with singledispatch for type-safe dict/params formatting
- Refactor format_number() with unified significant digits formula
- Replace raw dict logging with format_dict() across strategy and model
- Remove redundant _format_label_method_config method
- Bump version to 3.11.1

quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

index ddac8dc83fa6016dce5a9522383bc7053f4235a8..cdee30510a89cf3b168389ff4756557bd7b3b36a 100644 (file)
@@ -51,6 +51,7 @@ from Utils import (
     Regressor,
     eval_set_and_weights,
     fit_regressor,
+    format_dict,
     format_number,
     get_label_defaults,
     get_label_pipeline_config,
@@ -93,7 +94,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     https://github.com/sponsors/robcaulk
     """
 
-    version = "3.11.0"
+    version = "3.11.1"
 
     _TEST_SIZE: Final[float] = 0.1
 
@@ -736,10 +737,6 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
 
         return config
 
-    @staticmethod
-    def _format_label_method_config(config: dict[str, Any]) -> str:
-        return ", ".join(f"{k}={v}" for k, v in config.items())
-
     _CONFIG_KEY_TO_TUNABLE_SUFFIX: Final[dict[str, str]] = {
         "distance_metric": "metric",
     }
@@ -1121,7 +1118,9 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         )
         logger.info("Feature Parameters Configuration:")
         logger.info(f"  scaler: {scaler}")
-        logger.info(f"  range: {feature_range}")
+        logger.info(
+            f"  range: ({format_number(feature_range[0])}, {format_number(feature_range[1])})"
+        )
 
         logger.info("=" * 60)
 
@@ -2904,7 +2903,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
                     "label_method", QuickAdapterRegressorV3.LABEL_METHOD_DEFAULT
                 )
             )
-            metric_log_msg = f" ({QuickAdapterRegressorV3._format_label_method_config(label_config)})"
+            metric_log_msg = f" ({format_dict(label_config, style='params')})"
         logger.info(
             f"[{pair}] Optuna {namespace} {objective_type} objective hyperopt completed"
             f"{metric_log_msg} ({time_spent:.2f} secs)"
@@ -2914,6 +2913,10 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
             if study_best_results
             else 20
         )
+        if study_best_results:
+            logger.info(
+                f"[{pair}] Optuna {namespace} {objective_type} objective hyperopt best params: {format_dict(study_best_results, style='dict')}"
+            )
         for key, value in study_best_results.items():
             if isinstance(value, list):
                 formatted_value = (
index 11e241c3ebce3d3820f0a58254c84434e4a0f7f5..4828c77e625df45f459cdf1e6318aaa7c499c8b8 100644 (file)
@@ -41,6 +41,7 @@ from Utils import (
     bottom_log_return,
     calculate_quantile,
     ewo,
+    format_dict,
     format_number,
     generate_label_data,
     get_callable_sha256,
@@ -108,7 +109,7 @@ class QuickAdapterV3(IStrategy):
     _PLOT_EXTREMA_MIN_EPS: Final[float] = 0.01
 
     def version(self) -> str:
-        return "3.11.0"
+        return "3.11.1"
 
     timeframe = "5m"
     timeframe_minutes = timeframe_to_minutes(timeframe)
@@ -477,7 +478,7 @@ class QuickAdapterV3(IStrategy):
             logger.info("  Weighting:")
             logger.info(f"    strategy: {col_weighting['strategy']}")
             logger.info(
-                f"    metric_coefficients: {col_weighting['metric_coefficients']}"
+                f"    metric_coefficients: {format_dict(col_weighting['metric_coefficients'], style='dict')}"
             )
             logger.info(f"    aggregation: {col_weighting['aggregation']}")
             if col_weighting["aggregation"] == COMBINED_AGGREGATIONS[5]:  # "softmax"
@@ -512,7 +513,9 @@ class QuickAdapterV3(IStrategy):
 
         logger.info("Exit Pricing:")
         logger.info(f"  trade_price_target_method: {self.trade_price_target_method}")
-        logger.info(f"  thresholds_calibration: {self._exit_thresholds_calibration}")
+        logger.info(
+            f"  thresholds_calibration: {format_dict(self._exit_thresholds_calibration, style='dict')}"
+        )
 
         logger.info("Custom Stoploss:")
         logger.info(
@@ -538,15 +541,12 @@ class QuickAdapterV3(IStrategy):
         if self.protections:
             for protection in self.protections:
                 method = protection.get("method", "Unknown")
-                logger.info(f"  {method}:")
-                for key, value in protection.items():
-                    if key != "method":
-                        if isinstance(value, bool):
-                            logger.info(f"    {key}: {value}")
-                        elif isinstance(value, (int, float)):
-                            logger.info(f"    {key}: {format_number(value)}")
-                        else:
-                            logger.info(f"    {key}: {value}")
+                protection_params = {
+                    k: v for k, v in protection.items() if k != "method"
+                }
+                logger.info(
+                    f"  {method}: {format_dict(protection_params, style='dict')}"
+                )
         else:
             logger.info("  No protections enabled")
 
@@ -815,11 +815,11 @@ class QuickAdapterV3(IStrategy):
 
             if len(label_data.indices) == 0:
                 logger.warning(
-                    f"[{pair}] No {label_col!r} labels | label_period: {QuickAdapterV3._td_format(label_period)} | params: {label_params!r}"
+                    f"[{pair}] No {label_col!r} labels | label_period: {QuickAdapterV3._td_format(label_period)} | params: {format_dict(label_params, style='params')}"
                 )
             else:
                 logger.info(
-                    f"[{pair}] {len(label_data.indices)} {label_col!r} labels | label_period: {QuickAdapterV3._td_format(label_period)} | params: {label_params!r}"
+                    f"[{pair}] {len(label_data.indices)} {label_col!r} labels | label_period: {QuickAdapterV3._td_format(label_period)} | params: {format_dict(label_params, style='params')}"
                 )
 
             col_weighting_config = get_label_column_config(
@@ -1730,7 +1730,7 @@ class QuickAdapterV3(IStrategy):
             return False
         if not (0.0 < decay_fraction <= 1.0):
             logger.debug(
-                f"[{pair}] Denied {trade_direction} {order}: invalid decay_fraction {decay_fraction}, must be in (0, 1]"
+                f"[{pair}] Denied {trade_direction} {order}: invalid decay_fraction {format_number(decay_fraction)}, must be in (0, 1]"
             )
             return False
 
index 4a8b64ec8222d7c10da798817fcd342071c82f9b..efc6868ce28132d7eb6a6aff9c6f2699a2bd2c2c 100644 (file)
@@ -4,7 +4,7 @@ import hashlib
 import math
 from dataclasses import dataclass
 from enum import IntEnum
-from functools import lru_cache
+from functools import lru_cache, singledispatch
 from logging import Logger
 from pathlib import Path
 from typing import (
@@ -1086,10 +1086,16 @@ def get_callable_sha256(fn: Callable[..., Any]) -> str:
     return hashlib.sha256(code.co_code).hexdigest()
 
 
+_SCIENTIFIC_THRESHOLD_HIGH = 1e12
+_SCIENTIFIC_THRESHOLD_LOW = 1e-6
+
+
 @lru_cache(maxsize=128)
 def format_number(value: int | float, significant_digits: int = 5) -> str:
-    if not isinstance(value, (int, float)):
+    if not isinstance(value, (int, float, np.integer, np.floating)):
         return str(value)
+    if isinstance(value, (np.integer, np.floating)):
+        value = float(value)
 
     if np.isposinf(value):
         return "+∞"
@@ -1098,27 +1104,182 @@ def format_number(value: int | float, significant_digits: int = 5) -> str:
     if np.isnan(value):
         return "NaN"
 
-    if value == int(value):
-        return str(int(value))
-
     abs_value = abs(value)
 
-    if abs_value >= 1.0:
-        precision = significant_digits
-    else:
-        if abs_value == 0:
-            return "0"
-        order_of_magnitude = math.floor(math.log10(abs_value))
-        leading_zeros = abs(order_of_magnitude) - 1
-        precision = leading_zeros + significant_digits
-    precision = max(0, int(precision))
+    if abs_value >= _SCIENTIFIC_THRESHOLD_HIGH or (
+        0 < abs_value <= _SCIENTIFIC_THRESHOLD_LOW
+    ):
+        return f"{value:.{significant_digits - 1}e}"
+
+    if abs_value == 0:
+        return "0"
+
+    magnitude = math.floor(math.log10(abs_value))
+    precision = significant_digits - 1 - magnitude
+
+    if precision < 0:
+        factor = 10 ** (-precision)
+        rounded = round(value / factor) * factor
+        return f"{rounded:.0f}"
+
+    formatted = f"{value:.{precision}f}"
+    if "." in formatted:
+        formatted = formatted.rstrip("0").rstrip(".")
+    return formatted
+
+
+_MAX_STR_LEN = 50
+_MAX_ITEMS = 10
+_MAX_DEPTH = 2
+
+
+class _FormatContext:
+    __slots__ = ("quote_strings", "sig_digits", "seen")
+
+    def __init__(self, quote_strings: bool, sig_digits: int):
+        self.quote_strings = quote_strings
+        self.sig_digits = sig_digits
+        self.seen: set[int] = set()
+
 
-    formatted_value = f"{value:.{precision}f}"
+@singledispatch
+def _format_value(value: Any, ctx: _FormatContext, depth: int) -> str:
+    return repr(value)
 
-    if "." in formatted_value:
-        formatted_value = formatted_value.rstrip("0").rstrip(".")
 
-    return formatted_value
+@_format_value.register(type(None))
+def _(value: None, ctx: _FormatContext, depth: int) -> str:
+    return "None"
+
+
+@_format_value.register(bool)
+def _(value: bool, ctx: _FormatContext, depth: int) -> str:
+    return str(value)
+
+
+@_format_value.register(int)
+def _(value: int, ctx: _FormatContext, depth: int) -> str:
+    return format_number(float(value), significant_digits=ctx.sig_digits)
+
+
+@_format_value.register(float)
+def _(value: float, ctx: _FormatContext, depth: int) -> str:
+    return format_number(value, significant_digits=ctx.sig_digits)
+
+
+@_format_value.register(np.integer)
+def _(value: np.integer, ctx: _FormatContext, depth: int) -> str:
+    return format_number(float(value), significant_digits=ctx.sig_digits)
+
+
+@_format_value.register(np.floating)
+def _(value: np.floating, ctx: _FormatContext, depth: int) -> str:
+    return format_number(float(value), significant_digits=ctx.sig_digits)
+
+
+@_format_value.register(np.bool_)
+def _(value: np.bool_, ctx: _FormatContext, depth: int) -> str:
+    return str(bool(value))
+
+
+@_format_value.register(str)
+def _(value: str, ctx: _FormatContext, depth: int) -> str:
+    escaped = (
+        value.replace("\\", "\\\\")
+        .replace("\n", "\\n")
+        .replace("\r", "\\r")
+        .replace("\t", "\\t")
+    )
+    if len(escaped) > _MAX_STR_LEN:
+        escaped = escaped[:_MAX_STR_LEN] + "..."
+    if ctx.quote_strings:
+        escaped = escaped.replace("'", "\\'")
+        return f"'{escaped}'"
+    return escaped
+
+
+def _format_collection(
+    value: list | tuple | set,
+    ctx: _FormatContext,
+    depth: int,
+    brackets: tuple[str, str],
+    empty: str,
+    trailing_comma: bool = False,
+) -> str:
+    if not value:
+        return empty
+    obj_id = id(value)
+    if obj_id in ctx.seen:
+        return f"{brackets[0]}<circular>{brackets[1]}"
+    if depth >= _MAX_DEPTH:
+        return f"{brackets[0]}...{brackets[1]}"
+    ctx.seen.add(obj_id)
+    items_iter = sorted(value, key=str) if isinstance(value, set) else value
+    items = [_format_value(v, ctx, depth + 1) for v in list(items_iter)[:_MAX_ITEMS]]
+    if len(value) > _MAX_ITEMS:
+        items.append(f"...+{len(value) - _MAX_ITEMS}")
+    content = ", ".join(items)
+    if trailing_comma and len(value) == 1 and len(items) == 1:
+        content += ","
+    ctx.seen.discard(obj_id)
+    return f"{brackets[0]}{content}{brackets[1]}"
+
+
+@_format_value.register(list)
+def _(value: list, ctx: _FormatContext, depth: int) -> str:
+    return _format_collection(value, ctx, depth, ("[", "]"), "[]")
+
+
+@_format_value.register(tuple)
+def _(value: tuple, ctx: _FormatContext, depth: int) -> str:
+    return _format_collection(value, ctx, depth, ("(", ")"), "()", trailing_comma=True)
+
+
+@_format_value.register(set)
+def _(value: set, ctx: _FormatContext, depth: int) -> str:
+    return _format_collection(value, ctx, depth, ("{", "}"), "set()")
+
+
+@_format_value.register(dict)
+def _(value: dict, ctx: _FormatContext, depth: int) -> str:
+    obj_id = id(value)
+    if obj_id in ctx.seen:
+        return "{<circular>}"
+    if depth >= _MAX_DEPTH:
+        return "{...}"
+    if not value:
+        return "{}"
+    ctx.seen.add(obj_id)
+    sep = ": " if ctx.quote_strings else "="
+    items = [
+        f"{k}{sep}{_format_value(v, ctx, depth + 1)}"
+        for k, v in list(value.items())[:_MAX_ITEMS]
+    ]
+    if len(value) > _MAX_ITEMS:
+        items.append(f"...+{len(value) - _MAX_ITEMS}")
+    ctx.seen.discard(obj_id)
+    return f"{{{', '.join(items)}}}"
+
+
+@_format_value.register(np.ndarray)
+def _(value: np.ndarray, ctx: _FormatContext, depth: int) -> str:
+    return f"array{value.shape}"
+
+
+def format_dict(
+    d: dict[str, Any],
+    style: Literal["dict", "params"] = "dict",
+    significant_digits: int = 5,
+) -> str:
+    if not d:
+        return "{}" if style == "dict" else ""
+
+    ctx = _FormatContext(quote_strings=(style == "dict"), sig_digits=significant_digits)
+    sep = ": " if style == "dict" else "="
+    items = [f"{k}{sep}{_format_value(v, ctx, 0)}" for k, v in d.items()]
+    joined = ", ".join(items)
+
+    return f"{{{joined}}}" if style == "dict" else joined
 
 
 @lru_cache(maxsize=128)