# ttzzs's picture
# Deploy Chronos2 Forecasting API v3.0.0 with new SOLID architecture
# c40c447 verified
"""
Mapper for the anomaly-detection use cases.
Converts between API schemas (Pydantic) and application DTOs.
"""
from app.schemas.requests.anomaly import AnomalyDetectionRequest
from app.schemas.responses.anomaly import AnomalyDetectionResponse, AnomalyPoint
from app.application.dtos.anomaly_dtos import (
AnomalyDetectionInputDTO,
AnomalyDetectionOutputDTO,
AnomalyPointDTO
)
class AnomalyMapper:
    """
    Mapper that converts between anomaly API schemas and application DTOs.
    """

    @staticmethod
    def to_input_dto(request: AnomalyDetectionRequest) -> AnomalyDetectionInputDTO:
        """
        Build the use-case input DTO from an API request.

        Args:
            request: Anomaly-detection request received by the API.

        Returns:
            AnomalyDetectionInputDTO: DTO populated from the request's context
            (values and timestamps), recent values, quantile bounds and freq.
        """
        # Collect the keyword arguments first, in the same order the fields
        # are read from the request, then construct the DTO in one step.
        dto_fields = {
            "context_values": request.context.values,
            "recent_values": request.recent_values,
            "quantile_low": request.quantile_low,
            "quantile_high": request.quantile_high,
            "context_timestamps": request.context.timestamps,
            "freq": request.freq,
        }
        return AnomalyDetectionInputDTO(**dto_fields)

    @staticmethod
    def _to_api_point(point: AnomalyPointDTO) -> AnomalyPoint:
        """Copy a single anomaly point DTO into its API schema counterpart."""
        return AnomalyPoint(
            index=point.index,
            value=point.value,
            expected=point.expected,
            lower_bound=point.lower_bound,
            upper_bound=point.upper_bound,
            is_anomaly=point.is_anomaly,
            z_score=point.z_score,
            severity=point.severity,
        )

    @staticmethod
    def from_output_dto(dto: AnomalyDetectionOutputDTO) -> AnomalyDetectionResponse:
        """
        Build the API response from the use-case output DTO.

        Args:
            dto: Output DTO produced by the use case.

        Returns:
            AnomalyDetectionResponse: Response carrying the converted anomaly
            points plus the DTO's totals, rate and summary.
        """
        # Convert every anomaly point, preserving the DTO's ordering.
        converted_points = [
            AnomalyMapper._to_api_point(item) for item in dto.anomalies
        ]
        return AnomalyDetectionResponse(
            anomalies=converted_points,
            total_points=dto.total_points,
            anomaly_count=dto.anomaly_count,
            anomaly_rate=dto.anomaly_rate,
            summary=dto.summary,
        )