Spaces:
Build error
Build error
| import os | |
| from typing import List, Dict, Optional | |
| import numpy as np | |
| import pandas as pd | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse | |
| from pydantic import BaseModel, Field | |
| from chronos import Chronos2Pipeline | |
| # ========================= | |
| # Configuración del modelo | |
| # ========================= | |
# Model settings; both can be overridden through environment variables.
MODEL_ID = os.environ.get("CHRONOS_MODEL_ID", "amazon/chronos-2")
DEVICE_MAP = os.environ.get("DEVICE_MAP", "cpu")  # "cpu" or "cuda"

# The FastAPI application object that the rest of this module configures.
app = FastAPI(
    version="2.1.0",
    title="Chronos-2 Universal Forecasting API + Excel Add-in",
    description=(
        "Servidor para pronósticos con Chronos-2: univariante, "
        "multivariante, covariables, escenarios, anomalías y backtesting. "
        "Incluye Excel Add-in v2.1.0 con archivos estáticos."
    ),
)
# CORS configuration for the Excel Add-in.
#
# The wildcard entry is kept because the original intent is to accept every
# origin for Office Add-ins. Credentials are switched off: the FastAPI CORS
# documentation states that origins must be listed explicitly (no "*") when
# allow_credentials=True, and no endpoint visible in this file reads cookies
# or Authorization headers.
# NOTE(review): if a client sends requests with credentials, drop the "*"
# entry and re-enable allow_credentials instead.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://localhost:3001",
        "https://localhost:3000",
        "https://ttzzs-chronos2-excel-forecasting-api.hf.space",
        "*",  # accept every origin (Office Add-ins)
    ],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Load the model a single time, at import, into a module-level object that the
# endpoint functions below call.
pipeline = Chronos2Pipeline.from_pretrained(
    MODEL_ID,
    device_map=DEVICE_MAP,
)
| # ========================= | |
| # Archivos estáticos para Excel Add-in | |
| # ========================= | |
# Mount the Add-in's static directories and define the matching handlers.
# The branch is selected once, at import time, from the presence of ./static.
# NOTE(review): no route decorator is visible on the handlers below in this
# view of the file; confirm that they are registered with `app`.
if os.path.exists("static"):
    # Mount each sub-directory only when it is really there: StaticFiles
    # checks the directory when it is constructed, so one absent folder would
    # otherwise prevent the whole application from starting.
    for _static_name in ("assets", "taskpane", "commands"):
        _static_dir = f"static/{_static_name}"
        if os.path.isdir(_static_dir):
            app.mount(
                f"/{_static_name}",
                StaticFiles(directory=_static_dir),
                name=_static_name,
            )

    async def get_manifest():
        """Return the Excel Add-in manifest.xml.

        Raises:
            HTTPException: 404 when static/manifest.xml is not present.
        """
        manifest_path = "static/manifest.xml"
        if not os.path.isfile(manifest_path):
            raise HTTPException(
                status_code=404,
                detail="No se encontró manifest.xml.",
            )
        return FileResponse(manifest_path, media_type="application/xml")

    async def root_with_addon():
        """Return API information together with the Add-in details."""
        return {
            "name": "Chronos-2 Forecasting API",
            "version": "2.1.0",
            "model": MODEL_ID,
            "endpoints": {
                "api": [
                    "/health",
                    "/forecast_univariate",
                    "/forecast_multi_id",
                    "/forecast_with_covariates",
                    "/forecast_multivariate",
                    "/forecast_scenarios",
                    "/detect_anomalies",
                    "/backtest_simple",
                ],
                "add_in": [
                    "/manifest.xml",
                    "/taskpane/taskpane.html",
                    "/assets/icon-*.png",
                ],
            },
            "docs": "/docs",
            "excel_add_in": {
                "manifest_url": "https://ttzzs-chronos2-excel-forecasting-api.hf.space/manifest.xml",
                "version": "2.1.0",
                "features": [
                    "Univariate Forecast",
                    "Multi-Series Forecast",
                    "Forecast with Covariates",
                    "Scenario Analysis",
                    "Multivariate Forecast",
                    "Anomaly Detection",
                    "Backtest",
                ],
            },
        }
