Spaces:
Build error
Build error
| """ | |
| Casos de uso para Forecasting. | |
| Implementan la l贸gica de aplicaci贸n para pron贸sticos, | |
| orquestando servicios de dominio y transformando DTOs. | |
| """ | |
| from typing import List | |
| from app.domain.services.forecast_service import ForecastService | |
| from app.domain.models.time_series import TimeSeries | |
| from app.domain.models.forecast_config import ForecastConfig | |
| from app.application.dtos.forecast_dtos import ( | |
| ForecastInputDTO, | |
| ForecastOutputDTO, | |
| MultiForecastInputDTO, | |
| MultiForecastOutputDTO, | |
| SeriesInputDTO | |
| ) | |
| from app.utils.logger import setup_logger | |
| logger = setup_logger(__name__) | |
| class ForecastUnivariateUseCase: | |
| """ | |
| Caso de uso: Pron贸stico Univariado. | |
| Responsabilidad: Ejecutar pron贸stico para una serie temporal 煤nica. | |
| """ | |
| def __init__(self, forecast_service: ForecastService): | |
| """ | |
| Inicializa el caso de uso. | |
| Args: | |
| forecast_service: Servicio de dominio para forecasting | |
| """ | |
| self.forecast_service = forecast_service | |
| logger.info("ForecastUnivariateUseCase initialized") | |
| def execute(self, input_dto: ForecastInputDTO) -> ForecastOutputDTO: | |
| """ | |
| Ejecuta el caso de uso. | |
| Args: | |
| input_dto: Datos de entrada validados | |
| Returns: | |
| ForecastOutputDTO: Resultado del pron贸stico | |
| Raises: | |
| ValueError: Si los datos son inv谩lidos | |
| RuntimeError: Si falla el pron贸stico | |
| """ | |
| logger.info(f"Executing forecast for series: {input_dto.series_id}") | |
| # Validar entrada | |
| input_dto.validate() | |
| # Convertir DTO a modelos de dominio | |
| series = TimeSeries( | |
| values=input_dto.values, | |
| timestamps=input_dto.timestamps, | |
| series_id=input_dto.series_id, | |
| freq=input_dto.freq | |
| ) | |
| config = ForecastConfig( | |
| prediction_length=input_dto.prediction_length, | |
| quantile_levels=input_dto.quantile_levels, | |
| freq=input_dto.freq | |
| ) | |
| # Ejecutar servicio de dominio | |
| try: | |
| result = self.forecast_service.forecast_univariate(series, config) | |
| logger.info(f"Forecast completed: {len(result.timestamps)} periods") | |
| except Exception as e: | |
| logger.error(f"Forecast failed: {e}", exc_info=True) | |
| raise RuntimeError(f"Forecast execution failed: {str(e)}") from e | |
| # Convertir resultado a DTO | |
| output_dto = ForecastOutputDTO( | |
| timestamps=result.timestamps, | |
| median=result.median, | |
| quantiles=result.quantiles, | |
| series_id=result.series_id, | |
| metadata={ | |
| "prediction_length": config.prediction_length, | |
| "freq": config.freq, | |
| "context_length": len(series.values) | |
| } | |
| ) | |
| return output_dto | |
| class ForecastMultiSeriesUseCase: | |
| """ | |
| Caso de uso: Pron贸stico Multi-Series. | |
| Responsabilidad: Ejecutar pron贸sticos para m煤ltiples series. | |
| """ | |
| def __init__(self, forecast_service: ForecastService): | |
| """ | |
| Inicializa el caso de uso. | |
| Args: | |
| forecast_service: Servicio de dominio para forecasting | |
| """ | |
| self.forecast_service = forecast_service | |
| logger.info("ForecastMultiSeriesUseCase initialized") | |
| def execute(self, input_dto: MultiForecastInputDTO) -> MultiForecastOutputDTO: | |
| """ | |
| Ejecuta el caso de uso para m煤ltiples series. | |
| Args: | |
| input_dto: Datos de entrada con m煤ltiples series | |
| Returns: | |
| MultiForecastOutputDTO: Resultados de todos los pron贸sticos | |
| """ | |
| logger.info(f"Executing forecast for {len(input_dto.series_list)} series") | |
| # Validar entrada | |
| input_dto.validate() | |
| # Configuraci贸n compartida | |
| config = ForecastConfig( | |
| prediction_length=input_dto.prediction_length, | |
| quantile_levels=input_dto.quantile_levels, | |
| freq=input_dto.freq | |
| ) | |
| # Convertir DTOs a modelos de dominio | |
| time_series_list: List[TimeSeries] = [] | |
| for series_dto in input_dto.series_list: | |
| series = TimeSeries( | |
| values=series_dto.values, | |
| timestamps=series_dto.timestamps, | |
| series_id=series_dto.series_id, | |
| freq=input_dto.freq | |
| ) | |
| time_series_list.append(series) | |
| # Ejecutar servicio de dominio | |
| results = [] | |
| successful = 0 | |
| failed = 0 | |
| for ts in time_series_list: | |
| try: | |
| result = self.forecast_service.forecast_univariate(ts, config) | |
| output_dto = ForecastOutputDTO( | |
| timestamps=result.timestamps, | |
| median=result.median, | |
| quantiles=result.quantiles, | |
| series_id=result.series_id, | |
| metadata={ | |
| "prediction_length": config.prediction_length, | |
| "freq": config.freq, | |
| "context_length": len(ts.values) | |
| } | |
| ) | |
| results.append(output_dto) | |
| successful += 1 | |
| except Exception as e: | |
| logger.error(f"Forecast failed for series {ts.series_id}: {e}") | |
| failed += 1 | |
| # Continuar con las siguientes series | |
| logger.info(f"Multi-series forecast completed: {successful} successful, {failed} failed") | |
| # Crear DTO de salida | |
| multi_output = MultiForecastOutputDTO( | |
| results=results, | |
| total_series=len(input_dto.series_list), | |
| successful=successful, | |
| failed=failed | |
| ) | |
| return multi_output | |