Spaces:
Build error
Build error
File size: 6,568 Bytes
c40c447 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
"""
Domain service for forecasting.

This service orchestrates the forecasting business logic while following
the Single Responsibility (SRP) and Dependency Inversion (DIP) principles.
"""
from typing import List
from app.domain.interfaces.forecast_model import IForecastModel
from app.domain.interfaces.data_transformer import IDataTransformer
from app.domain.models.time_series import TimeSeries
from app.domain.models.forecast_config import ForecastConfig
from app.domain.models.forecast_result import ForecastResult
from app.utils.logger import setup_logger
# Module-level logger shared by everything in this file; built by the
# project's setup_logger helper and named after this module.
logger = setup_logger(__name__)
class ForecastService:
    """
    Domain service for forecasting operations.

    Orchestrates the forecasting workflow: validates the inputs, asks the
    transformer to build the context DataFrame, delegates prediction to the
    model and wraps the parsed output in a ForecastResult. It depends on the
    IForecastModel and IDataTransformer abstractions instead of concrete
    implementations (Dependency Inversion Principle).

    Attributes:
        model: Forecasting model (implements IForecastModel).
        transformer: Data transformer (implements IDataTransformer).

    Example:
        >>> from app.infrastructure.ml.chronos_model import ChronosModel
        >>> from app.utils.dataframe_builder import DataFrameBuilder
        >>>
        >>> model = ChronosModel("amazon/chronos-2")
        >>> transformer = DataFrameBuilder()
        >>> service = ForecastService(model, transformer)
        >>>
        >>> series = TimeSeries(values=[100, 102, 105])
        >>> config = ForecastConfig(prediction_length=3)
        >>> result = service.forecast_univariate(series, config)
    """

    def __init__(
        self,
        model: IForecastModel,
        transformer: IDataTransformer
    ):
        """
        Initialize the service with its dependencies.

        Args:
            model: IForecastModel implementation used to predict.
            transformer: IDataTransformer implementation used to build the
                model input and to parse the model output.
        """
        self.model = model
        self.transformer = transformer

        # Only the "type" entry of the model info is logged; a missing key
        # falls back to "unknown".
        model_info = self.model.get_model_info()
        logger.info(
            f"ForecastService initialized with model: {model_info.get('type', 'unknown')}"
        )

    def forecast_univariate(
        self,
        series: TimeSeries,
        config: ForecastConfig
    ) -> ForecastResult:
        """
        Generate a forecast for a single univariate series.

        Args:
            series: Time series to forecast.
            config: Forecast configuration (horizon, quantile levels,
                frequency).

        Returns:
            ForecastResult: Timestamps, median and quantiles produced by the
            model, plus metadata describing the run.

        Raises:
            ValueError: If the series or the configuration is invalid.
            RuntimeError: If the model fails while predicting; the original
                exception is chained as the cause.

        Example:
            >>> series = TimeSeries(values=[100, 102, 105, 103, 108])
            >>> config = ForecastConfig(prediction_length=3)
            >>> result = service.forecast_univariate(series, config)
            >>> len(result.median)
            3
        """
        logger.info(
            f"Forecasting univariate series '{series.series_id}' "
            f"(length={series.length}, horizon={config.prediction_length})"
        )

        # Validate the inputs.
        series.validate()
        config.validate()

        # Turn the series into the context DataFrame the model consumes.
        context_df = self.transformer.build_context_df(
            values=series.values,
            timestamps=series.timestamps,
            series_id=series.series_id,
            freq=config.freq
        )
        logger.debug(f"Context DataFrame shape: {context_df.shape}")

        # Let the model check the DataFrame before predicting. Errors raised
        # here propagate unchanged (they are not wrapped in RuntimeError).
        self.model.validate_context(context_df)

        # Predict. Any failure inside the model is logged with its traceback
        # and re-raised as RuntimeError so callers handle one exception type.
        try:
            pred_df = self.model.predict(
                context_df=context_df,
                prediction_length=config.prediction_length,
                quantile_levels=config.quantile_levels
            )
        except Exception as e:
            logger.error(f"Model prediction failed: {e}", exc_info=True)
            raise RuntimeError(f"Error al predecir: {e}") from e

        logger.debug(f"Prediction DataFrame shape: {pred_df.shape}")

        # Parse the raw prediction; the "timestamps", "median" and
        # "quantiles" keys of the returned mapping are read below.
        result_dict = self.transformer.parse_prediction_result(
            pred_df=pred_df,
            quantile_levels=config.quantile_levels
        )

        # Build the result object, recording the settings used for the run.
        result = ForecastResult(
            timestamps=result_dict["timestamps"],
            median=result_dict["median"],
            quantiles=result_dict["quantiles"],
            series_id=series.series_id,
            metadata={
                "prediction_length": config.prediction_length,
                "quantile_levels": config.quantile_levels,
                "freq": config.freq,
                "model": self.model.get_model_info()
            }
        )

        logger.info(
            f"Forecast completed: {result.length} periods generated "
            f"for series '{series.series_id}'"
        )
        return result

    def forecast_multi_series(
        self,
        series_list: List[TimeSeries],
        config: ForecastConfig
    ) -> List[ForecastResult]:
        """
        Generate forecasts for several series, one after another.

        Each series is forecast with forecast_univariate using the same
        configuration. Processing stops at the first series that fails.

        Args:
            series_list: Time series to forecast.
            config: Forecast configuration (shared by every series).

        Returns:
            List[ForecastResult]: One result per series, in input order.

        Raises:
            ValueError: If series_list is empty or None.
            Exception: Whatever forecast_univariate raises for a series is
                logged and re-raised unchanged.

        Example:
            >>> series1 = TimeSeries(values=[100, 102], series_id="A")
            >>> series2 = TimeSeries(values=[200, 205], series_id="B")
            >>> results = service.forecast_multi_series([series1, series2], config)
            >>> len(results)
            2
        """
        # Guard first: checking before len() means a missing (None) list gets
        # the same ValueError as an empty one instead of a TypeError.
        if not series_list:
            raise ValueError("series_list no puede estar vacía")

        total = len(series_list)
        logger.info(f"Forecasting {total} series")

        results = []
        for position, series in enumerate(series_list, start=1):
            logger.debug(f"Processing series {position}/{total}: {series.series_id}")
            try:
                results.append(self.forecast_univariate(series, config))
            except Exception as e:
                # Record which series failed, then let the error propagate.
                logger.error(
                    f"Failed to forecast series '{series.series_id}': {e}",
                    exc_info=True
                )
                raise

        logger.info(f"Multi-series forecast completed: {len(results)} series processed")
        return results
|