else:
    async def root_api_only():
        """Return API information when no Add-in files are deployed."""
        return {
            "name": "Chronos-2 Forecasting API",
            "version": "2.1.0",
            "model": MODEL_ID,
            "docs": "/docs",
        }
| # ========================= | |
| # Modelos Pydantic comunes | |
| # ========================= | |
class BaseForecastConfig(BaseModel):
    """Forecast settings shared by the request models that inherit from it.

    Each attribute is described through its ``Field`` description.
    """

    # Must be strictly positive so that a zero or negative horizon is rejected
    # during request validation instead of failing later in the model call.
    prediction_length: int = Field(
        7,
        gt=0,
        description="Horizonte de predicción (número de pasos futuros)",
    )
    # default_factory gives every instance its own list.
    quantile_levels: List[float] = Field(
        default_factory=lambda: [0.1, 0.5, 0.9],
        description="Cuantiles para el pronóstico probabilístico",
    )
    # When omitted, the endpoints fall back to integer positions as timestamps.
    start_timestamp: Optional[str] = Field(
        default=None,
        description=(
            "Fecha/hora inicial del histórico (formato ISO). "
            "Si no se especifica, se usan índices enteros."
        ),
    )
    # Passed as `freq` to pandas.date_range by the endpoints that use it.
    freq: str = Field(
        "D",
        description="Frecuencia temporal (p.ej. 'D' diario, 'H' horario, 'W' semanal...).",
    )
class UnivariateSeries(BaseModel):
    """A single series supplied as a list of numeric values."""

    values: List[float]
class MultiSeriesItem(BaseModel):
    """One named series inside a multi-series request."""

    series_id: str  # identifier used as the `id` column of the context frame
    values: List[float]
class CovariatePoint(BaseModel):
    """
    Time point used both for the context (history) and for future covariates.
    """

    # Optional when integer positions are used instead of timestamps.
    timestamp: Optional[str] = None
    # Series identifier; the endpoints substitute 'series_0' when it is unset.
    id: Optional[str] = None
    # Value of the target variable (history only).
    target: Optional[float] = None
    covariates: Dict[str, float] = Field(
        description="Nombre -> valor de cada covariable dinámica.",
        default_factory=dict,
    )
| # ========================= | |
| # 1) Healthcheck | |
| # ========================= | |
def health():
    """
    Report basic server status together with the configured model settings.
    """
    payload = {"status": "ok"}
    payload["model_id"] = MODEL_ID
    payload["device_map"] = DEVICE_MAP
    return payload
| # ========================= | |
| # 2) Pronóstico univariante | |
| # ========================= | |
class ForecastUnivariateRequest(BaseForecastConfig):
    """Request body for a univariate forecast: shared settings plus one series."""

    series: UnivariateSeries
class ForecastUnivariateResponse(BaseModel):
    """Forecast for one series: timestamps, median and per-quantile values."""

    timestamps: List[str]
    median: List[float]
    # Keyed by the quantile level as text, e.g. "0.1" -> [...], "0.9" -> [...]
    quantiles: Dict[str, List[float]]
