""" Constructor de DataFrames para series temporales. Este módulo implementa la interfaz IDataTransformer, aplicando ISP (Interface Segregation Principle). """ from typing import List, Optional, Dict, Any import pandas as pd from app.domain.interfaces.data_transformer import IDataTransformer from app.utils.timestamp_generator import TimestampGenerator from app.utils.logger import setup_logger logger = setup_logger(__name__) class DataFrameBuilder(IDataTransformer): """ Constructor de DataFrames para modelos de forecasting. Implementa IDataTransformer para construir DataFrames en el formato esperado por Chronos2Pipeline. Formato de salida: - id: Identificador de la serie - timestamp: Timestamp o índice - target: Valor de la serie """ def __init__(self): """Inicializa el builder con un generador de timestamps.""" self.timestamp_gen = TimestampGenerator() logger.debug("DataFrameBuilder initialized") def build_context_df( self, values: List[float], timestamps: Optional[List[str]] = None, series_id: str = "series_0", freq: str = "D" ) -> pd.DataFrame: """ Construye DataFrame de contexto para forecasting. Args: values: Lista de valores de la serie timestamps: Lista de timestamps (opcional, se generan si no se proveen) series_id: Identificador de la serie freq: Frecuencia de la serie Returns: DataFrame con columnas [id, timestamp, target] Raises: ValueError: Si timestamps y values tienen diferente longitud Example: >>> builder = DataFrameBuilder() >>> df = builder.build_context_df([100, 102, 105], series_id="sales") >>> df.columns.tolist() ['id', 'timestamp', 'target'] """ n = len(values) if n == 0: raise ValueError("values no puede estar vacío") # Generar timestamps si no se proveen if timestamps is None: logger.debug(f"Generating {n} integer timestamps") timestamps = self.timestamp_gen.generate_integer_index(n) elif len(timestamps) != n: raise ValueError( f"timestamps ({len(timestamps)}) y values ({n}) " f"deben tener la misma longitud" ) # Construir DataFrame df = pd.DataFrame({ "id": [series_id] * n, "timestamp": timestamps, "target": values }) logger.debug( f"Built context DataFrame: {len(df)} rows, " f"series_id='{series_id}'" ) return df def parse_prediction_result( self, pred_df: pd.DataFrame, quantile_levels: List[float] ) -> Dict[str, Any]: """ Parsea el resultado de predicción a formato estándar. Args: pred_df: DataFrame de predicción con columnas: [id, timestamp, predictions, q1, q2, ...] quantile_levels: Lista de cuantiles solicitados Returns: Diccionario con: - timestamps: List[str] - median: List[float] - quantiles: Dict[str, List[float]] Example: >>> result = builder.parse_prediction_result(pred_df, [0.1, 0.5, 0.9]) >>> result.keys() dict_keys(['timestamps', 'median', 'quantiles']) """ # Validar que el DataFrame tiene las columnas necesarias if "timestamp" not in pred_df.columns: raise ValueError("pred_df debe tener columna 'timestamp'") if "predictions" not in pred_df.columns: raise ValueError("pred_df debe tener columna 'predictions'") # Extraer timestamps y median timestamps = pred_df["timestamp"].astype(str).tolist() median = pred_df["predictions"].astype(float).tolist() # Extraer cuantiles quantiles = {} for q in quantile_levels: # Formatear key como string (ej: 0.1 -> "0.1") key = f"{q:.3g}" if key in pred_df.columns: quantiles[key] = pred_df[key].astype(float).tolist() else: logger.warning( f"Quantile {key} no encontrado en pred_df. " f"Columnas disponibles: {pred_df.columns.tolist()}" ) result = { "timestamps": timestamps, "median": median, "quantiles": quantiles } logger.debug( f"Parsed prediction: {len(timestamps)} timestamps, " f"{len(quantiles)} quantiles" ) return result def build_multi_series_df( self, series_dict: Dict[str, List[float]], timestamps_dict: Optional[Dict[str, List[str]]] = None, freq: str = "D" ) -> pd.DataFrame: """ Construye DataFrame con múltiples series. Args: series_dict: Diccionario {series_id: [values]} timestamps_dict: Diccionario {series_id: [timestamps]} (opcional) freq: Frecuencia de las series Returns: DataFrame combinado con todas las series Example: >>> builder = DataFrameBuilder() >>> series = {"sales": [100, 102], "revenue": [200, 205]} >>> df = builder.build_multi_series_df(series) >>> df["id"].unique().tolist() ['sales', 'revenue'] """ dfs = [] for series_id, values in series_dict.items(): # Obtener timestamps para esta serie timestamps = None if timestamps_dict and series_id in timestamps_dict: timestamps = timestamps_dict[series_id] # Construir DataFrame individual df = self.build_context_df( values=values, timestamps=timestamps, series_id=series_id, freq=freq ) dfs.append(df) # Combinar todos los DataFrames result = pd.concat(dfs, ignore_index=True) logger.debug( f"Built multi-series DataFrame: {len(series_dict)} series, " f"{len(result)} total rows" ) return result def validate_context_df(self, df: pd.DataFrame) -> bool: """ Valida que un DataFrame tenga el formato correcto. Args: df: DataFrame a validar Returns: True si es válido Raises: ValueError: Si el formato es incorrecto """ required_cols = {"id", "timestamp", "target"} if not required_cols.issubset(df.columns): raise ValueError( f"DataFrame debe tener columnas: {required_cols}. " f"Encontradas: {set(df.columns)}" ) if len(df) == 0: raise ValueError("DataFrame no puede estar vacío") # Validar que target sea numérico if not pd.api.types.is_numeric_dtype(df["target"]): raise ValueError("Columna 'target' debe ser numérica") logger.debug("DataFrame validation passed") return True