ttzzs's picture
Deploy Chronos2 Forecasting API v3.0.0 with new SOLID architecture
c40c447 verified
raw
history blame
7.65 kB
"""
Constructor de DataFrames para series temporales.
Este módulo implementa la interfaz IDataTransformer,
aplicando ISP (Interface Segregation Principle).
"""
from typing import List, Optional, Dict, Any
import pandas as pd
from app.domain.interfaces.data_transformer import IDataTransformer
from app.utils.timestamp_generator import TimestampGenerator
from app.utils.logger import setup_logger
logger = setup_logger(__name__)
class DataFrameBuilder(IDataTransformer):
"""
Constructor de DataFrames para modelos de forecasting.
Implementa IDataTransformer para construir DataFrames en el
formato esperado por Chronos2Pipeline.
Formato de salida:
- id: Identificador de la serie
- timestamp: Timestamp o índice
- target: Valor de la serie
"""
def __init__(self):
"""Inicializa el builder con un generador de timestamps."""
self.timestamp_gen = TimestampGenerator()
logger.debug("DataFrameBuilder initialized")
def build_context_df(
self,
values: List[float],
timestamps: Optional[List[str]] = None,
series_id: str = "series_0",
freq: str = "D"
) -> pd.DataFrame:
"""
Construye DataFrame de contexto para forecasting.
Args:
values: Lista de valores de la serie
timestamps: Lista de timestamps (opcional, se generan si no se proveen)
series_id: Identificador de la serie
freq: Frecuencia de la serie
Returns:
DataFrame con columnas [id, timestamp, target]
Raises:
ValueError: Si timestamps y values tienen diferente longitud
Example:
>>> builder = DataFrameBuilder()
>>> df = builder.build_context_df([100, 102, 105], series_id="sales")
>>> df.columns.tolist()
['id', 'timestamp', 'target']
"""
n = len(values)
if n == 0:
raise ValueError("values no puede estar vacío")
# Generar timestamps si no se proveen
if timestamps is None:
logger.debug(f"Generating {n} integer timestamps")
timestamps = self.timestamp_gen.generate_integer_index(n)
elif len(timestamps) != n:
raise ValueError(
f"timestamps ({len(timestamps)}) y values ({n}) "
f"deben tener la misma longitud"
)
# Construir DataFrame
df = pd.DataFrame({
"id": [series_id] * n,
"timestamp": timestamps,
"target": values
})
logger.debug(
f"Built context DataFrame: {len(df)} rows, "
f"series_id='{series_id}'"
)
return df
def parse_prediction_result(
self,
pred_df: pd.DataFrame,
quantile_levels: List[float]
) -> Dict[str, Any]:
"""
Parsea el resultado de predicción a formato estándar.
Args:
pred_df: DataFrame de predicción con columnas:
[id, timestamp, predictions, q1, q2, ...]
quantile_levels: Lista de cuantiles solicitados
Returns:
Diccionario con:
- timestamps: List[str]
- median: List[float]
- quantiles: Dict[str, List[float]]
Example:
>>> result = builder.parse_prediction_result(pred_df, [0.1, 0.5, 0.9])
>>> result.keys()
dict_keys(['timestamps', 'median', 'quantiles'])
"""
# Validar que el DataFrame tiene las columnas necesarias
if "timestamp" not in pred_df.columns:
raise ValueError("pred_df debe tener columna 'timestamp'")
if "predictions" not in pred_df.columns:
raise ValueError("pred_df debe tener columna 'predictions'")
# Extraer timestamps y median
timestamps = pred_df["timestamp"].astype(str).tolist()
median = pred_df["predictions"].astype(float).tolist()
# Extraer cuantiles
quantiles = {}
for q in quantile_levels:
# Formatear key como string (ej: 0.1 -> "0.1")
key = f"{q:.3g}"
if key in pred_df.columns:
quantiles[key] = pred_df[key].astype(float).tolist()
else:
logger.warning(
f"Quantile {key} no encontrado en pred_df. "
f"Columnas disponibles: {pred_df.columns.tolist()}"
)
result = {
"timestamps": timestamps,
"median": median,
"quantiles": quantiles
}
logger.debug(
f"Parsed prediction: {len(timestamps)} timestamps, "
f"{len(quantiles)} quantiles"
)
return result
def build_multi_series_df(
self,
series_dict: Dict[str, List[float]],
timestamps_dict: Optional[Dict[str, List[str]]] = None,
freq: str = "D"
) -> pd.DataFrame:
"""
Construye DataFrame con múltiples series.
Args:
series_dict: Diccionario {series_id: [values]}
timestamps_dict: Diccionario {series_id: [timestamps]} (opcional)
freq: Frecuencia de las series
Returns:
DataFrame combinado con todas las series
Example:
>>> builder = DataFrameBuilder()
>>> series = {"sales": [100, 102], "revenue": [200, 205]}
>>> df = builder.build_multi_series_df(series)
>>> df["id"].unique().tolist()
['sales', 'revenue']
"""
dfs = []
for series_id, values in series_dict.items():
# Obtener timestamps para esta serie
timestamps = None
if timestamps_dict and series_id in timestamps_dict:
timestamps = timestamps_dict[series_id]
# Construir DataFrame individual
df = self.build_context_df(
values=values,
timestamps=timestamps,
series_id=series_id,
freq=freq
)
dfs.append(df)
# Combinar todos los DataFrames
result = pd.concat(dfs, ignore_index=True)
logger.debug(
f"Built multi-series DataFrame: {len(series_dict)} series, "
f"{len(result)} total rows"
)
return result
def validate_context_df(self, df: pd.DataFrame) -> bool:
"""
Valida que un DataFrame tenga el formato correcto.
Args:
df: DataFrame a validar
Returns:
True si es válido
Raises:
ValueError: Si el formato es incorrecto
"""
required_cols = {"id", "timestamp", "target"}
if not required_cols.issubset(df.columns):
raise ValueError(
f"DataFrame debe tener columnas: {required_cols}. "
f"Encontradas: {set(df.columns)}"
)
if len(df) == 0:
raise ValueError("DataFrame no puede estar vacío")
# Validar que target sea numérico
if not pd.api.types.is_numeric_dtype(df["target"]):
raise ValueError("Columna 'target' debe ser numérica")
logger.debug("DataFrame validation passed")
return True