def forecast_univariate(req: ForecastUnivariateRequest):
    """
    Forecast a single time series (univariate, no covariates).

    Intended for direct use from Excel or other simple tools.

    Args:
        req: The series values plus the shared forecast settings.

    Returns:
        ForecastUnivariateResponse holding the forecast timestamps as strings,
        the ``predictions`` column as the median, and one list per requested
        quantile whose column is found in the prediction frame.

    Raises:
        HTTPException: 400 when the series is empty, or when
            ``start_timestamp`` / ``freq`` cannot be turned into a date range.
    """
    values = req.series.values
    if not values:
        raise HTTPException(status_code=400, detail="La serie no puede estar vacía.")
    n = len(values)

    # Build the context as a long DataFrame (id, timestamp, target).
    if req.start_timestamp:
        # Both values come straight from the client, so a parsing failure is a
        # bad request rather than a server error.
        try:
            timestamps = pd.date_range(
                start=pd.to_datetime(req.start_timestamp),
                periods=n,
                freq=req.freq,
            )
        except (ValueError, TypeError, OverflowError) as exc:
            raise HTTPException(
                status_code=400,
                detail="start_timestamp o freq no son válidos.",
            ) from exc
    else:
        # No start date: integer positions stand in for timestamps.
        timestamps = pd.RangeIndex(start=0, stop=n, step=1)

    context_df = pd.DataFrame(
        {
            "id": ["series_0"] * n,
            "timestamp": timestamps,
            "target": values,
        }
    )
    pred_df = pipeline.predict_df(
        context_df,
        prediction_length=req.prediction_length,
        quantile_levels=req.quantile_levels,
        id_column="id",
        timestamp_column="timestamp",
        target="target",
    )
    pred_df = pred_df.sort_values("timestamp")

    quantiles_dict: Dict[str, List[float]] = {}
    for q in req.quantile_levels:
        key = f"{q:.3g}"
        # The response key keeps the 3-significant-digit form. str(q) is tried
        # as a second column name in case the pipeline labels the column with
        # the unrounded level — TODO confirm against predict_df's output.
        column = next((c for c in (key, str(q)) if c in pred_df.columns), None)
        if column is not None:
            quantiles_dict[key] = pred_df[column].astype(float).tolist()

    return ForecastUnivariateResponse(
        timestamps=pred_df["timestamp"].astype(str).tolist(),
        median=pred_df["predictions"].astype(float).tolist(),
        quantiles=quantiles_dict,
    )
| # ========================= | |
| # 3) Multi-serie (multi-id) | |
| # ========================= | |
class ForecastMultiSeriesRequest(BaseForecastConfig):
    """Request body for forecasting several named series with shared settings."""

    series_list: List[MultiSeriesItem]
class SeriesForecast(BaseModel):
    """Forecast for one series of a multi-series request."""

    series_id: str
    timestamps: List[str]
    median: List[float]
    quantiles: Dict[str, List[float]]  # quantile level as text -> values
class ForecastMultiSeriesResponse(BaseModel):
    """Response body wrapping one SeriesForecast per forecasted series."""

    forecasts: List[SeriesForecast]
