ttzzs's picture
Deploy Chronos2 Forecasting API v3.0.0 with new SOLID architecture
c40c447 verified
raw
history blame
2.22 kB
"""
Mapper para casos de uso de Detecci贸n de Anomal铆as.
Convierte entre API schemas (Pydantic) y DTOs de aplicaci贸n.
"""
from app.schemas.requests.anomaly import AnomalyDetectionRequest
from app.schemas.responses.anomaly import AnomalyDetectionResponse, AnomalyPoint
from app.application.dtos.anomaly_dtos import (
AnomalyDetectionInputDTO,
AnomalyDetectionOutputDTO,
AnomalyPointDTO
)
class AnomalyMapper:
"""
Mapper para convertir entre API schemas y DTOs de anomal铆as.
"""
@staticmethod
def to_input_dto(request: AnomalyDetectionRequest) -> AnomalyDetectionInputDTO:
"""
Convierte API request a DTO de entrada.
Args:
request: Request de la API
Returns:
AnomalyDetectionInputDTO: DTO para el caso de uso
"""
return AnomalyDetectionInputDTO(
context_values=request.context.values,
recent_values=request.recent_values,
quantile_low=request.quantile_low,
quantile_high=request.quantile_high,
context_timestamps=request.context.timestamps,
freq=request.freq
)
@staticmethod
def from_output_dto(dto: AnomalyDetectionOutputDTO) -> AnomalyDetectionResponse:
"""
Convierte DTO de salida a API response.
Args:
dto: DTO del caso de uso
Returns:
AnomalyDetectionResponse: Response para la API
"""
# Convertir cada punto de anomal铆a
anomaly_points = [
AnomalyPoint(
index=ap.index,
value=ap.value,
expected=ap.expected,
lower_bound=ap.lower_bound,
upper_bound=ap.upper_bound,
is_anomaly=ap.is_anomaly,
z_score=ap.z_score,
severity=ap.severity
)
for ap in dto.anomalies
]
return AnomalyDetectionResponse(
anomalies=anomaly_points,
total_points=dto.total_points,
anomaly_count=dto.anomaly_count,
anomaly_rate=dto.anomaly_rate,
summary=dto.summary
)