Spaces:
Build error
Build error
File size: 2,221 Bytes
c40c447 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
"""
Mapper para casos de uso de Detección de Anomalías.
Convierte entre API schemas (Pydantic) y DTOs de aplicación.
"""
from app.schemas.requests.anomaly import AnomalyDetectionRequest
from app.schemas.responses.anomaly import AnomalyDetectionResponse, AnomalyPoint
from app.application.dtos.anomaly_dtos import (
AnomalyDetectionInputDTO,
AnomalyDetectionOutputDTO,
AnomalyPointDTO
)
class AnomalyMapper:
"""
Mapper para convertir entre API schemas y DTOs de anomalías.
"""
@staticmethod
def to_input_dto(request: AnomalyDetectionRequest) -> AnomalyDetectionInputDTO:
"""
Convierte API request a DTO de entrada.
Args:
request: Request de la API
Returns:
AnomalyDetectionInputDTO: DTO para el caso de uso
"""
return AnomalyDetectionInputDTO(
context_values=request.context.values,
recent_values=request.recent_values,
quantile_low=request.quantile_low,
quantile_high=request.quantile_high,
context_timestamps=request.context.timestamps,
freq=request.freq
)
@staticmethod
def from_output_dto(dto: AnomalyDetectionOutputDTO) -> AnomalyDetectionResponse:
"""
Convierte DTO de salida a API response.
Args:
dto: DTO del caso de uso
Returns:
AnomalyDetectionResponse: Response para la API
"""
# Convertir cada punto de anomalía
anomaly_points = [
AnomalyPoint(
index=ap.index,
value=ap.value,
expected=ap.expected,
lower_bound=ap.lower_bound,
upper_bound=ap.upper_bound,
is_anomaly=ap.is_anomaly,
z_score=ap.z_score,
severity=ap.severity
)
for ap in dto.anomalies
]
return AnomalyDetectionResponse(
anomalies=anomaly_points,
total_points=dto.total_points,
anomaly_count=dto.anomaly_count,
anomaly_rate=dto.anomaly_rate,
summary=dto.summary
)
|