def forecast_multi_id(req: ForecastMultiSeriesRequest):
    """
    Forecast several series at once (for example several SKUs or stores).

    Args:
        req: The list of named series plus the shared forecast settings.

    Returns:
        ForecastMultiSeriesResponse with one entry per series id, in the order
        produced by grouping the prediction frame on ``id``.

    Raises:
        HTTPException: 400 when no series is sent, when every series is empty,
            when two non-empty series share a ``series_id``, or when
            ``start_timestamp`` / ``freq`` cannot be turned into a date range.
    """
    if not req.series_list:
        raise HTTPException(status_code=400, detail="Debes enviar al menos una serie.")

    # The start date is the same for every series, so parse it only once.
    start = None
    if req.start_timestamp:
        try:
            start = pd.to_datetime(req.start_timestamp)
        except (ValueError, TypeError, OverflowError) as exc:
            raise HTTPException(
                status_code=400,
                detail="start_timestamp o freq no son válidos.",
            ) from exc

    frames = []
    seen_ids = set()
    for item in req.series_list:
        n = len(item.values)
        if n == 0:
            continue  # empty series are skipped, not rejected

        # Two series under one id would be concatenated into a single series
        # with repeated timestamps, so reject the request instead.
        if item.series_id in seen_ids:
            raise HTTPException(
                status_code=400,
                detail=f"series_id duplicado: {item.series_id}",
            )
        seen_ids.add(item.series_id)

        if start is not None:
            try:
                timestamps = pd.date_range(start=start, periods=n, freq=req.freq)
            except (ValueError, TypeError, OverflowError) as exc:
                raise HTTPException(
                    status_code=400,
                    detail="start_timestamp o freq no son válidos.",
                ) from exc
        else:
            # No start date: integer positions stand in for timestamps.
            timestamps = pd.RangeIndex(start=0, stop=n, step=1)

        frames.append(
            pd.DataFrame(
                {
                    "id": [item.series_id] * n,
                    "timestamp": timestamps,
                    "target": item.values,
                }
            )
        )

    if not frames:
        raise HTTPException(status_code=400, detail="Todas las series están vacías.")

    context_df = pd.concat(frames, ignore_index=True)
    pred_df = pipeline.predict_df(
        context_df,
        prediction_length=req.prediction_length,
        quantile_levels=req.quantile_levels,
        id_column="id",
        timestamp_column="timestamp",
        target="target",
    )

    forecasts: List[SeriesForecast] = []
    for series_id, group in pred_df.groupby("id"):
        group = group.sort_values("timestamp")
        quantiles_dict: Dict[str, List[float]] = {}
        for q in req.quantile_levels:
            key = f"{q:.3g}"
            # str(q) is tried as a second column name in case the pipeline
            # labels the column with the unrounded level — TODO confirm.
            column = next((c for c in (key, str(q)) if c in group.columns), None)
            if column is not None:
                quantiles_dict[key] = group[column].astype(float).tolist()
        forecasts.append(
            SeriesForecast(
                series_id=series_id,
                timestamps=group["timestamp"].astype(str).tolist(),
                median=group["predictions"].astype(float).tolist(),
                quantiles=quantiles_dict,
            )
        )
    return ForecastMultiSeriesResponse(forecasts=forecasts)
| # ========================= | |
| # 4) Pronóstico con covariables | |
| # ========================= | |
class ForecastWithCovariatesRequest(BaseForecastConfig):
    """Request body for a forecast that uses covariates."""

    # Historical points; the endpoint skips points whose target is unset.
    context: List[CovariatePoint]
    # Optional future points carrying the covariate values.
    future: Optional[List[CovariatePoint]] = None
class ForecastWithCovariatesResponse(BaseModel):
    """Response body carrying the prediction frame as a list of records."""

    # One dict per row, with every column of pred_df serialized as a string.
    pred_df: List[Dict[str, str]]
