import math
import time
import warnings
-from collections import defaultdict
+from collections import defaultdict, deque
from collections.abc import Mapping
from pathlib import Path
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Type, Union
raise ValueError(
"FreqAI model requires StaticPairList method defined in pairlists configuration and pair_whitelist defined in exchange section configuration"
)
- self.action_masking: bool = (
- self.model_type == "MaskablePPO"
- ) # Enable action masking
+ self.action_masking: bool = self.model_type == "MaskablePPO"
self.rl_config.setdefault("action_masking", self.action_masking)
self.inference_masking: bool = self.rl_config.get("inference_masking", True)
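+ # RecurrentPPO threads LSTM hidden state through predict() calls during inference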
+ self.recurrent: bool = self.model_type == "RecurrentPPO"
self.lr_schedule: bool = self.rl_config.get("lr_schedule", False)
self.cr_schedule: bool = self.rl_config.get("cr_schedule", False)
self.n_envs: int = self.rl_config.get("n_envs", 1)
model_params: Dict[str, Any] = copy.deepcopy(self.model_training_parameters)
- if model_params.get("seed") is None:
- model_params["seed"] = 42
+ model_params.setdefault("seed", 42)
if not self.hyperopt and self.lr_schedule:
lr = model_params.get("learning_rate", 0.0003)
model_params.get("policy_kwargs", {}).get("activation_fn", "relu")
)
model_params["policy_kwargs"]["optimizer_class"] = get_optimizer_class(
- model_params.get("policy_kwargs", {}).get("optimizer_class", "adam")
+ model_params.get("policy_kwargs", {}).get("optimizer_class", "adamw")
)
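+ # Default LSTM architecture unless overridden via model_training_parameters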
+ if "RecurrentPPO" in self.model_type:
+ model_params["policy_kwargs"].setdefault("lstm_hidden_size", 256)
+ model_params["policy_kwargs"].setdefault("n_lstm_layers", 1)
self._model_params_cache = model_params
return copy.deepcopy(self._model_params_cache)
logger.info("Eval multiprocessing: %s", self.eval_multiprocessing)
logger.info("Frame stacking: %s", self.frame_stacking)
logger.info("Action masking: %s", self.action_masking)
+ logger.info("Recurrent: %s", self.recurrent)
logger.info("Hyperopt: %s", self.hyperopt)
start_time = time.time()
n = np_dataframe.shape[0]
window_size: int = self.CONV_WIDTH
frame_stacking: int = self.frame_stacking
- frame_stacking_activated: bool = bool(frame_stacking) and frame_stacking > 1
+ frame_stacking_enabled: bool = bool(frame_stacking) and frame_stacking > 1
inference_masking: bool = self.action_masking and self.inference_masking
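+ # Not enough candles to build a single observation window: return all-NaN predictions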
+ if window_size <= 0 or n < window_size:
+ return DataFrame(
+ {label: [np.nan] * n for label in dk.label_list}, index=dataframe.index
+ )
+
def _update_virtual_position(action: int, position: Positions) -> Positions:
if action == Actions.Long_enter.value and position == Positions.Neutral:
return Positions.Long
return current_virtual_trade_duration + 1
return 0
- frame_buffer: List[NDArray[np.float32]] = []
+ frame_buffer: deque = deque(maxlen=frame_stacking if frame_stacking_enabled else None)
zero_frame: Optional[NDArray[np.float32]] = None
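+ # Persistent RecurrentPPO inference state: LSTM hidden states plus an
+ # episode-start flag consumed by the first predict() call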
+ lstm_states: Optional[Tuple[NDArray[np.float32], NDArray[np.float32]]] = None
+ episode_start = np.array([True], dtype=bool)
def _predict(start_idx: int) -> int:
- nonlocal zero_frame
+ nonlocal zero_frame, lstm_states, episode_start
end_idx: int = start_idx + window_size
np_observation = np_dataframe[start_idx:end_idx, :]
action_masks_param: Dict[str, Any] = {}
),
(np_observation.shape[0], 1),
)
+ action_mask_position = position
else:
state_block = np.tile(
np.array(
),
(np_observation.shape[0], 1),
)
+ action_mask_position = virtual_position
np_observation = np.concatenate([np_observation, state_block], axis=1)
+ else:
+ action_mask_position = virtual_position
- fb: List[NDArray[np.float32]] = frame_buffer
- if frame_stacking_activated:
- fb.append(np_observation)
- if len(fb) > frame_stacking:
- del fb[0 : len(fb) - frame_stacking]
- if len(fb) < frame_stacking:
- pad_count = frame_stacking - len(fb)
+ if frame_stacking_enabled:
+ frame_buffer.append(np_observation)
+ if len(frame_buffer) < frame_stacking:
+ pad_count = frame_stacking - len(frame_buffer)
if zero_frame is None:
zero_frame = np.zeros_like(np_observation, dtype=np.float32)
- fb_padded = [zero_frame] * pad_count + fb
+ fb_padded = [zero_frame] * pad_count + list(frame_buffer)
else:
- fb_padded = fb
+ fb_padded = list(frame_buffer)
stacked_observations = np.concatenate(fb_padded, axis=1)
observations = stacked_observations.reshape(
1, stacked_observations.shape[0], stacked_observations.shape[1]
if inference_masking:
action_masks_param["action_masks"] = ReforceXY.get_action_masks(
- self.can_short, virtual_position
+ self.can_short, action_mask_position
+ )
+
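+ # RecurrentPPO predict is stateful: feed LSTM states back in and mark the
+ # first call as an episode start. action_masks_param is empty here because
+ # action masking is only enabled for MaskablePPO.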
+ if self.recurrent:
+ action, lstm_states = model.predict(
+ observations,
+ state=lstm_states,
+ episode_start=episode_start,
+ deterministic=True,
+ **action_masks_param,
+ )
+ episode_start[:] = False
+ else:
+ action, _ = model.predict(
+ observations, deterministic=True, **action_masks_param
)
- action, _ = model.predict(
- observations, deterministic=True, **action_masks_param
- )
return int(action)
predicted_actions: List[int] = []
pad_count = max(0, n - len(predicted_actions))
actions_list = ([np.nan] * pad_count) + predicted_actions
- actions = DataFrame({"action": actions_list}, index=dataframe.index)
+ actions_df = DataFrame({"action": actions_list}, index=dataframe.index)
- return DataFrame({label: actions["action"] for label in dk.label_list})
+ return DataFrame({label: actions_df["action"] for label in dk.label_list})
@staticmethod
def study_delete(study_name: str, storage: BaseStorage) -> None:
return train_env, eval_env
+ def get_optuna_params(self, trial: Trial) -> Dict[str, Any]:
+ """
+ Dispatch to the hyperparam sampler matching the model type
+ """
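+ # Order matters: "RecurrentPPO" contains "PPO" and "QRDQN" contains "DQN"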
+ if "RecurrentPPO" in self.model_type:
+ return sample_params_recurrentppo(trial)
+ elif "PPO" in self.model_type:
+ return sample_params_ppo(trial)
+ elif "QRDQN" in self.model_type:
+ return sample_params_qrdqn(trial)
+ elif "DQN" in self.model_type:
+ return sample_params_dqn(trial)
+ else:
+ raise NotImplementedError(f"{self.model_type} not supported for hyperopt")
+
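+ # Usage sketch (hypothetical wiring; the actual study setup lives elsewhere in ReforceXY):
+ #   study.optimize(lambda trial: self.objective(trial, dk, total_timesteps), n_trials=n_trials)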
def objective(
self, trial: Trial, dk: FreqaiDataKitchen, total_timesteps: int
) -> float:
"""
- Defines a single trial for hyperparameter optimization using Optuna
+ Objective function for Optuna hyperparameter optimization trials
"""
logger.info("------------ Hyperopt trial %d ------------", trial.number)
+ params = self.get_optuna_params(trial)
+
if "PPO" in self.model_type:
- params = sample_params_ppo(trial, self.n_envs)
- n_steps = params.get("n_steps", 0)
+ n_steps = params.get("n_steps")
if n_steps * self.n_envs > total_timesteps:
raise TrialPruned(
f"{n_steps=} * n_envs={self.n_envs}={n_steps * self.n_envs} is greater than {total_timesteps=}"
)
- elif "QRDQN" in self.model_type:
- params = sample_params_qrdqn(trial)
- elif "DQN" in self.model_type:
- params = sample_params_dqn(trial)
- else:
- raise NotImplementedError
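+ # SB3 PPO warns and truncates the last minibatch when the rollout buffer size
+ # (n_steps * n_envs) is not divisible by batch_size; prune those trials early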
+ batch_size = params.get("batch_size")
+ if (n_steps * self.n_envs) % batch_size != 0:
+ raise TrialPruned(
+ f"{n_steps=} * {self.n_envs=} = {n_steps * self.n_envs} is not divisible by {batch_size=}"
+ )
if "DQN" in self.model_type:
gradient_steps = params.get("gradient_steps")
raise TrialPruned(
f"{batch_size=} * {gradient_steps=}={batch_size * gradient_steps} is greater than {buffer_size=}"
)
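+ # Heuristic prune: with learning_starts beyond the buffer capacity, the replay
+ # buffer would already have overflowed before the first gradient update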
+ learning_starts = params.get("learning_starts")
+ if learning_starts > buffer_size:
+ raise TrialPruned(f"{learning_starts=} is greater than {buffer_size=}")
# Ensure that the sampled parameters take precedence
params = deepmerge(self.get_model_params(), params)
train_freq: Optional[Union[TrainFreq, int, Tuple[int, ...], List[int]]],
) -> Optional[int]:
train_freq_val: Optional[int] = None
if isinstance(train_freq, TrainFreq):
if isinstance(train_freq.frequency, int):
train_freq_val = train_freq.frequency
elif isinstance(train_freq, (tuple, list)) and train_freq:
)
if getattr(self.model, "target_kl", None) is not None:
hparam_dict["target_kl"] = float(self.model.target_kl)
+ if "RecurrentPPO" in self.model.__class__.__name__:
+ policy = getattr(self.model, "policy", None)
+ if policy is not None:
+ lstm_actor = getattr(policy, "lstm_actor", None)
+ if lstm_actor is not None:
+ hparam_dict.update(
+ {
+ "lstm_hidden_size": int(lstm_actor.hidden_size),
+ "n_lstm_layers": int(lstm_actor.num_layers),
+ }
+ )
if "DQN" in self.model.__class__.__name__:
hparam_dict.update(
{
logger_exclude = ("stdout", "log", "json", "csv")
- def _is_numeric_non_bool(x: Any) -> bool:
+ def _is_number(x: Any) -> bool:
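+ # bool subclasses int in Python, so exclude it explicitly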
return isinstance(
x, (int, float, np.integer, np.floating)
) and not isinstance(x, bool)
def _is_finite_number(x: Any) -> bool:
- if not _is_numeric_non_bool(x):
+ if not _is_number(x):
return False
try:
return np.isfinite(float(x))
continue
if _is_finite_number(v):
numeric_acc[k].append(float(v))
- elif _is_numeric_non_bool(v):
+ elif _is_number(v):
filtered_values += 1
else:
non_numeric_counts[k][v] += 1
def get_optimizer_class(
- optimizer_class_name: Literal["adam", "adamw"],
+ optimizer_class_name: Literal["adam", "adamw", "rmsprop"],
) -> Type[th.optim.Optimizer]:
"""
Get optimizer class
return {
"adam": th.optim.Adam,
"adamw": th.optim.AdamW,
+ "rmsprop": th.optim.RMSprop,
}.get(optimizer_class_name, th.optim.Adam)
)
if optuna_params.get("target_kl") is not None:
model_params["target_kl"] = float(optuna_params.get("target_kl"))
+ if "RecurrentPPO" in model_type:
+ policy_kwargs["lstm_hidden_size"] = int(
+ optuna_params.get("lstm_hidden_size")
+ )
+ policy_kwargs["n_lstm_layers"] = int(optuna_params.get("n_lstm_layers"))
elif "DQN" in model_type:
required_dqn_params = [
"gamma",
PPO_N_STEPS: Tuple[int, ...] = (512, 1024, 2048, 4096)
-def sample_params_ppo(trial: Trial, n_envs: int) -> Dict[str, Any]:
+def get_common_ppo_optuna_params(trial: Trial) -> Dict[str, Any]:
+ """
+ Sampler for the hyperparams shared by PPO and RecurrentPPO
+ """
+ return {
+ "n_steps": trial.suggest_categorical("n_steps", list(PPO_N_STEPS)),
+ "batch_size": trial.suggest_categorical(
+ "batch_size", [64, 128, 256, 512, 1024]
+ ),
+ "gamma": trial.suggest_categorical(
+ "gamma", [0.93, 0.95, 0.97, 0.98, 0.99, 0.995, 0.997, 0.999, 0.9999]
+ ),
+ "learning_rate": trial.suggest_float("learning_rate", 1e-5, 3e-3, log=True),
+ "ent_coef": trial.suggest_float("ent_coef", 0.0005, 0.03, log=True),
+ "clip_range": trial.suggest_float("clip_range", 0.1, 0.4, step=0.05),
+ "n_epochs": trial.suggest_int("n_epochs", 1, 5),
+ "gae_lambda": trial.suggest_float("gae_lambda", 0.9, 0.99, step=0.01),
+ "max_grad_norm": trial.suggest_float("max_grad_norm", 0.3, 1.0, step=0.05),
+ "vf_coef": trial.suggest_float("vf_coef", 0.0, 1.0, step=0.05),
+ "lr_schedule": trial.suggest_categorical("lr_schedule", ["linear", "constant"]),
+ "cr_schedule": trial.suggest_categorical("cr_schedule", ["linear", "constant"]),
+ "target_kl": trial.suggest_categorical(
+ "target_kl", [None, 0.01, 0.015, 0.02, 0.03, 0.04]
+ ),
+ "ortho_init": trial.suggest_categorical("ortho_init", [True, False]),
+ "net_arch": trial.suggest_categorical(
+ "net_arch", ["small", "medium", "large", "extra_large"]
+ ),
+ "activation_fn": trial.suggest_categorical(
+ "activation_fn", ["tanh", "relu", "elu", "leaky_relu"]
+ ),
+ "optimizer_class": trial.suggest_categorical(
+ "optimizer_class", ["adamw", "rmsprop"]
+ ),
+ }
+
+
+def sample_params_ppo(trial: Trial) -> Dict[str, Any]:
"""
Sampler for PPO hyperparams
"""
- n_steps = trial.suggest_categorical("n_steps", list(PPO_N_STEPS))
- batch_size = trial.suggest_categorical("batch_size", [64, 128, 256, 512, 1024])
- if (n_steps * n_envs) % batch_size != 0:
- raise TrialPruned(
- f"{n_steps=} * {n_envs=} = {n_steps * n_envs} is not divisible by {batch_size=}"
- )
return convert_optuna_params_to_model_params(
- "PPO",
+ "PPO", get_common_ppo_optuna_params(trial)
+ )
+
+
+def sample_params_recurrentppo(trial: Trial) -> Dict[str, Any]:
+ """
+ Sampler for RecurrentPPO hyperparams
+ """
+ ppo_optuna_params = get_common_ppo_optuna_params(trial)
+ ppo_optuna_params.update(
{
- "n_steps": n_steps,
- "batch_size": batch_size,
- "gamma": trial.suggest_categorical(
- "gamma", [0.93, 0.95, 0.97, 0.98, 0.99, 0.995, 0.997, 0.999, 0.9999]
- ),
- "learning_rate": trial.suggest_float("learning_rate", 1e-5, 3e-3, log=True),
- "ent_coef": trial.suggest_float("ent_coef", 0.0005, 0.03, log=True),
- "clip_range": trial.suggest_float("clip_range", 0.1, 0.4, step=0.05),
- "n_epochs": trial.suggest_categorical("n_epochs", [1, 2, 3, 4, 5]),
- "gae_lambda": trial.suggest_float("gae_lambda", 0.9, 0.99, step=0.01),
- "max_grad_norm": trial.suggest_float("max_grad_norm", 0.3, 1.0, step=0.05),
- "vf_coef": trial.suggest_float("vf_coef", 0.0, 1.0, step=0.05),
- "lr_schedule": trial.suggest_categorical(
- "lr_schedule", ["linear", "constant"]
- ),
- "cr_schedule": trial.suggest_categorical(
- "cr_schedule", ["linear", "constant"]
+ "lstm_hidden_size": trial.suggest_categorical(
+ "lstm_hidden_size", [64, 128, 256, 512]
),
- "target_kl": trial.suggest_categorical(
- "target_kl", [None, 0.01, 0.015, 0.02, 0.03, 0.04]
- ),
- "ortho_init": trial.suggest_categorical("ortho_init", [False, True]),
- "net_arch": trial.suggest_categorical(
- "net_arch", ["small", "medium", "large", "extra_large"]
- ),
- "activation_fn": trial.suggest_categorical(
- "activation_fn", ["tanh", "relu", "elu", "leaky_relu"]
- ),
- "optimizer_class": trial.suggest_categorical("optimizer_class", ["adamw"]),
- },
+ "n_lstm_layers": trial.suggest_int("n_lstm_layers", 1, 2),
+ }
)
+ return convert_optuna_params_to_model_params("RecurrentPPO", ppo_optuna_params)
def get_common_dqn_optuna_params(trial: Trial) -> Dict[str, Any]:
min_fraction = 0.15
else:
min_fraction = 0.05
- exploration_fraction = trial.suggest_float(
- "exploration_fraction", min_fraction, 0.9, step=0.02
- )
- buffer_size = trial.suggest_categorical(
- "buffer_size", [int(1e4), int(5e4), int(1e5), int(2e5)]
- )
- learning_starts = trial.suggest_categorical(
- "learning_starts", [500, 1000, 2000, 3000, 4000, 5000, 8000, 10000]
- )
- if learning_starts > buffer_size:
- raise TrialPruned(f"{learning_starts=} is greater than {buffer_size=}")
return {
"train_freq": trial.suggest_categorical(
"train_freq", [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]
),
"learning_rate": trial.suggest_float("learning_rate", 1e-5, 3e-3, log=True),
"lr_schedule": trial.suggest_categorical("lr_schedule", ["linear", "constant"]),
- "buffer_size": buffer_size,
+ "buffer_size": trial.suggest_categorical(
+ "buffer_size", [int(1e4), int(5e4), int(1e5), int(2e5)]
+ ),
"exploration_initial_eps": exploration_initial_eps,
"exploration_final_eps": exploration_final_eps,
- "exploration_fraction": exploration_fraction,
+ "exploration_fraction": trial.suggest_float(
+ "exploration_fraction", min_fraction, 0.9, step=0.02
+ ),
"target_update_interval": trial.suggest_categorical(
"target_update_interval", [1000, 2000, 5000, 7500, 10000]
),
- "learning_starts": learning_starts,
+ "learning_starts": trial.suggest_categorical(
+ "learning_starts", [500, 1000, 2000, 3000, 4000, 5000, 8000, 10000]
+ ),
"net_arch": trial.suggest_categorical(
"net_arch", ["small", "medium", "large", "extra_large"]
),
"activation_fn": trial.suggest_categorical(
"activation_fn", ["tanh", "relu", "elu", "leaky_relu"]
),
- "optimizer_class": trial.suggest_categorical("optimizer_class", ["adam"]),
+ "optimizer_class": trial.suggest_categorical(
+ "optimizer_class", ["adamw", "rmsprop"]
+ ),
}