File size: 7,649 Bytes
c40c447
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
"""
Constructor de DataFrames para series temporales.

Este módulo implementa la interfaz IDataTransformer,
aplicando ISP (Interface Segregation Principle).
"""

from typing import List, Optional, Dict, Any
import pandas as pd

from app.domain.interfaces.data_transformer import IDataTransformer
from app.utils.timestamp_generator import TimestampGenerator
from app.utils.logger import setup_logger

logger = setup_logger(__name__)


class DataFrameBuilder(IDataTransformer):
    """
    Constructor de DataFrames para modelos de forecasting.
    
    Implementa IDataTransformer para construir DataFrames en el
    formato esperado por Chronos2Pipeline.
    
    Formato de salida:
        - id: Identificador de la serie
        - timestamp: Timestamp o índice
        - target: Valor de la serie
    """
    
    def __init__(self):
        """Inicializa el builder con un generador de timestamps."""
        self.timestamp_gen = TimestampGenerator()
        logger.debug("DataFrameBuilder initialized")
    
    def build_context_df(
        self,
        values: List[float],
        timestamps: Optional[List[str]] = None,
        series_id: str = "series_0",
        freq: str = "D"
    ) -> pd.DataFrame:
        """
        Construye DataFrame de contexto para forecasting.
        
        Args:
            values: Lista de valores de la serie
            timestamps: Lista de timestamps (opcional, se generan si no se proveen)
            series_id: Identificador de la serie
            freq: Frecuencia de la serie
            
        Returns:
            DataFrame con columnas [id, timestamp, target]
            
        Raises:
            ValueError: Si timestamps y values tienen diferente longitud
            
        Example:
            >>> builder = DataFrameBuilder()
            >>> df = builder.build_context_df([100, 102, 105], series_id="sales")
            >>> df.columns.tolist()
            ['id', 'timestamp', 'target']
        """
        n = len(values)
        
        if n == 0:
            raise ValueError("values no puede estar vacío")
        
        # Generar timestamps si no se proveen
        if timestamps is None:
            logger.debug(f"Generating {n} integer timestamps")
            timestamps = self.timestamp_gen.generate_integer_index(n)
        elif len(timestamps) != n:
            raise ValueError(
                f"timestamps ({len(timestamps)}) y values ({n}) "
                f"deben tener la misma longitud"
            )
        
        # Construir DataFrame
        df = pd.DataFrame({
            "id": [series_id] * n,
            "timestamp": timestamps,
            "target": values
        })
        
        logger.debug(
            f"Built context DataFrame: {len(df)} rows, "
            f"series_id='{series_id}'"
        )
        
        return df
    
    def parse_prediction_result(
        self,
        pred_df: pd.DataFrame,
        quantile_levels: List[float]
    ) -> Dict[str, Any]:
        """
        Parsea el resultado de predicción a formato estándar.
        
        Args:
            pred_df: DataFrame de predicción con columnas:
                     [id, timestamp, predictions, q1, q2, ...]
            quantile_levels: Lista de cuantiles solicitados
            
        Returns:
            Diccionario con:
                - timestamps: List[str]
                - median: List[float]
                - quantiles: Dict[str, List[float]]
                
        Example:
            >>> result = builder.parse_prediction_result(pred_df, [0.1, 0.5, 0.9])
            >>> result.keys()
            dict_keys(['timestamps', 'median', 'quantiles'])
        """
        # Validar que el DataFrame tiene las columnas necesarias
        if "timestamp" not in pred_df.columns:
            raise ValueError("pred_df debe tener columna 'timestamp'")
        if "predictions" not in pred_df.columns:
            raise ValueError("pred_df debe tener columna 'predictions'")
        
        # Extraer timestamps y median
        timestamps = pred_df["timestamp"].astype(str).tolist()
        median = pred_df["predictions"].astype(float).tolist()
        
        # Extraer cuantiles
        quantiles = {}
        for q in quantile_levels:
            # Formatear key como string (ej: 0.1 -> "0.1")
            key = f"{q:.3g}"
            
            if key in pred_df.columns:
                quantiles[key] = pred_df[key].astype(float).tolist()
            else:
                logger.warning(
                    f"Quantile {key} no encontrado en pred_df. "
                    f"Columnas disponibles: {pred_df.columns.tolist()}"
                )
        
        result = {
            "timestamps": timestamps,
            "median": median,
            "quantiles": quantiles
        }
        
        logger.debug(
            f"Parsed prediction: {len(timestamps)} timestamps, "
            f"{len(quantiles)} quantiles"
        )
        
        return result
    
    def build_multi_series_df(
        self,
        series_dict: Dict[str, List[float]],
        timestamps_dict: Optional[Dict[str, List[str]]] = None,
        freq: str = "D"
    ) -> pd.DataFrame:
        """
        Construye DataFrame con múltiples series.
        
        Args:
            series_dict: Diccionario {series_id: [values]}
            timestamps_dict: Diccionario {series_id: [timestamps]} (opcional)
            freq: Frecuencia de las series
            
        Returns:
            DataFrame combinado con todas las series
            
        Example:
            >>> builder = DataFrameBuilder()
            >>> series = {"sales": [100, 102], "revenue": [200, 205]}
            >>> df = builder.build_multi_series_df(series)
            >>> df["id"].unique().tolist()
            ['sales', 'revenue']
        """
        dfs = []
        
        for series_id, values in series_dict.items():
            # Obtener timestamps para esta serie
            timestamps = None
            if timestamps_dict and series_id in timestamps_dict:
                timestamps = timestamps_dict[series_id]
            
            # Construir DataFrame individual
            df = self.build_context_df(
                values=values,
                timestamps=timestamps,
                series_id=series_id,
                freq=freq
            )
            dfs.append(df)
        
        # Combinar todos los DataFrames
        result = pd.concat(dfs, ignore_index=True)
        
        logger.debug(
            f"Built multi-series DataFrame: {len(series_dict)} series, "
            f"{len(result)} total rows"
        )
        
        return result
    
    def validate_context_df(self, df: pd.DataFrame) -> bool:
        """
        Valida que un DataFrame tenga el formato correcto.
        
        Args:
            df: DataFrame a validar
            
        Returns:
            True si es válido
            
        Raises:
            ValueError: Si el formato es incorrecto
        """
        required_cols = {"id", "timestamp", "target"}
        
        if not required_cols.issubset(df.columns):
            raise ValueError(
                f"DataFrame debe tener columnas: {required_cols}. "
                f"Encontradas: {set(df.columns)}"
            )
        
        if len(df) == 0:
            raise ValueError("DataFrame no puede estar vacío")
        
        # Validar que target sea numérico
        if not pd.api.types.is_numeric_dtype(df["target"]):
            raise ValueError("Columna 'target' debe ser numérica")
        
        logger.debug("DataFrame validation passed")
        return True