def forecast_with_covariates(req: ForecastWithCovariatesRequest):
    """
    Forecast using covariate information (promotions, price, weather...) both
    in the history (context) and in the possible future (future).

    When any context timestamp is missing, every series is re-indexed with
    integer positions counted per series id, and the future rows continue each
    series' own count so both frames share one time axis.

    Args:
        req: Context points, optional future points and the forecast settings.

    Returns:
        ForecastWithCovariatesResponse with every row of the prediction frame
        serialized as a dict of strings.

    Raises:
        HTTPException: 400 when the context is empty or has no point with a
            target, or when the context carries timestamps but a future point
            does not.
    """
    if not req.context:
        raise HTTPException(status_code=400, detail="El contexto no puede estar vacío.")

    # Covariates are spread last, matching the original assignment order.
    ctx_rows = [
        {
            "id": p.id or "series_0",
            "timestamp": p.timestamp,
            "target": p.target,
            **p.covariates,
        }
        for p in req.context
        if p.target is not None
    ]
    if not ctx_rows:
        # Every point was filtered out; there is nothing to condition on.
        raise HTTPException(
            status_code=400,
            detail="El contexto debe incluir al menos un punto con target.",
        )
    context_df = pd.DataFrame(ctx_rows)

    context_sizes = context_df.groupby("id").size()
    use_positions = bool(context_df["timestamp"].isna().any())
    if use_positions:
        # Count per series so each id gets 0..n-1, not a slice of a global range.
        context_df["timestamp"] = context_df.groupby("id").cumcount()

    future_df = None
    if req.future:
        future_df = pd.DataFrame(
            [
                {"id": p.id or "series_0", "timestamp": p.timestamp, **p.covariates}
                for p in req.future
            ]
        )
        if use_positions:
            # Continue each series right after its own last context position.
            offsets = future_df["id"].map(context_sizes).fillna(0).astype(int)
            future_df["timestamp"] = offsets + future_df.groupby("id").cumcount()
        elif future_df["timestamp"].isna().any():
            # Positions cannot be derived from non-numeric context timestamps.
            raise HTTPException(
                status_code=400,
                detail="Los puntos futuros deben incluir timestamp cuando el contexto lo incluye.",
            )

    pred_df = pipeline.predict_df(
        context_df,
        future_df=future_df,
        prediction_length=req.prediction_length,
        quantile_levels=req.quantile_levels,
        id_column="id",
        timestamp_column="timestamp",
        target="target",
    )
    pred_df = pred_df.sort_values(["id", "timestamp"])
    out_records: List[Dict[str, str]] = [
        {k: str(v) for k, v in row.items()} for _, row in pred_df.iterrows()
    ]
    return ForecastWithCovariatesResponse(pred_df=out_records)
| # ========================= | |
| # 5) Multivariante (varios targets) | |
| # ========================= | |
class MultivariateContextPoint(BaseModel):
    """One historical time point carrying several target values."""

    timestamp: Optional[str] = None
    id: Optional[str] = None
    # Target name -> value, e.g. {"demand": 100, "returns": 5}
    targets: Dict[str, float]
    covariates: Dict[str, float] = Field(default_factory=dict)
class ForecastMultivariateRequest(BaseForecastConfig):
    """Request body for a forecast with several target columns."""

    context: List[MultivariateContextPoint]
    # Names of the target columns handed to the pipeline.
    target_columns: List[str]
class ForecastMultivariateResponse(BaseModel):
    """Response body carrying the prediction frame as a list of string records."""

    pred_df: List[Dict[str, str]]
def forecast_multivariate(req: ForecastMultivariateRequest):
    """
    Multivariate forecast: several target columns (e.g. demand and returns).

    When any context timestamp is missing, every series is re-indexed with
    integer positions counted per series id.

    Args:
        req: Context points, the target column names and the forecast settings.

    Returns:
        ForecastMultivariateResponse with every row of the prediction frame
        serialized as a dict of strings.

    Raises:
        HTTPException: 400 when the context is empty, when no target column is
            given, or when a requested target column is absent from the context.
    """
    if not req.context:
        raise HTTPException(status_code=400, detail="El contexto no puede estar vacío.")
    if not req.target_columns:
        raise HTTPException(status_code=400, detail="Debes indicar columnas objetivo.")

    # Targets first, then covariates, matching the original assignment order.
    rows = [
        {
            "id": p.id or "series_0",
            "timestamp": p.timestamp,
            **p.targets,
            **p.covariates,
        }
        for p in req.context
    ]
    context_df = pd.DataFrame(rows)

    # Reject unknown target names here so the caller gets a clear 400.
    missing = [name for name in req.target_columns if name not in context_df.columns]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"Columnas objetivo ausentes en el contexto: {missing}",
        )

    if context_df["timestamp"].isna().any():
        # Count per series so each id gets 0..n-1, not a slice of a global range.
        context_df["timestamp"] = context_df.groupby("id").cumcount()

    pred_df = pipeline.predict_df(
        context_df,
        prediction_length=req.prediction_length,
        quantile_levels=req.quantile_levels,
        id_column="id",
        timestamp_column="timestamp",
        target=req.target_columns,
    )
    pred_df = pred_df.sort_values(["id", "timestamp"])
    out_records = [{k: str(v) for k, v in row.items()} for _, row in pred_df.iterrows()]
    return ForecastMultivariateResponse(pred_df=out_records)
