Spaces:
Build error
Build error
| """ | |
| Servicio de dominio para detecci贸n de anomal铆as. | |
| Este servicio encapsula la l贸gica de detecci贸n de anomal铆as, | |
| cumpliendo con SRP y DIP. | |
| """ | |
| from typing import List | |
| from app.domain.interfaces.forecast_model import IForecastModel | |
| from app.domain.interfaces.data_transformer import IDataTransformer | |
| from app.domain.models.time_series import TimeSeries | |
| from app.domain.models.forecast_config import ForecastConfig | |
| from app.domain.models.anomaly import AnomalyPoint | |
| from app.utils.logger import setup_logger | |
| logger = setup_logger(__name__) | |
| class AnomalyService: | |
| """ | |
| Servicio de dominio para detecci贸n de anomal铆as. | |
| Detecta puntos an贸malos comparando valores observados con | |
| pron贸sticos del modelo, usando intervalos de predicci贸n. | |
| Attributes: | |
| model: Modelo de forecasting | |
| transformer: Transformador de datos | |
| Example: | |
| >>> service = AnomalyService(model, transformer) | |
| >>> context = TimeSeries(values=[100, 102, 105, 103, 108]) | |
| >>> recent = [107, 200, 106] # 200 es anomal铆a | |
| >>> anomalies = service.detect_anomalies(context, recent, config) | |
| >>> sum(1 for a in anomalies if a.is_anomaly) | |
| 1 | |
| """ | |
| def __init__( | |
| self, | |
| model: IForecastModel, | |
| transformer: IDataTransformer | |
| ): | |
| """ | |
| Inicializa el servicio. | |
| Args: | |
| model: Implementaci贸n de IForecastModel | |
| transformer: Implementaci贸n de IDataTransformer | |
| """ | |
| self.model = model | |
| self.transformer = transformer | |
| logger.info("AnomalyService initialized") | |
| def detect_anomalies( | |
| self, | |
| context: TimeSeries, | |
| recent_observed: List[float], | |
| config: ForecastConfig, | |
| quantile_low: float = 0.05, | |
| quantile_high: float = 0.95 | |
| ) -> List[AnomalyPoint]: | |
| """ | |
| Detecta anomal铆as comparando observaciones con pron贸stico. | |
| Un punto se considera an贸malo si cae fuera del intervalo | |
| [quantile_low, quantile_high] del pron贸stico. | |
| Args: | |
| context: Serie temporal hist贸rica (contexto) | |
| recent_observed: Valores recientes a evaluar | |
| config: Configuraci贸n del forecast | |
| quantile_low: Cuantil inferior del intervalo (default: 0.05) | |
| quantile_high: Cuantil superior del intervalo (default: 0.95) | |
| Returns: | |
| List[AnomalyPoint]: Lista de puntos con indicador de anomal铆a | |
| Raises: | |
| ValueError: Si las longitudes no coinciden | |
| Example: | |
| >>> context = TimeSeries(values=[100, 102, 105]) | |
| >>> recent = [106, 250, 104] # 250 es anomal铆a | |
| >>> config = ForecastConfig(prediction_length=3) | |
| >>> anomalies = service.detect_anomalies(context, recent, config) | |
| >>> anomalies[1].is_anomaly | |
| True | |
| """ | |
| logger.info( | |
| f"Detecting anomalies in {len(recent_observed)} points " | |
| f"(interval: [{quantile_low}, {quantile_high}])" | |
| ) | |
| # Validar longitudes | |
| if len(recent_observed) != config.prediction_length: | |
| raise ValueError( | |
| f"recent_observed length ({len(recent_observed)}) must equal " | |
| f"prediction_length ({config.prediction_length})" | |
| ) | |
| # Preparar config con cuantiles necesarios | |
| quantiles = sorted(set([quantile_low, 0.5, quantile_high])) | |
| config_anomaly = ForecastConfig( | |
| prediction_length=config.prediction_length, | |
| quantile_levels=quantiles, | |
| freq=config.freq | |
| ) | |
| # Construir DataFrame de contexto | |
| context_df = self.transformer.build_context_df( | |
| values=context.values, | |
| timestamps=context.timestamps, | |
| series_id=context.series_id, | |
| freq=config.freq | |
| ) | |
| # Predecir | |
| pred_df = self.model.predict( | |
| context_df=context_df, | |
| prediction_length=config_anomaly.prediction_length, | |
| quantile_levels=config_anomaly.quantile_levels | |
| ) | |
| # Parsear resultado | |
| result = self.transformer.parse_prediction_result( | |
| pred_df=pred_df, | |
| quantile_levels=quantiles | |
| ) | |
| # Detectar anomal铆as | |
| anomalies = [] | |
| q_low_key = f"{quantile_low:.3g}" | |
| q_high_key = f"{quantile_high:.3g}" | |
| for i, obs in enumerate(recent_observed): | |
| expected = result["median"][i] | |
| lower = result["quantiles"][q_low_key][i] | |
| upper = result["quantiles"][q_high_key][i] | |
| # Verificar si est谩 fuera del intervalo | |
| is_anom = (obs < lower) or (obs > upper) | |
| # Calcular z-score aproximado | |
| spread = (upper - lower) / 2 | |
| z_score = abs(obs - expected) / (spread + 1e-8) if spread > 0 else 0 | |
| anomalies.append(AnomalyPoint( | |
| index=i, | |
| value=obs, | |
| expected=expected, | |
| lower_bound=lower, | |
| upper_bound=upper, | |
| is_anomaly=is_anom, | |
| z_score=z_score | |
| )) | |
| num_anomalies = sum(1 for a in anomalies if a.is_anomaly) | |
| logger.info( | |
| f"Anomaly detection completed: {num_anomalies}/{len(anomalies)} " | |
| "anomalies detected" | |
| ) | |
| return anomalies | |
| def get_anomaly_summary(self, anomalies: List[AnomalyPoint]) -> dict: | |
| """ | |
| Genera un resumen de las anomal铆as detectadas. | |
| Args: | |
| anomalies: Lista de anomal铆as | |
| Returns: | |
| Dict con estad铆sticas de las anomal铆as | |
| """ | |
| total = len(anomalies) | |
| detected = sum(1 for a in anomalies if a.is_anomaly) | |
| severities = {"low": 0, "medium": 0, "high": 0} | |
| for a in anomalies: | |
| if a.is_anomaly and a.severity: | |
| severities[a.severity] += 1 | |
| return { | |
| "total_points": total, | |
| "anomalies_detected": detected, | |
| "anomaly_rate": (detected / total * 100) if total > 0 else 0, | |
| "severities": severities, | |
| "max_deviation": max((a.deviation for a in anomalies), default=0), | |
| "max_z_score": max((abs(a.z_score) for a in anomalies), default=0) | |
| } | |