import sqlite3
import os
import json
import re
import concurrent.futures
import traceback
import zipfile
import hashlib
from typing import Optional, Dict, Any, List

# --- New dependencies for Web Scraping ---
import requests
from bs4 import BeautifulSoup
# ---

import torch
import psutil
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
# Detect the number of CPUs available in the Hugging Face Space; this value sizes
# the job-queue executor defined at the bottom of this file.
cpu_count = os.cpu_count() or 1
# --- CONFIGURATION ---
DB_PATH = "code_agents_pro.db"
PROJECT_ROOT = "./projects"
os.makedirs(PROJECT_ROOT, exist_ok=True)
# ------------------------------ DATABASE (ROBUST) ------------------------------
def init_db():
    with sqlite3.connect(DB_PATH) as conn:
        cursor = conn.cursor()
        cursor.executescript("""
            CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, username TEXT UNIQUE, password_hash TEXT);
            CREATE TABLE IF NOT EXISTS projects (id INTEGER PRIMARY KEY, user_id INTEGER, title TEXT, description TEXT, status TEXT DEFAULT 'queued', zip_path TEXT, logs TEXT DEFAULT '', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id));
            CREATE INDEX IF NOT EXISTS idx_user_status ON projects(user_id, status);
        """)

init_db()
def _db_execute(query, params=(), fetchone=False, fetchall=False, commit=False):
    try:
        with sqlite3.connect(DB_PATH) as conn:
            conn.row_factory = sqlite3.Row; cursor = conn.cursor(); cursor.execute(query, params)
            if commit: conn.commit(); return cursor.lastrowid
            if fetchone: return cursor.fetchone()
            if fetchall: return cursor.fetchall()
    except sqlite3.Error as e: print(f"Database error: {e}"); return None
def hash_password(password): return hashlib.sha256(password.encode()).hexdigest()
def verify_password(password, stored_hash): return hash_password(password) == stored_hash

def create_user(username, password):
    try: return _db_execute("INSERT INTO users (username, password_hash) VALUES (?, ?)", (username, hash_password(password)), commit=True)
    except sqlite3.IntegrityError: return None

def get_user_by_username(username): return _db_execute("SELECT * FROM users WHERE username = ?", (username,), fetchone=True)
def get_user_projects(user_id, limit=20): return _db_execute("SELECT * FROM projects WHERE user_id = ? ORDER BY created_at DESC LIMIT ?", (user_id, limit), fetchall=True)
def create_project(user_id, title, description): return _db_execute("INSERT INTO projects (user_id, title, description) VALUES (?, ?, ?)", (user_id, title, description), commit=True)
def update_project_status(project_id, status, logs=None, zip_path=None): _db_execute("UPDATE projects SET status=?, logs=COALESCE(?, logs), zip_path=COALESCE(?, zip_path) WHERE id=?", (status, logs, zip_path, project_id), commit=True)
def get_project(project_id): return _db_execute("SELECT * FROM projects WHERE id = ?", (project_id,), fetchone=True)
# ------------------------------ MODEL LOADING & CACHING ------------------------------
MODEL_REGISTRY = {
    "ceo": "Qwen/Qwen3-0.6B",
    "manager": "Qwen/Qwen3-0.6B",
    "worker_coder": "Qwen/Qwen3-0.6B",
    "worker_tester": "Qwen/Qwen3-0.6B",
}
_MODEL_CACHE = {}
def load_model(model_name):
    if model_name in _MODEL_CACHE: return _MODEL_CACHE[model_name]
    model_kwargs = {"device_map": "auto", "trust_remote_code": True, "attn_implementation": "eager"}
    if torch.cuda.is_available():
        print(f"CUDA found. Loading '{model_name}' in 4-bit.")
        bnb_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_use_double_quant=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16)
        model_kwargs["quantization_config"] = bnb_config
    else:
        print(f"CUDA not found. Loading '{model_name}' on CPU.")
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs)
    _MODEL_CACHE[model_name] = (tokenizer, model)
    print(f"Model {model_name} loaded and cached.")
    return tokenizer, model
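
# Since every role in MODEL_REGISTRY currently points at the same checkpoint, the first
# queued project otherwise pays the full download/load cost. A minimal warm-up sketch
# (assumption: you want to trigger load_model eagerly at startup; nothing calls this by default):
def _warm_model_cache():
    """Pre-load every distinct model referenced in MODEL_REGISTRY into _MODEL_CACHE."""
    for name in set(MODEL_REGISTRY.values()):
        load_model(name)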
# ------------------------------ AGENT PROMPTS FOR DYNAMIC FLOW ------------------------------
ROLE_PROMPTS = {
    "ceo": """You are the CEO. Your job is to create a high-level plan and delegate the first task to your Manager.
Analyze the user's request and provide a list of files to be created.
Then, create the very first task for the Manager to execute.
Respond ONLY with a JSON object.
Example: {"plan": {"files": ["app.py", "backend.py", "tests/test_app.py"]}, "initial_task": "Start by writing the backend.py file with database functions."}""",
    "manager": """You are the Manager. You receive tasks and questions. Your job is to:
1. Break down work into small steps for workers
2. After a worker completes a task, assign the NEXT logical task (code another file OR write tests)
3. Only finish when ALL code and tests are complete
4. If unsure, ask the CEO
Available actions: `delegate_coder`, `delegate_tester`, `answer_worker`, `ask_ceo`, `finish_project`
Respond ONLY with a single tool call on a new line.
Example Actions:
To delegate a task to a Coder:
`delegate_coder("Write the full Python code for app.py")`
To delegate a task to a Tester:
`delegate_tester("Write unit tests for backend.py")`
To answer a worker's question:
`answer_worker("The main file is main.py, and it should use Pygame.")`
To ask a question to the CEO (e.g., if you are confused):
`ask_ceo("Should we use a relational database or a NoSQL database for this project?")`
To finish the entire project:
`finish_project()`""",
    "worker_coder": """You are a Coder. Your only job is to write code based on the Manager's task.
If you need more information or documentation to complete a task, you can use the `scrape_web` tool.
After completing a task, report back using the `task_complete` tool. DO NOT decide what to do next.
The Manager will assign the next file to code.
Available actions: `scrape_web`, `write_code`, `read_file`, `ask_manager`, `task_complete`
Respond ONLY with a single tool call on a new line.
Example Actions:
To scrape a website for information:
`scrape_web("https://docs.python.org/3/library/sqlite3.html")`
To write code to a file. The second argument is a raw string of the code:
`write_code("backend.py", "import sqlite3\\n\\ndef init_db():\\n # ...")`
To read the contents of an existing file:
`read_file("main.py")`
To ask the Manager a question:
`ask_manager("What testing framework should I use for this project?")`
To report that you have finished your task:
`task_complete()`""",
    "worker_tester": """You are a Tester. Your only job is to write tests for existing code.
If you need to look up testing libraries or best practices, use the `scrape_web` tool.
After completing test writing, report `task_complete`.
The Manager will decide if more tests are needed or if the project can finish.
Available actions: `scrape_web`, `write_test`, `read_file`, `ask_manager`, `task_complete`
Respond ONLY with a single tool call on a new line.
Example Actions:
To scrape a website for information:
`scrape_web("https://docs.pytest.org/en/stable/")`
To write tests to a file. The second argument is a raw string of the test code:
`write_test("tests/test_backend.py", "import pytest\\n\\ndef test_db_connection():\\n # ...")`
To read the contents of an existing file:
`read_file("app.py")`
To ask the Manager a question:
`ask_manager("Are there any specific edge cases I should focus on for the user login tests?")`
To report that you have finished your task:
`task_complete()`""",
}
# --------------------------------- NEW PARSER ---------------------------------
def _extract_tool_call(text: str) -> Optional[Dict[str, Any]]:
    """
    Parses a string for a tool call like `tool_name("arg1", "arg2")` and extracts
    the function name and its arguments.
    """
    text = text.strip().split("\n")[0]  # Only consider the first line
    match = re.search(r'(\w+)\((.*)\)', text, re.DOTALL)
    if not match:
        return None
    tool_name = match.group(1)
    args_str = match.group(2)
    args = []
    # Prefer pulling out quoted string arguments; this keeps commas (and escaped quotes)
    # inside code strings intact. Fall back to a naive comma split for unquoted arguments.
    # This is still a basic approach and may fail on complex arguments.
    quoted = re.findall(r'"((?:[^"\\]|\\.)*)"|\'((?:[^\'\\]|\\.)*)\'', args_str)
    if quoted:
        args = [double or single for double, single in quoted]
    elif args_str.strip():
        args = [arg.strip().strip("'\"") for arg in args_str.split(',')]
    return {"tool_name": tool_name, "args": args}
# ------------------------------ FILE SYSTEM & AI TOOLS ------------------------------
def get_project_dir(user_id, project_id):
    path = os.path.join(PROJECT_ROOT, str(user_id), str(project_id)); os.makedirs(path, exist_ok=True); return path

def create_file(project_dir, path, content):
    full_path = os.path.join(project_dir, path); os.makedirs(os.path.dirname(full_path), exist_ok=True)
    with open(full_path, 'w', encoding='utf-8') as f: f.write(content)

def read_file(project_dir, path):
    full_path = os.path.join(project_dir, path)
    try:
        with open(full_path, 'r', encoding='utf-8') as f: return f.read()
    except FileNotFoundError: return "Error: File not found."

def zip_project(project_dir, project_id):
    zip_filename = f"project_{project_id}.zip"; zip_path = os.path.join(os.path.dirname(project_dir), zip_filename)
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
        for root, _, files in os.walk(project_dir):
            for file in files: zf.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), project_dir))
    return zip_path
def scrape_web(url: str) -> str:
    """Scrapes a URL and returns the clean text content, limited to 5000 chars."""
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        for script_or_style in soup(["script", "style"]): script_or_style.decompose()
        text = soup.get_text()
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = '\n'.join(chunk for chunk in chunks if chunk)
        return text[:5000]
    except requests.exceptions.RequestException as e:
        return f"Error: Could not retrieve content from URL. {e}"
    except Exception as e:
        return f"Error: An unexpected error occurred during web scraping. {e}"
def generate_with_model(role: str, prompt: str) -> str:
    try:
        model_name = MODEL_REGISTRY[role]; tokenizer, model = load_model(model_name)
        messages = [{"role": "system", "content": ROLE_PROMPTS[role]}, {"role": "user", "content": prompt}]
        input_text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs = tokenizer(input_text, return_tensors="pt").to(model.device)
        outputs = model.generate(**inputs, max_new_tokens=3072, pad_token_id=tokenizer.eos_token_id, use_cache=True)
        return tokenizer.decode(outputs[0][len(inputs.input_ids[0]):], skip_special_tokens=True).strip()
    except Exception as e:
        print(f"Error during model generation for role {role}: {e}"); return f"error({e})"
# ------------------------------ HIERARCHICAL AGENT ORCHESTRATOR ------------------------------
def run_agent_chain(project_id, user_id, initial_prompt):
    project_dir = get_project_dir(user_id, project_id)
    log_entries = []

    def log_step(agent, thought, action_result=""):
        log_entry = f"**[{agent.upper()}]**\n> {thought}\n"
        if action_result:
            log_entry += f"_{action_result}_\n"
        log_entry += "\n---\n"
        log_entries.append(log_entry)
        update_project_status(project_id, "running", logs="".join(log_entries))

    try:
        project_context = {
            "initial_prompt": initial_prompt,
            "file_structure": [],
            "current_task": "Start the project.",
            "last_action_result": None,
            "turn": 0,
            "max_turns": 30,
        }
        log_step("ORCHESTRATOR", "Briefing the CEO with the user's request.")
        ceo_response_text = generate_with_model("ceo", initial_prompt)
        # The CEO still uses JSON for its specific output; extract the first {...} block
        # in case the model wraps the JSON in extra prose.
        json_match = re.search(r'\{.*\}', ceo_response_text, re.DOTALL)
        ceo_action = json.loads(json_match.group(0)) if json_match else None
        if not ceo_action or "plan" not in ceo_action: raise ValueError("CEO failed to provide an initial plan.")
        project_context["file_structure"] = ceo_action["plan"].get("files", [])
        project_context["current_task"] = ceo_action["initial_task"]
        log_step("CEO", f"Initial plan created. First task for Manager: {project_context['current_task']}", f"Files to create: {project_context['file_structure']}")
| next_agent = "manager" | |
| while project_context["turn"] < project_context["max_turns"]: | |
| prompt = f""" | |
| Project context: | |
| - User's Goal: {project_context['initial_prompt']} | |
| - File Structure: {project_context['file_structure']} | |
| - Current Task: {project_context['current_task']} | |
| - Result of last action: {project_context['last_action_result']} | |
| Based on the context, decide your next action. | |
| """ | |
| response_text = generate_with_model(next_agent, prompt) | |
| tool_call = _extract_tool_call(response_text) | |
| if not tool_call: | |
| project_context["last_action_result"] = f"Error: Agent {next_agent} returned invalid response. Retrying." | |
| log_step("ORCHESTRATOR", f"Invalid response from {next_agent}. Retrying task.", response_text) | |
| project_context["turn"] += 1 | |
| continue | |
| action = tool_call.get("tool_name") | |
| args = tool_call.get("args", []) | |
| log_step(next_agent.replace("_", " "), f"Decided to perform action: `{action}` with args: `{args}`") | |
            if action == "delegate_coder":
                if not args:
                    project_context["last_action_result"] = "Error: Missing task for delegate_coder."
                    next_agent = "manager"
                    continue
                next_agent = "worker_coder"
                project_context["current_task"] = args[0]
                project_context["last_action_result"] = f"Task delegated to {next_agent.replace('_', ' ')}."
            elif action == "delegate_tester":
                if not args:
                    project_context["last_action_result"] = "Error: Missing task for delegate_tester."
                    next_agent = "manager"
                    continue
                next_agent = "worker_tester"
                project_context["current_task"] = args[0]
                project_context["last_action_result"] = f"Task delegated to {next_agent.replace('_', ' ')}."
            elif action == "write_code":
                if len(args) < 2:
                    project_context["last_action_result"] = "Error: Missing file_path or code in data."
                else:
                    file_path = args[0]; code = args[1]
                    create_file(project_dir, file_path, code)
                    project_context["last_action_result"] = f"Successfully wrote code to {file_path}."
                next_agent = "manager"
                project_context["current_task"] = "Worker finished task. What is the next high-level step?"
            elif action == "write_test":
                if len(args) < 2:
                    project_context["last_action_result"] = "Error: Missing file_path or code in data."
                else:
                    file_path = args[0]; code = args[1]
                    create_file(project_dir, file_path, code)
                    project_context["last_action_result"] = f"Successfully wrote tests to {file_path}."
                next_agent = "manager"
                project_context["current_task"] = "Worker finished task. What is the next high-level step?"
            elif action == "scrape_web":
                if not args:
                    project_context["last_action_result"] = "Error: Missing URL for scrape_web action."
                else:
                    url = args[0]
                    log_step(next_agent.replace("_", " "), f"Scraping {url}...")
                    scraped_content = scrape_web(url)
                    project_context["last_action_result"] = f"Scraping result from {url}:\n\n{scraped_content}"
                continue  # Do not increment the turn or change the agent yet
            elif action == "ask_manager":
                if not args:
                    project_context["last_action_result"] = "Error: Missing question for ask_manager."
                else:
                    project_context["last_action_result"] = "A worker has a question."
                    project_context["current_task"] = args[0]
                next_agent = "manager"
            elif action == "answer_worker":
                if not args:
                    project_context["last_action_result"] = "Error: Missing answer for answer_worker."
                else:
                    project_context["last_action_result"] = f"Manager answered: {args[0]}"
                    project_context["current_task"] = "The worker's question has been answered. What is the next task for a worker?"
                next_agent = "manager"
            elif action == "task_complete":
                project_context["last_action_result"] = "Worker reported task is complete."
                project_context["current_task"] = "Worker finished task. What is the next high-level step?"
                next_agent = "manager"
            elif action == "finish_project":
                log_step("MANAGER", "Project is ready for final packaging.")
                break
            else:
                project_context["last_action_result"] = f"Error: Unknown action '{action}'."
                log_step("ORCHESTRATOR", project_context['last_action_result'])
            project_context["turn"] += 1

        if project_context["turn"] >= project_context["max_turns"]:
            raise ValueError("Project failed to complete within the maximum number of turns.")
| log_step("SYSTEM", "Packaging project..."); zip_path = zip_project(project_dir, project_id) | |
| update_project_status(project_id, "completed", logs="".join(log_entries), zip_path=zip_path) | |
| log_step("SYSTEM", "Project completed successfully!") | |
| update_project_status(project_id, "completed", logs="".join(log_entries), zip_path=zip_path) | |
| except Exception as e: | |
| tb_str = traceback.format_exc(); print(f"--- AGENT CHAIN FAILED ---\n{tb_str}\n---") | |
| error_log = "".join(log_entries) + f"\n\n❌ **CRITICAL ERROR:**\n```{str(e)}```" | |
| update_project_status(project_id, "failed", logs=error_log) | |
# ------------------------------ JOB QUEUE ------------------------------
# Size the worker pool from the CPU count detected at the top of the file so queued
# projects can run in parallel.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count)

def queue_job(project_id, user_id, prompt):
    print(f"Queuing job for project: {project_id}"); executor.submit(run_agent_chain, project_id, user_id, prompt)