| # ========================= | |
| # 6) Escenarios (what-if) | |
| # ========================= | |
class ScenarioDefinition(BaseModel):
    """A named what-if scenario described by its future covariate points."""

    name: str
    future_covariates: List[CovariatePoint]
class ScenarioForecast(BaseModel):
    """Forecast of one scenario: its name and the prediction rows as strings."""

    name: str
    pred_df: List[Dict[str, str]]
class ForecastScenariosRequest(BaseForecastConfig):
    """Request body for evaluating several scenarios over one shared context."""

    context: List[CovariatePoint]
    scenarios: List[ScenarioDefinition]
class ForecastScenariosResponse(BaseModel):
    """Response body with one ScenarioForecast per requested scenario."""

    scenarios: List[ScenarioForecast]
def _scenario_future_frame(points, context_sizes, use_positions):
    """Build the future-covariate frame of one scenario, or None without points."""
    if not points:
        return None
    future_df = pd.DataFrame(
        [
            {"id": p.id or "series_0", "timestamp": p.timestamp, **p.covariates}
            for p in points
        ]
    )
    if use_positions:
        # Continue each series right after its own last context position.
        offsets = future_df["id"].map(context_sizes).fillna(0).astype(int)
        future_df["timestamp"] = offsets + future_df.groupby("id").cumcount()
    elif future_df["timestamp"].isna().any():
        # Positions cannot be derived from non-numeric context timestamps.
        raise HTTPException(
            status_code=400,
            detail="Los puntos futuros deben incluir timestamp cuando el contexto lo incluye.",
        )
    return future_df


def forecast_scenarios(req: ForecastScenariosRequest):
    """
    Evaluate several what-if scenarios by changing the future covariates
    (for example promo ON/OFF, different prices, etc.).

    The context frame is built once and reused for every scenario. When any
    context timestamp is missing, every series is re-indexed with integer
    positions counted per series id and the future rows continue that count.
    A scenario without future points is forecast with no future frame.

    Args:
        req: Context points, the scenario definitions and the forecast settings.

    Returns:
        ForecastScenariosResponse with one entry per scenario, each holding
        the prediction rows serialized as dicts of strings.

    Raises:
        HTTPException: 400 when the context is empty or has no point with a
            target, when no scenario is defined, or when the context carries
            timestamps but a future point does not.
    """
    if not req.context:
        raise HTTPException(status_code=400, detail="El contexto no puede estar vacío.")
    if not req.scenarios:
        raise HTTPException(status_code=400, detail="Debes definir al menos un escenario.")

    # Covariates are spread last, matching the original assignment order.
    ctx_rows = [
        {
            "id": p.id or "series_0",
            "timestamp": p.timestamp,
            "target": p.target,
            **p.covariates,
        }
        for p in req.context
        if p.target is not None
    ]
    if not ctx_rows:
        raise HTTPException(
            status_code=400,
            detail="El contexto debe incluir al menos un punto con target.",
        )
    context_df = pd.DataFrame(ctx_rows)

    context_sizes = context_df.groupby("id").size()
    use_positions = bool(context_df["timestamp"].isna().any())
    if use_positions:
        # Count per series so each id gets 0..n-1, not a slice of a global range.
        context_df["timestamp"] = context_df.groupby("id").cumcount()

    results: List[ScenarioForecast] = []
    for scen in req.scenarios:
        future_df = _scenario_future_frame(
            scen.future_covariates, context_sizes, use_positions
        )
        pred_df = pipeline.predict_df(
            context_df,
            future_df=future_df,
            prediction_length=req.prediction_length,
            quantile_levels=req.quantile_levels,
            id_column="id",
            timestamp_column="timestamp",
            target="target",
        )
        pred_df = pred_df.sort_values(["id", "timestamp"])
        out_records = [{k: str(v) for k, v in row.items()} for _, row in pred_df.iterrows()]
        results.append(ScenarioForecast(name=scen.name, pred_df=out_records))
    return ForecastScenariosResponse(scenarios=results)
