Spaces:
Build error
Build error
| """ | |
| Caso de uso para Detección de Anomalías. | |
| Implementa la lógica de aplicación para detectar anomalías | |
| en series temporales usando pronósticos probabilísticos. | |
| """ | |
| from app.domain.services.anomaly_service import AnomalyService | |
| from app.domain.models.time_series import TimeSeries | |
| from app.domain.models.forecast_config import ForecastConfig | |
| from app.application.dtos.anomaly_dtos import ( | |
| AnomalyDetectionInputDTO, | |
| AnomalyDetectionOutputDTO, | |
| AnomalyPointDTO | |
| ) | |
| from app.utils.logger import setup_logger | |
| logger = setup_logger(__name__) | |
| class DetectAnomaliesUseCase: | |
| """ | |
| Caso de uso: Detección de Anomalías. | |
| Responsabilidad: Detectar anomalías comparando valores observados | |
| con pronósticos probabilísticos. | |
| """ | |
| def __init__(self, anomaly_service: AnomalyService): | |
| """ | |
| Inicializa el caso de uso. | |
| Args: | |
| anomaly_service: Servicio de dominio para detección de anomalías | |
| """ | |
| self.anomaly_service = anomaly_service | |
| logger.info("DetectAnomaliesUseCase initialized") | |
| def execute(self, input_dto: AnomalyDetectionInputDTO) -> AnomalyDetectionOutputDTO: | |
| """ | |
| Ejecuta el caso de uso de detección de anomalías. | |
| Args: | |
| input_dto: Datos de entrada con contexto y valores recientes | |
| Returns: | |
| AnomalyDetectionOutputDTO: Puntos de anomalía detectados | |
| Raises: | |
| ValueError: Si los datos son inválidos | |
| RuntimeError: Si falla la detección | |
| """ | |
| logger.info( | |
| f"Detecting anomalies: {len(input_dto.context_values)} context points, " | |
| f"{len(input_dto.recent_values)} recent points" | |
| ) | |
| # Validar entrada | |
| input_dto.validate() | |
| # Convertir DTO a modelos de dominio | |
| context = TimeSeries( | |
| values=input_dto.context_values, | |
| timestamps=input_dto.context_timestamps, | |
| freq=input_dto.freq | |
| ) | |
| config = ForecastConfig( | |
| prediction_length=len(input_dto.recent_values), | |
| quantile_levels=[input_dto.quantile_low, 0.5, input_dto.quantile_high], | |
| freq=input_dto.freq | |
| ) | |
| # Ejecutar servicio de dominio | |
| try: | |
| anomaly_points = self.anomaly_service.detect_anomalies( | |
| context=context, | |
| recent_observed=input_dto.recent_values, | |
| config=config, | |
| quantile_low=input_dto.quantile_low, | |
| quantile_high=input_dto.quantile_high | |
| ) | |
| logger.info(f"Anomaly detection completed") | |
| except Exception as e: | |
| logger.error(f"Anomaly detection failed: {e}", exc_info=True) | |
| raise RuntimeError(f"Anomaly detection failed: {str(e)}") from e | |
| # Convertir a DTOs y calcular severidad | |
| anomaly_dtos = [] | |
| for ap in anomaly_points: | |
| severity = self._calculate_severity(ap.z_score, ap.is_anomaly) | |
| dto = AnomalyPointDTO( | |
| index=ap.index, | |
| value=ap.value, | |
| expected=ap.expected, | |
| lower_bound=ap.lower_bound, | |
| upper_bound=ap.upper_bound, | |
| is_anomaly=ap.is_anomaly, | |
| z_score=ap.z_score, | |
| severity=severity | |
| ) | |
| anomaly_dtos.append(dto) | |
| # Calcular estadísticas | |
| anomaly_count = sum(1 for a in anomaly_dtos if a.is_anomaly) | |
| total_points = len(anomaly_dtos) | |
| anomaly_rate = anomaly_count / total_points if total_points > 0 else 0.0 | |
| # Crear resumen | |
| summary = self._create_summary(anomaly_dtos, input_dto) | |
| logger.info( | |
| f"Anomalies detected: {anomaly_count}/{total_points} " | |
| f"({anomaly_rate*100:.1f}%)" | |
| ) | |
| # Crear DTO de salida | |
| output_dto = AnomalyDetectionOutputDTO( | |
| anomalies=anomaly_dtos, | |
| total_points=total_points, | |
| anomaly_count=anomaly_count, | |
| anomaly_rate=anomaly_rate, | |
| summary=summary | |
| ) | |
| return output_dto | |
| def _calculate_severity(self, z_score: float, is_anomaly: bool) -> str: | |
| """ | |
| Calcula la severidad de una anomalía basándose en el z-score. | |
| Args: | |
| z_score: Puntuación Z | |
| is_anomaly: Si es anomalía | |
| Returns: | |
| str: Nivel de severidad (normal, low, medium, high) | |
| """ | |
| if not is_anomaly: | |
| return "normal" | |
| if z_score < 1.5: | |
| return "low" | |
| elif z_score < 2.5: | |
| return "medium" | |
| else: | |
| return "high" | |
| def _create_summary( | |
| self, | |
| anomaly_dtos: list, | |
| input_dto: AnomalyDetectionInputDTO | |
| ) -> dict: | |
| """ | |
| Crea un resumen de la detección de anomalías. | |
| Args: | |
| anomaly_dtos: Lista de anomalías detectadas | |
| input_dto: Datos de entrada originales | |
| Returns: | |
| dict: Resumen con estadísticas | |
| """ | |
| anomalies_only = [a for a in anomaly_dtos if a.is_anomaly] | |
| if not anomalies_only: | |
| return { | |
| "has_anomalies": False, | |
| "severity_distribution": {"normal": len(anomaly_dtos)}, | |
| "max_z_score": 0.0, | |
| "avg_deviation": 0.0 | |
| } | |
| # Distribución por severidad | |
| severity_dist = { | |
| "normal": sum(1 for a in anomaly_dtos if a.severity == "normal"), | |
| "low": sum(1 for a in anomaly_dtos if a.severity == "low"), | |
| "medium": sum(1 for a in anomaly_dtos if a.severity == "medium"), | |
| "high": sum(1 for a in anomaly_dtos if a.severity == "high") | |
| } | |
| # Estadísticas de anomalías | |
| max_z_score = max(a.z_score for a in anomalies_only) | |
| avg_deviation = sum( | |
| abs(a.value - a.expected) for a in anomalies_only | |
| ) / len(anomalies_only) | |
| return { | |
| "has_anomalies": True, | |
| "severity_distribution": severity_dist, | |
| "max_z_score": round(max_z_score, 2), | |
| "avg_deviation": round(avg_deviation, 2), | |
| "quantile_range": { | |
| "low": input_dto.quantile_low, | |
| "high": input_dto.quantile_high | |
| } | |
| } | |