import argparse
import glob
import json
import os
import tempfile
from typing import Optional

import requests
from estimate_013 import estimate_from_config
from fastapi import Body, FastAPI
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from megatron.core import parallel_state as mpu
from pydantic import BaseModel, field_validator

from mbridge import AutoBridge
# The directory of the current script (main.py)
WEBUI_DIR = os.path.dirname(os.path.abspath(__file__))

app = FastAPI()

# Mount static files from the webui directory
app.mount("/static", StaticFiles(directory=WEBUI_DIR), name="static")


@app.get("/")  # assumed route path
async def read_index():
    return FileResponse(os.path.join(WEBUI_DIR, "index.html"))


@app.get("/style.css")  # assumed route path
async def read_css():
    return FileResponse(os.path.join(WEBUI_DIR, "style.css"))


@app.get("/script.js")  # assumed route path
async def read_js():
    return FileResponse(os.path.join(WEBUI_DIR, "script.js"))
SUPPORTED_MODELS = [
    "Qwen/Qwen3-235B-A22B",
    "Qwen/Qwen3-30B-A3B",
    "Qwen/Qwen3-32B",
    "Qwen/Qwen3-14B",
    "Qwen/Qwen3-8B",
    "Qwen/Qwen2.5-7B",
    "Qwen/Qwen2.5-14B",
    "Qwen/Qwen2.5-32B",
    "Qwen/Qwen2.5-72B",
    "moonshotai/Moonlight-16B-A3B",
    "moonshotai/Kimi-K2-Instruct",
    "deepseek-ai/DeepSeek-V3",
    "XiaomiMiMo/MiMo-7B-RL",
]
@app.get("/supported_models")  # assumed route path
async def get_supported_models():
    """Return the list of HF model identifiers supported by the UI."""
    return SUPPORTED_MODELS


@app.get("/hf_config/{model_path:path}")  # assumed route path
async def get_remote_hf_config(model_path: str):
    """Fetch the HuggingFace config.json for the given model id."""
    url = f"https://huggingface.co/{model_path}/raw/main/config.json"
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        return resp.json()
    except Exception as e:
        return {"error": f"Failed to fetch config from {url}: {str(e)}"}
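
# Illustrative example (assumption, not from the original source): for
# "Qwen/Qwen3-8B" the handler above fetches
#   https://huggingface.co/Qwen/Qwen3-8B/raw/main/config.json
# and returns the parsed HF config dict (keys such as "model_type",
# "vocab_size", "max_position_embeddings", and "tie_word_embeddings" are read
# later by the estimator). The same kind of dict can be supplied as
# `custom_hf_config` in the request body defined below.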
class MBridgeEstimateConfig(BaseModel):
    hf_model_path: str
    custom_hf_config: Optional[dict] = None  # Renamed for clarity
    # Hardware & Training
    num_gpus: int = 8
    mbs: int = 1
    seq_len: int = 4096
    use_distributed_optimizer: bool = True
    # Recompute settings are now part of the main config
    recompute_granularity: str = "selective"
    recompute_method: str = "uniform"
    recompute_num_layers: Optional[int] = 1
    # Selective recompute modules (optional list, only used when granularity == "selective")
    recompute_modules: Optional[list[str]] = None
    # New: embedding/loss pipeline-split options
    account_for_embedding_in_pipeline_split: bool = False
    account_for_loss_in_pipeline_split: bool = False
    # Parallelism
    tp: int = 1
    pp: int = 1
    ep: int = 1
    cp: int = 1
    vpp: Optional[int] = None
    etp: Optional[int] = None
    # Pipeline stage layer counts
    num_layers_in_first_pipeline_stage: Optional[int] = None
    num_layers_in_last_pipeline_stage: Optional[int] = None
    # New field: custom pipeline-model-parallel layout
    pipeline_model_parallel_layout: Optional[str] = None  # Comma-separated ints

    @field_validator("num_gpus")
    @classmethod
    def num_gpus_must_be_multiple_of_8(cls, v):
        if v <= 0 or v % 8 != 0:
            raise ValueError("must be a positive multiple of 8")
        return v
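
# Illustrative request body for the estimate endpoint below (values are
# assumed; field names come from MBridgeEstimateConfig):
#   {
#       "hf_model_path": "Qwen/Qwen3-32B",
#       "num_gpus": 16,
#       "mbs": 1,
#       "seq_len": 4096,
#       "tp": 4,
#       "pp": 2,
#       "cp": 1,
#       "recompute_granularity": "selective"
#   }
# With these values the validator accepts num_gpus (16 is a positive multiple
# of 8), and TP * PP * CP = 8 divides 16, so the data-parallel size works out to 2.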
def patch_parallel_states(config: MBridgeEstimateConfig):
    """Monkey-patch mbridge's default ParallelStates factory so it returns the
    parallel sizes requested in the UI."""
    from mbridge.core.parallel_states import ParallelStates

    ParallelStates.get_default_parallel_states = lambda: ParallelStates(
        tp_size=config.tp,
        pp_size=config.pp,
        ep_size=config.ep,
        cp_size=config.cp,
        vpp_size=config.vpp,
        etp_size=config.etp,
    )
@app.post("/estimate")  # assumed route path
async def estimate_with_mbridge(config: MBridgeEstimateConfig):
    # Validate inputs
    if config.num_gpus <= 0 or config.num_gpus % 8 != 0:
        return {"error": "Total number of GPUs must be a positive multiple of 8."}

    parallel_product = config.tp * config.pp * config.cp
    if parallel_product == 0:  # Avoid division by zero
        return {"error": "Parallelism dimensions (TP, PP, CP) cannot be zero."}
    if config.num_gpus % parallel_product != 0:
        return {
            "error": f"Number of GPUs ({config.num_gpus}) must be divisible by the product of TP*PP*CP ({parallel_product})."
        }
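
    # Worked example (illustrative): num_gpus=16 with tp=2, pp=2, cp=1 gives
    # parallel_product = 4; 16 % 4 == 0, so the request passes validation and
    # data_parallel_size below becomes 16 // 4 = 4.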
    patch_parallel_states(config)

    # If the path is just a filename, assume it's in our local model-configs dir
    hf_model_path = config.hf_model_path

    # This logic needs to change. The custom config from the UI is an HF config, not a Megatron config.
    # We need to load it via a temporary file.
    if config.custom_hf_config:
        try:
            # Create a temporary file to save the custom HF config
            with tempfile.NamedTemporaryFile(
                mode="w+",
                delete=False,
                suffix=".json",
                dir=os.path.join("/dev/shm"),
            ) as tmp:
                json.dump(config.custom_hf_config, tmp)
                tmp_path = tmp.name

            # Load the bridge from the temporary config file
            from transformers import AutoConfig

            AutoConfig.trust_remote_code = True
            bridge = AutoBridge.from_pretrained(tmp_path)
            tf_config = bridge.config
            hf_config = bridge.hf_config
        finally:
            # Ensure the temporary file is deleted
            if "tmp_path" in locals() and os.path.exists(tmp_path):
                os.remove(tmp_path)
    else:
        # If no custom config, load from the original path
        if not os.path.isabs(hf_model_path) and not hf_model_path.startswith(
            ("http", "./", "../")
        ):
            hf_model_path = os.path.join("/dev/shm", hf_model_path)
        bridge = AutoBridge.from_pretrained(hf_model_path)
        tf_config = bridge.config
        hf_config = bridge.hf_config
    # --- Configuration Unification ---
    # Update the tf_config with values from the form. This makes tf_config the single source of truth.
    tf_config.tensor_model_parallel_size = config.tp
    tf_config.pipeline_model_parallel_size = config.pp
    tf_config.expert_model_parallel_size = config.ep
    tf_config.context_parallel_size = config.cp
    tf_config.recompute_granularity = config.recompute_granularity
    tf_config.recompute_method = config.recompute_method
    tf_config.recompute_num_layers = config.recompute_num_layers
    # New: module list used when recompute granularity is "selective"
    tf_config.recompute_modules = (
        config.recompute_modules if config.recompute_modules is not None else []
    )
    # New: account for embedding/loss layers in the pipeline split
    tf_config.account_for_embedding_in_pipeline_split = (
        config.account_for_embedding_in_pipeline_split
    )
    tf_config.account_for_loss_in_pipeline_split = config.account_for_loss_in_pipeline_split
    tf_config.num_layers_per_virtual_pipeline_stage = (
        config.vpp if config.vpp and config.vpp > 1 else None
    )
    if config.num_layers_in_first_pipeline_stage is not None:
        tf_config.num_layers_in_first_pipeline_stage = (
            config.num_layers_in_first_pipeline_stage
        )
    if config.num_layers_in_last_pipeline_stage is not None:
        tf_config.num_layers_in_last_pipeline_stage = (
            config.num_layers_in_last_pipeline_stage
        )

    # Handle custom pipeline layout if provided
    if config.pipeline_model_parallel_layout:
        from megatron.core.transformer.pipeline_parallel_layer_layout import (
            PipelineParallelLayerLayout,
        )

        tf_config.pipeline_model_parallel_layout = PipelineParallelLayerLayout(
            config.pipeline_model_parallel_layout, config.pp
        )
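
    # Illustrative example (assumption): the layout field is documented above as
    # comma-separated ints, e.g. "9,10,10,9" to describe an uneven split of a
    # 38-layer model across pp=4 stages.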
    # print(tf_config)

    # Create a minimal 'args' object with parameters not present in TransformerConfig
    args = argparse.Namespace()
    args.micro_batch_size = config.mbs
    args.seq_length = config.seq_len
    args.use_distributed_optimizer = config.use_distributed_optimizer
    args.data_parallel_size = config.num_gpus // parallel_product
    args.expert_tensor_parallel_size = config.etp if config.etp else 1
    # These are required by the estimator but can be derived or defaulted
    args.transformer_impl = "transformer_engine"
    args.fp8 = False
    args.num_experts = getattr(tf_config, "num_moe_experts", 1)  # Needed for layer spec
    args.moe_grouped_gemm = True  # Default
    args.qk_layernorm = tf_config.qk_layernorm
    args.multi_latent_attention = "deepseek" in getattr(hf_config, "model_type", "")
    args.padded_vocab_size = getattr(hf_config, "vocab_size")
    args.max_position_embeddings = getattr(hf_config, "max_position_embeddings")
    args.tie_word_embeddings = getattr(hf_config, "tie_word_embeddings", False)
    args.world_size = config.num_gpus

    # This function now returns (aggregated_pp_reports, raw_chunk_reports)
    aggregated_reports, raw_chunk_reports = estimate_from_config(tf_config, args)

    processed_reports = []
    for rpt in aggregated_reports:
        p = rpt.copy()
        p.pop("details", None)
        processed_reports.append(p)

    return {"processed_report": processed_reports, "raw_report": raw_chunk_reports}
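

# Minimal usage sketch (assumptions: the assumed route paths above, a
# Spaces-style port, and that uvicorn is available; none of this is from the
# original source):
#
#   uvicorn main:app --host 0.0.0.0 --port 7860
#
#   import requests
#   payload = {"hf_model_path": "Qwen/Qwen3-8B", "num_gpus": 8, "tp": 2, "pp": 2}
#   print(requests.post("http://localhost:7860/estimate", json=payload).json())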