| # ========================= | |
| # 7) Detección de anomalías | |
| # ========================= | |
class AnomalyDetectionRequest(BaseModel):
    """Request body for anomaly detection on recently observed values."""

    # Historical series the forecast is conditioned on.
    context: UnivariateSeries
    # Observed values compared with the forecast; the endpoint requires their
    # count to equal prediction_length.
    recent_observed: List[float]
    prediction_length: int = 7
    # Lower and upper quantile levels that bound the non-anomalous interval.
    quantile_low: float = 0.05
    quantile_high: float = 0.95
class AnomalyPoint(BaseModel):
    """Verdict for one observed value compared with its forecast interval."""

    index: int  # position within recent_observed
    value: float  # the observed value
    predicted_median: float
    lower: float  # forecast at the low quantile
    upper: float  # forecast at the high quantile
    is_anomaly: bool  # True when value falls outside [lower, upper]
class AnomalyDetectionResponse(BaseModel):
    """Response body with one AnomalyPoint per evaluated observation."""

    anomalies: List[AnomalyPoint]
def detect_anomalies(req: AnomalyDetectionRequest):
    """
    Flag as anomalies the recently observed points that fall outside the
    [quantile_low, quantile_high] interval of the forecast.

    Args:
        req: Historical series, the observations to check, the horizon and the
            two quantile levels.

    Returns:
        AnomalyDetectionResponse with one AnomalyPoint per observation.

    Raises:
        HTTPException: 400 when the history is empty, when
            ``prediction_length`` is not positive, when ``recent_observed``
            has a different length than ``prediction_length``, or when the
            quantile levels do not satisfy 0 < low < high < 1. 500 when the
            prediction frame lacks a column for a requested quantile.
    """
    history = req.context.values
    if not history:
        raise HTTPException(status_code=400, detail="La serie histórica no puede estar vacía.")
    if req.prediction_length <= 0:
        raise HTTPException(
            status_code=400,
            detail="prediction_length debe ser mayor que 0.",
        )
    if len(req.recent_observed) != req.prediction_length:
        raise HTTPException(
            status_code=400,
            detail="recent_observed debe tener la misma longitud que prediction_length.",
        )
    # A chained comparison also rejects NaN, which compares False to everything.
    if not 0.0 < req.quantile_low < req.quantile_high < 1.0:
        raise HTTPException(
            status_code=400,
            detail="Se requiere 0 < quantile_low < quantile_high < 1.",
        )

    n_hist = len(history)
    context_df = pd.DataFrame(
        {
            "id": ["series_0"] * n_hist,
            "timestamp": pd.RangeIndex(start=0, stop=n_hist, step=1),
            "target": history,
        }
    )
    # The set removes duplicates when one of the bounds is itself 0.5.
    quantiles = sorted({req.quantile_low, 0.5, req.quantile_high})
    pred_df = pipeline.predict_df(
        context_df,
        prediction_length=req.prediction_length,
        quantile_levels=quantiles,
        id_column="id",
        timestamp_column="timestamp",
        target="target",
    ).sort_values("timestamp")

    def quantile_column(level):
        # str(level) is tried as a second name in case the pipeline labels the
        # column with the unrounded level — TODO confirm.
        for name in (f"{level:.3g}", str(level)):
            if name in pred_df.columns:
                return name
        raise HTTPException(
            status_code=500,
            detail=f"El pronóstico no contiene el cuantil {level}.",
        )

    lowers = pred_df[quantile_column(req.quantile_low)].to_numpy(dtype=float)
    uppers = pred_df[quantile_column(req.quantile_high)].to_numpy(dtype=float)
    medians = pred_df["predictions"].to_numpy(dtype=float)

    anomalies: List[AnomalyPoint] = [
        AnomalyPoint(
            index=i,
            value=obs,
            predicted_median=float(median),
            lower=float(lower),
            upper=float(upper),
            is_anomaly=bool(obs < lower or obs > upper),
        )
        for i, (obs, lower, median, upper) in enumerate(
            zip(req.recent_observed, lowers, medians, uppers)
        )
    ]
    return AnomalyDetectionResponse(anomalies=anomalies)
