3v324v23's picture
Add Excel Add-in v2.1.0 with static files
d9dc826
import os
from typing import List, Dict, Optional
import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel, Field
from chronos import Chronos2Pipeline
# =========================
# Configuración del modelo
# =========================
MODEL_ID = os.getenv("CHRONOS_MODEL_ID", "amazon/chronos-2")
DEVICE_MAP = os.getenv("DEVICE_MAP", "cpu") # "cpu" o "cuda"
app = FastAPI(
title="Chronos-2 Universal Forecasting API + Excel Add-in",
description=(
"Servidor para pronósticos con Chronos-2: univariante, "
"multivariante, covariables, escenarios, anomalías y backtesting. "
"Incluye Excel Add-in v2.1.0 con archivos estáticos."
),
version="2.1.0",
)
# Configurar CORS para Excel Add-in
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://localhost:3001",
"https://localhost:3000",
"https://ttzzs-chronos2-excel-forecasting-api.hf.space",
"*" # Permitir todos los orígenes para Office Add-ins
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Carga única del modelo al iniciar el proceso
pipeline = Chronos2Pipeline.from_pretrained(MODEL_ID, device_map=DEVICE_MAP)
# =========================
# Archivos estáticos para Excel Add-in
# =========================
# Montar directorios estáticos si existen
if os.path.exists("static"):
app.mount("/assets", StaticFiles(directory="static/assets"), name="assets")
app.mount("/taskpane", StaticFiles(directory="static/taskpane"), name="taskpane")
app.mount("/commands", StaticFiles(directory="static/commands"), name="commands")
# Endpoint para manifest.xml
@app.get("/manifest.xml", response_class=FileResponse)
async def get_manifest():
"""Devuelve el manifest.xml del Excel Add-in"""
return FileResponse("static/manifest.xml", media_type="application/xml")
@app.get("/", tags=["Info"])
async def root_with_addon():
"""Información del API + Add-in"""
return {
"name": "Chronos-2 Forecasting API",
"version": "2.1.0",
"model": MODEL_ID,
"endpoints": {
"api": [
"/health",
"/forecast_univariate",
"/forecast_multi_id",
"/forecast_with_covariates",
"/forecast_multivariate",
"/forecast_scenarios",
"/detect_anomalies",
"/backtest_simple"
],
"add_in": [
"/manifest.xml",
"/taskpane/taskpane.html",
"/assets/icon-*.png"
]
},
"docs": "/docs",
"excel_add_in": {
"manifest_url": "https://ttzzs-chronos2-excel-forecasting-api.hf.space/manifest.xml",
"version": "2.1.0",
"features": [
"Univariate Forecast",
"Multi-Series Forecast",
"Forecast with Covariates",
"Scenario Analysis",
"Multivariate Forecast",
"Anomaly Detection",
"Backtest"
]
}
}
else:
@app.get("/", tags=["Info"])
async def root_api_only():
"""Información del API (sin Add-in)"""
return {
"name": "Chronos-2 Forecasting API",
"version": "2.1.0",
"model": MODEL_ID,
"docs": "/docs"
}
# =========================
# Modelos Pydantic comunes
# =========================
class BaseForecastConfig(BaseModel):
prediction_length: int = Field(
7, description="Horizonte de predicción (número de pasos futuros)"
)
quantile_levels: List[float] = Field(
default_factory=lambda: [0.1, 0.5, 0.9],
description="Cuantiles para el pronóstico probabilístico",
)
start_timestamp: Optional[str] = Field(
default=None,
description=(
"Fecha/hora inicial del histórico (formato ISO). "
"Si no se especifica, se usan índices enteros."
),
)
freq: str = Field(
"D",
description="Frecuencia temporal (p.ej. 'D' diario, 'H' horario, 'W' semanal...).",
)
class UnivariateSeries(BaseModel):
values: List[float]
class MultiSeriesItem(BaseModel):
series_id: str
values: List[float]
class CovariatePoint(BaseModel):
"""
Punto temporal usado tanto para contexto (histórico) como para covariables futuras.
"""
timestamp: Optional[str] = None # opcional si se usan índices enteros
id: Optional[str] = None # id de serie, por defecto 'series_0'
target: Optional[float] = None # valor de la variable objetivo (histórico)
covariates: Dict[str, float] = Field(
default_factory=dict,
description="Nombre -> valor de cada covariable dinámica.",
)
# =========================
# 1) Healthcheck
# =========================
@app.get("/health")
def health():
"""
Devuelve información básica del estado del servidor y el modelo cargado.
"""
return {
"status": "ok",
"model_id": MODEL_ID,
"device_map": DEVICE_MAP,
}
# =========================
# 2) Pronóstico univariante
# =========================
class ForecastUnivariateRequest(BaseForecastConfig):
series: UnivariateSeries
class ForecastUnivariateResponse(BaseModel):
timestamps: List[str]
median: List[float]
quantiles: Dict[str, List[float]] # "0.1" -> [..], "0.9" -> [..]
@app.post("/forecast_univariate", response_model=ForecastUnivariateResponse)
def forecast_univariate(req: ForecastUnivariateRequest):
"""
Pronóstico para una sola serie temporal (univariante, sin covariables).
Pensado para uso directo desde Excel u otras herramientas sencillas.
"""
values = req.series.values
n = len(values)
if n == 0:
raise HTTPException(status_code=400, detail="La serie no puede estar vacía.")
# Construimos contexto como DataFrame largo (id, timestamp, target)
if req.start_timestamp:
timestamps = pd.date_range(
start=pd.to_datetime(req.start_timestamp),
periods=n,
freq=req.freq,
)
else:
timestamps = pd.RangeIndex(start=0, stop=n, step=1)
context_df = pd.DataFrame(
{
"id": ["series_0"] * n,
"timestamp": timestamps,
"target": values,
}
)
pred_df = pipeline.predict_df(
context_df,
prediction_length=req.prediction_length,
quantile_levels=req.quantile_levels,
id_column="id",
timestamp_column="timestamp",
target="target",
)
pred_df = pred_df.sort_values("timestamp")
timestamps_out = pred_df["timestamp"].astype(str).tolist()
median = pred_df["predictions"].astype(float).tolist()
quantiles_dict: Dict[str, List[float]] = {}
for q in req.quantile_levels:
key = f"{q:.3g}"
if key in pred_df.columns:
quantiles_dict[key] = pred_df[key].astype(float).tolist()
return ForecastUnivariateResponse(
timestamps=timestamps_out,
median=median,
quantiles=quantiles_dict,
)
# =========================
# 3) Multi-serie (multi-id)
# =========================
class ForecastMultiSeriesRequest(BaseForecastConfig):
series_list: List[MultiSeriesItem]
class SeriesForecast(BaseModel):
series_id: str
timestamps: List[str]
median: List[float]
quantiles: Dict[str, List[float]]
class ForecastMultiSeriesResponse(BaseModel):
forecasts: List[SeriesForecast]
@app.post("/forecast_multi_id", response_model=ForecastMultiSeriesResponse)
def forecast_multi_id(req: ForecastMultiSeriesRequest):
"""
Pronóstico para múltiples series (por ejemplo, varios SKU o tiendas).
"""
if not req.series_list:
raise HTTPException(status_code=400, detail="Debes enviar al menos una serie.")
frames = []
for item in req.series_list:
n = len(item.values)
if n == 0:
continue
if req.start_timestamp:
timestamps = pd.date_range(
start=pd.to_datetime(req.start_timestamp),
periods=n,
freq=req.freq,
)
else:
timestamps = pd.RangeIndex(start=0, stop=n, step=1)
frames.append(
pd.DataFrame(
{
"id": [item.series_id] * n,
"timestamp": timestamps,
"target": item.values,
}
)
)
if not frames:
raise HTTPException(status_code=400, detail="Todas las series están vacías.")
context_df = pd.concat(frames, ignore_index=True)
pred_df = pipeline.predict_df(
context_df,
prediction_length=req.prediction_length,
quantile_levels=req.quantile_levels,
id_column="id",
timestamp_column="timestamp",
target="target",
)
forecasts: List[SeriesForecast] = []
for series_id, group in pred_df.groupby("id"):
group = group.sort_values("timestamp")
timestamps_out = group["timestamp"].astype(str).tolist()
median = group["predictions"].astype(float).tolist()
quantiles_dict: Dict[str, List[float]] = {}
for q in req.quantile_levels:
key = f"{q:.3g}"
if key in group.columns:
quantiles_dict[key] = group[key].astype(float).tolist()
forecasts.append(
SeriesForecast(
series_id=series_id,
timestamps=timestamps_out,
median=median,
quantiles=quantiles_dict,
)
)
return ForecastMultiSeriesResponse(forecasts=forecasts)
# =========================
# 4) Pronóstico con covariables
# =========================
class ForecastWithCovariatesRequest(BaseForecastConfig):
context: List[CovariatePoint]
future: Optional[List[CovariatePoint]] = None
class ForecastWithCovariatesResponse(BaseModel):
# filas con todas las columnas de pred_df serializadas como string
pred_df: List[Dict[str, str]]
@app.post("/forecast_with_covariates", response_model=ForecastWithCovariatesResponse)
def forecast_with_covariates(req: ForecastWithCovariatesRequest):
"""
Pronóstico con información de covariables (promos, precio, clima...) tanto
en el histórico (context) como en futuros posibles (future).
"""
if not req.context:
raise HTTPException(status_code=400, detail="El contexto no puede estar vacío.")
ctx_rows = []
for p in req.context:
if p.target is None:
continue
row = {
"id": p.id or "series_0",
"timestamp": p.timestamp,
"target": p.target,
}
for k, v in p.covariates.items():
row[k] = v
ctx_rows.append(row)
context_df = pd.DataFrame(ctx_rows)
if "timestamp" not in context_df or context_df["timestamp"].isna().any():
context_df["timestamp"] = pd.RangeIndex(start=0, stop=len(context_df), step=1)
future_df = None
if req.future:
fut_rows = []
for p in req.future:
row = {
"id": p.id or "series_0",
"timestamp": p.timestamp,
}
for k, v in p.covariates.items():
row[k] = v
fut_rows.append(row)
future_df = pd.DataFrame(fut_rows)
if "timestamp" not in future_df or future_df["timestamp"].isna().any():
future_df["timestamp"] = pd.RangeIndex(
start=context_df["timestamp"].max() + 1,
stop=context_df["timestamp"].max() + 1 + len(future_df),
step=1,
)
pred_df = pipeline.predict_df(
context_df,
future_df=future_df,
prediction_length=req.prediction_length,
quantile_levels=req.quantile_levels,
id_column="id",
timestamp_column="timestamp",
target="target",
)
pred_df = pred_df.sort_values(["id", "timestamp"])
out_records: List[Dict[str, str]] = []
for _, row in pred_df.iterrows():
record = {k: str(v) for k, v in row.items()}
out_records.append(record)
return ForecastWithCovariatesResponse(pred_df=out_records)
# =========================
# 5) Multivariante (varios targets)
# =========================
class MultivariateContextPoint(BaseModel):
timestamp: Optional[str] = None
id: Optional[str] = None
targets: Dict[str, float] # p.ej. {"demand": 100, "returns": 5}
covariates: Dict[str, float] = Field(default_factory=dict)
class ForecastMultivariateRequest(BaseForecastConfig):
context: List[MultivariateContextPoint]
target_columns: List[str] # nombres de columnas objetivo
class ForecastMultivariateResponse(BaseModel):
pred_df: List[Dict[str, str]]
@app.post("/forecast_multivariate", response_model=ForecastMultivariateResponse)
def forecast_multivariate(req: ForecastMultivariateRequest):
"""
Pronóstico multivariante: múltiples columnas objetivo (p.ej. demanda y devoluciones).
"""
if not req.context:
raise HTTPException(status_code=400, detail="El contexto no puede estar vacío.")
if not req.target_columns:
raise HTTPException(status_code=400, detail="Debes indicar columnas objetivo.")
rows = []
for p in req.context:
base = {
"id": p.id or "series_0",
"timestamp": p.timestamp,
}
for t_name, t_val in p.targets.items():
base[t_name] = t_val
for k, v in p.covariates.items():
base[k] = v
rows.append(base)
context_df = pd.DataFrame(rows)
if "timestamp" not in context_df or context_df["timestamp"].isna().any():
context_df["timestamp"] = pd.RangeIndex(start=0, stop=len(context_df), step=1)
pred_df = pipeline.predict_df(
context_df,
prediction_length=req.prediction_length,
quantile_levels=req.quantile_levels,
id_column="id",
timestamp_column="timestamp",
target=req.target_columns,
)
pred_df = pred_df.sort_values(["id", "timestamp"])
out_records = [{k: str(v) for k, v in row.items()} for _, row in pred_df.iterrows()]
return ForecastMultivariateResponse(pred_df=out_records)
# =========================
# 6) Escenarios (what-if)
# =========================
class ScenarioDefinition(BaseModel):
name: str
future_covariates: List[CovariatePoint]
class ScenarioForecast(BaseModel):
name: str
pred_df: List[Dict[str, str]]
class ForecastScenariosRequest(BaseForecastConfig):
context: List[CovariatePoint]
scenarios: List[ScenarioDefinition]
class ForecastScenariosResponse(BaseModel):
scenarios: List[ScenarioForecast]
@app.post("/forecast_scenarios", response_model=ForecastScenariosResponse)
def forecast_scenarios(req: ForecastScenariosRequest):
"""
Evaluación de múltiples escenarios (what-if) cambiando las covariables futuras
(por ejemplo, promo ON/OFF, diferentes precios, etc.).
"""
if not req.context:
raise HTTPException(status_code=400, detail="El contexto no puede estar vacío.")
if not req.scenarios:
raise HTTPException(status_code=400, detail="Debes definir al menos un escenario.")
ctx_rows = []
for p in req.context:
if p.target is None:
continue
row = {
"id": p.id or "series_0",
"timestamp": p.timestamp,
"target": p.target,
}
for k, v in p.covariates.items():
row[k] = v
ctx_rows.append(row)
context_df = pd.DataFrame(ctx_rows)
if "timestamp" not in context_df or context_df["timestamp"].isna().any():
context_df["timestamp"] = pd.RangeIndex(start=0, stop=len(context_df), step=1)
results: List[ScenarioForecast] = []
for scen in req.scenarios:
fut_rows = []
for p in scen.future_covariates:
row = {
"id": p.id or "series_0",
"timestamp": p.timestamp,
}
for k, v in p.covariates.items():
row[k] = v
fut_rows.append(row)
future_df = pd.DataFrame(fut_rows)
if "timestamp" not in future_df or future_df["timestamp"].isna().any():
future_df["timestamp"] = pd.RangeIndex(
start=context_df["timestamp"].max() + 1,
stop=context_df["timestamp"].max() + 1 + len(future_df),
step=1,
)
pred_df = pipeline.predict_df(
context_df,
future_df=future_df,
prediction_length=req.prediction_length,
quantile_levels=req.quantile_levels,
id_column="id",
timestamp_column="timestamp",
target="target",
)
pred_df = pred_df.sort_values(["id", "timestamp"])
out_records = [{k: str(v) for k, v in row.items()} for _, row in pred_df.iterrows()]
results.append(ScenarioForecast(name=scen.name, pred_df=out_records))
return ForecastScenariosResponse(scenarios=results)
# =========================
# 7) Detección de anomalías
# =========================
class AnomalyDetectionRequest(BaseModel):
context: UnivariateSeries
recent_observed: List[float]
prediction_length: int = 7
quantile_low: float = 0.05
quantile_high: float = 0.95
class AnomalyPoint(BaseModel):
index: int
value: float
predicted_median: float
lower: float
upper: float
is_anomaly: bool
class AnomalyDetectionResponse(BaseModel):
anomalies: List[AnomalyPoint]
@app.post("/detect_anomalies", response_model=AnomalyDetectionResponse)
def detect_anomalies(req: AnomalyDetectionRequest):
"""
Marca como anomalías los puntos observados recientes que caen fuera del
intervalo [quantile_low, quantile_high] del pronóstico.
"""
n_hist = len(req.context.values)
if n_hist == 0:
raise HTTPException(status_code=400, detail="La serie histórica no puede estar vacía.")
if len(req.recent_observed) != req.prediction_length:
raise HTTPException(
status_code=400,
detail="recent_observed debe tener la misma longitud que prediction_length.",
)
context_df = pd.DataFrame(
{
"id": ["series_0"] * n_hist,
"timestamp": pd.RangeIndex(start=0, stop=n_hist, step=1),
"target": req.context.values,
}
)
quantiles = sorted({req.quantile_low, 0.5, req.quantile_high})
pred_df = pipeline.predict_df(
context_df,
prediction_length=req.prediction_length,
quantile_levels=quantiles,
id_column="id",
timestamp_column="timestamp",
target="target",
).sort_values("timestamp")
q_low_col = f"{req.quantile_low:.3g}"
q_high_col = f"{req.quantile_high:.3g}"
anomalies: List[AnomalyPoint] = []
for i, (obs, (_, row)) in enumerate(zip(req.recent_observed, pred_df.iterrows())):
lower = float(row[q_low_col])
upper = float(row[q_high_col])
median = float(row["predictions"])
is_anom = (obs < lower) or (obs > upper)
anomalies.append(
AnomalyPoint(
index=i,
value=obs,
predicted_median=median,
lower=lower,
upper=upper,
is_anomaly=is_anom,
)
)
return AnomalyDetectionResponse(anomalies=anomalies)
# =========================
# 8) Backtest simple
# =========================
class BacktestRequest(BaseModel):
series: UnivariateSeries
prediction_length: int = 7
test_length: int = 28
class BacktestMetrics(BaseModel):
mae: float
mape: float
wql: float # Weighted Quantile Loss aproximada para el cuantil 0.5
class BacktestResponse(BaseModel):
metrics: BacktestMetrics
forecast_median: List[float]
forecast_timestamps: List[str]
actuals: List[float]
@app.post("/backtest_simple", response_model=BacktestResponse)
def backtest_simple(req: BacktestRequest):
"""
Backtest sencillo: separamos un tramo final de la serie como test, pronosticamos
ese tramo y calculamos métricas MAE / MAPE / WQL.
"""
values = np.array(req.series.values, dtype=float)
n = len(values)
if n <= req.test_length:
raise HTTPException(
status_code=400,
detail="La serie debe ser más larga que test_length.",
)
train = values[: n - req.test_length]
test = values[n - req.test_length :]
context_df = pd.DataFrame(
{
"id": ["series_0"] * len(train),
"timestamp": pd.RangeIndex(start=0, stop=len(train), step=1),
"target": train.tolist(),
}
)
pred_df = pipeline.predict_df(
context_df,
prediction_length=req.test_length,
quantile_levels=[0.5],
id_column="id",
timestamp_column="timestamp",
target="target",
).sort_values("timestamp")
forecast = pred_df["predictions"].to_numpy(dtype=float)
timestamps = pred_df["timestamp"].astype(str).tolist()
mae = float(np.mean(np.abs(test - forecast)))
eps = 1e-8
mape = float(np.mean(np.abs((test - forecast) / (test + eps)))) * 100.0
tau = 0.5
diff = test - forecast
wql = float(np.mean(np.maximum(tau * diff, (tau - 1) * diff)))
metrics = BacktestMetrics(mae=mae, mape=mape, wql=wql)
return BacktestResponse(
metrics=metrics,
forecast_median=forecast.tolist(),
forecast_timestamps=timestamps,
actuals=test.tolist(),
)