| # ========================= | |
| # 8) Backtest simple | |
| # ========================= | |
class BacktestRequest(BaseModel):
    """Request body for the simple backtest."""

    series: UnivariateSeries
    prediction_length: int = 7
    # Number of trailing observations held out as the test segment.
    test_length: int = 28
class BacktestMetrics(BaseModel):
    """Error metrics reported by the simple backtest."""

    mae: float
    mape: float
    # Approximate Weighted Quantile Loss for the 0.5 quantile.
    wql: float
class BacktestResponse(BaseModel):
    """Response body of the simple backtest: metrics, forecast and actuals."""

    metrics: BacktestMetrics
    forecast_median: List[float]
    forecast_timestamps: List[str]
    actuals: List[float]
def backtest_simple(req: BacktestRequest):
    """
    Simple backtest: hold out the final segment of the series as the test set,
    forecast that segment and compute MAE / MAPE / WQL.

    The forecast horizon is ``req.test_length``; ``req.prediction_length`` is
    not read by this function.

    Args:
        req: The full series and the length of the held-out test segment.

    Returns:
        BacktestResponse with the metrics, the median forecast, its timestamps
        (as strings) and the held-out actual values. ``mape`` is a percentage;
        ``wql`` is the mean pinball loss at the 0.5 quantile.

    Raises:
        HTTPException: 400 when ``test_length`` is not positive or the series
            is not longer than ``test_length``.
    """
    if req.test_length <= 0:
        # A zero-length test segment leaves nothing to evaluate.
        raise HTTPException(
            status_code=400,
            detail="test_length debe ser mayor que 0.",
        )

    values = np.array(req.series.values, dtype=float)
    n = len(values)
    if n <= req.test_length:
        raise HTTPException(
            status_code=400,
            detail="La serie debe ser más larga que test_length.",
        )

    split = n - req.test_length
    train, test = values[:split], values[split:]

    context_df = pd.DataFrame(
        {
            "id": ["series_0"] * len(train),
            "timestamp": pd.RangeIndex(start=0, stop=len(train), step=1),
            "target": train.tolist(),
        }
    )
    pred_df = pipeline.predict_df(
        context_df,
        prediction_length=req.test_length,
        quantile_levels=[0.5],
        id_column="id",
        timestamp_column="timestamp",
        target="target",
    ).sort_values("timestamp")
    forecast = pred_df["predictions"].to_numpy(dtype=float)
    timestamps = pred_df["timestamp"].astype(str).tolist()

    diff = test - forecast
    mae = float(np.mean(np.abs(diff)))

    # Guard the denominator by magnitude: `test + eps` is exactly zero when an
    # actual value equals -eps, and shrinks the denominator for negatives.
    eps = 1e-8
    mape = float(np.mean(np.abs(diff) / np.maximum(np.abs(test), eps))) * 100.0

    # Pinball loss at tau, averaged over the test points.
    tau = 0.5
    wql = float(np.mean(np.maximum(tau * diff, (tau - 1) * diff)))

    return BacktestResponse(
        metrics=BacktestMetrics(mae=mae, mape=mape, wql=wql),
        forecast_median=forecast.tolist(),
        forecast_timestamps=timestamps,
        actuals=test.tolist(),
    )