"""Hierarchical multi-agent code generator.

A CEO model drafts a plan, a Manager model delegates work, and Coder/Tester
worker models write files into a per-project directory. Projects, their
status and their logs are persisted in SQLite, and jobs run on a
single-worker thread pool.
"""

import ast
import concurrent.futures
import contextlib
import hashlib
import hmac
import json
import os
import re
import sqlite3
import traceback
import zipfile
from typing import Any, Dict, List, Optional

# --- Dependencies for web scraping ---
import requests
from bs4 import BeautifulSoup

# --- Dependencies for model inference ---
import psutil
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

# Number of CPUs reported by the host (at least 1).
cpu_count = os.cpu_count() or 1

# --- CONFIGURATION ---
DB_PATH = "code_agents_pro.db"
PROJECT_ROOT = "./projects"
os.makedirs(PROJECT_ROOT, exist_ok=True)


# ------------------------------ DATABASE (ROBUST) ------------------------------
def init_db():
    """Create the `users` and `projects` tables and their index if missing."""
    # `closing` guarantees the connection is closed; a bare
    # `with sqlite3.connect(...)` only scopes a transaction.
    with contextlib.closing(sqlite3.connect(DB_PATH)) as conn, conn:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                username TEXT UNIQUE,
                password_hash TEXT
            );
            CREATE TABLE IF NOT EXISTS projects (
                id INTEGER PRIMARY KEY,
                user_id INTEGER,
                title TEXT,
                description TEXT,
                status TEXT DEFAULT 'queued',
                zip_path TEXT,
                logs TEXT DEFAULT '',
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users(id)
            );
            CREATE INDEX IF NOT EXISTS idx_user_status ON projects(user_id, status);
        """)


init_db()


def _db_execute(query, params=(), fetchone=False, fetchall=False, commit=False):
    """Run one parameterized statement against the application database.

    Args:
        query: SQL text with `?` placeholders.
        params: Values bound to the placeholders.
        fetchone: Return the first result row (a `sqlite3.Row`) or None.
        fetchall: Return all result rows as a list of `sqlite3.Row`.
        commit: Commit and return the cursor's `lastrowid`.

    Returns:
        The value selected by the flags above (checked in the order commit,
        fetchone, fetchall), or None when no flag is set or when a
        `sqlite3.Error` occurs (the error is printed, not raised).
    """
    try:
        # The inner `conn` context commits/rolls back the transaction; the
        # outer `closing` releases the connection afterwards.
        with contextlib.closing(sqlite3.connect(DB_PATH)) as conn, conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute(query, params)
            if commit:
                conn.commit()
                return cursor.lastrowid
            if fetchone:
                return cursor.fetchone()
            if fetchall:
                return cursor.fetchall()
            return None
    except sqlite3.Error as e:
        print(f"Database error: {e}")
        return None


def hash_password(password):
    """Return the hex SHA-256 digest of `password`.

    NOTE(review): this is an unsalted, fast hash. It is kept unchanged so
    hashes already stored in the database keep verifying; consider migrating
    to a salted KDF such as `hashlib.scrypt` or `hashlib.pbkdf2_hmac`.
    """
    return hashlib.sha256(password.encode()).hexdigest()


def verify_password(password, stored_hash):
    """Return True when `password` hashes to `stored_hash`.

    A non-string `stored_hash` (e.g. None) never verifies. The comparison is
    constant-time.
    """
    if not isinstance(stored_hash, str):
        return False
    return hmac.compare_digest(
        hash_password(password).encode(), stored_hash.encode()
    )


def create_user(username, password):
    """Insert a new user and return its row id.

    Returns None when the insert fails, e.g. when `username` already exists;
    `_db_execute` reports the database error and returns None itself.
    """
    return _db_execute(
        "INSERT INTO users (username, password_hash) VALUES (?, ?)",
        (username, hash_password(password)),
        commit=True,
    )


def get_user_by_username(username):
    """Return the user row for `username`, or None if there is none."""
    return _db_execute(
        "SELECT * FROM users WHERE username = ?", (username,), fetchone=True
    )


def get_user_projects(user_id, limit=20):
    """Return up to `limit` of the user's projects, newest first."""
    return _db_execute(
        "SELECT * FROM projects WHERE user_id = ? ORDER BY created_at DESC LIMIT ?",
        (user_id, limit),
        fetchall=True,
    )


def create_project(user_id, title, description):
    """Insert a project for `user_id` and return its row id (None on error)."""
    return _db_execute(
        "INSERT INTO projects (user_id, title, description) VALUES (?, ?, ?)",
        (user_id, title, description),
        commit=True,
    )


def update_project_status(project_id, status, logs=None, zip_path=None):
    """Set a project's status; `logs`/`zip_path` are only overwritten when given.

    Passing None for `logs` or `zip_path` keeps the stored value (COALESCE).
    """
    _db_execute(
        "UPDATE projects SET status=?, logs=COALESCE(?, logs), "
        "zip_path=COALESCE(?, zip_path) WHERE id=?",
        (status, logs, zip_path, project_id),
        commit=True,
    )


def get_project(project_id):
    """Return the project row with id `project_id`, or None."""
    return _db_execute(
        "SELECT * FROM projects WHERE id = ?", (project_id,), fetchone=True
    )


# ------------------------------ MODEL LOADING & CACHING ------------------------------
MODEL_REGISTRY = {
    "ceo": "Qwen/Qwen3-0.6B",
    "manager": "Qwen/Qwen3-0.6B",
    "worker_coder": "Qwen/Qwen3-0.6B",
    "worker_tester": "Qwen/Qwen3-0.6B",
}

# Maps model name -> (tokenizer, model) so each model is loaded once.
_MODEL_CACHE = {}


def load_model(model_name):
    """Load (or fetch from cache) the tokenizer and model for `model_name`.

    When CUDA is available the model is loaded with a 4-bit NF4
    `BitsAndBytesConfig`; otherwise no quantization config is passed.

    Returns:
        A `(tokenizer, model)` tuple.
    """
    if model_name in _MODEL_CACHE:
        return _MODEL_CACHE[model_name]

    model_kwargs = {
        "device_map": "auto",
        "trust_remote_code": True,
        "attn_implementation": "eager",
    }
    if torch.cuda.is_available():
        print(f"CUDA found. Loading '{model_name}' in 4-bit.")
        model_kwargs["quantization_config"] = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16,
        )
    else:
        print(f"CUDA not found. Loading '{model_name}' on CPU.")

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs)
    _MODEL_CACHE[model_name] = (tokenizer, model)
    print(f"Model {model_name} loaded and cached.")
    return tokenizer, model


# ------------------------------ AGENT PROMPTS FOR DYNAMIC FLOW ------------------------------
ROLE_PROMPTS = {
    "ceo": """You are the CEO. Your job is to create a high-level plan and delegate the first task to your Manager.
Analyze the user's request and provide a list of files to be created.
Then, create the very first task for the Manager to execute.
Respond ONLY with a JSON object.
Example:
{"plan": {"files": ["app.py", "backend.py", "tests/test_app.py"]}, "initial_task": "Start by writing the backend.py file with database functions."}""",
    "manager": """You are the Manager. You receive tasks and questions. Your job is to:
1. Break down work into small steps for workers
2. After a worker completes a task, assign the NEXT logical task (code another file OR write tests)
3. Only finish when ALL code and tests are complete
4. If unsure, ask the CEO
Available actions: `delegate_coder`, `delegate_tester`, `answer_worker`, `ask_ceo`, `finish_project`
Respond ONLY with a single tool call on a new line.
Example Actions:
To delegate a task to a Coder:
`delegate_coder("Write the full Python code for app.py")`
To delegate a task to a Tester:
`delegate_tester("Write unit tests for backend.py")`
To answer a worker's question:
`answer_worker("The main file is main.py, and it should use Pygame.")`
To ask a question to the CEO (e.g., if you are confused):
`ask_ceo("Should we use a relational database or a NoSQL database for this project?")`
To finish the entire project:
`finish_project()`""",
    "worker_coder": """You are a Coder. Your only job is to write code based on the Manager's task.
If you need more information or documentation to complete a task, you can use the `scrape_web` tool.
After completing a task, report back using the `task_complete` tool.
DO NOT decide what to do next. The Manager will assign the next file to code.
Available actions: `scrape_web`, `write_code`, `read_file`, `ask_manager`, `task_complete`
Respond ONLY with a single tool call on a new line.
Example Actions:
To scrape a website for information:
`scrape_web("https://docs.python.org/3/library/sqlite3.html")`
To write code to a file. The second argument is a raw string of the code:
`write_code("backend.py", "import sqlite3\\n\\ndef init_db():\\n # ...")`
To read the contents of an existing file:
`read_file("main.py")`
To ask the Manager a question:
`ask_manager("What testing framework should I use for this project?")`
To report that you have finished your task:
`task_complete()`""",
    "worker_tester": """You are a Tester. Your only job is to write tests for existing code.
If you need to look up testing libraries or best practices, use the `scrape_web` tool.
After completing test writing, report `task_complete`.
The Manager will decide if more tests are needed or if the project can finish.
Available actions: `scrape_web`, `write_test`, `read_file`, `ask_manager`, `task_complete`
Respond ONLY with a single tool call on a new line.
Example Actions:
To scrape a website for information:
`scrape_web("https://docs.pytest.org/en/stable/")`
To write tests to a file. The second argument is a raw string of the test code:
`write_test("tests/test_backend.py", "import pytest\\n\\ndef test_db_connection():\\n # ...")`
To read the contents of an existing file:
`read_file("app.py")`
To ask the Manager a question:
`ask_manager("Are there any specific edge cases I should focus on for the user login tests?")`
To report that you have finished your task:
`task_complete()`""",
}


# --------------------------------- PARSERS ---------------------------------
# `name(<anything>)`; greedy so nested parentheses inside arguments survive.
_TOOL_CALL_RE = re.compile(r"(\w+)\((.*)\)", re.DOTALL)
_THINK_BLOCK_RE = re.compile(r"<think>.*?</think>", re.DOTALL)


def _strip_reasoning(text: str) -> str:
    """Remove `<think>...</think>` preambles that some chat models emit."""
    text = _THINK_BLOCK_RE.sub("", text)
    if "</think>" in text:
        # An unmatched closing tag: keep only what follows it.
        text = text.rsplit("</think>", 1)[-1]
    return text.strip()


def _parse_literal_args(args_str: str) -> Optional[List[str]]:
    """Parse a call's argument text as Python literals.

    Returns a list of strings (non-string literals are converted with
    `str`), an empty list for a blank argument text, or None when the text is
    not a valid sequence of literals.
    """
    if not args_str.strip():
        return []
    candidates = [args_str]
    if "\n" in args_str:
        # Models sometimes put real line breaks inside a quoted code argument,
        # which is not a valid single-quoted literal; retry with them escaped.
        candidates.append(args_str.replace("\r\n", "\n").replace("\n", "\\n"))
    for candidate in candidates:
        try:
            values = ast.literal_eval(f"({candidate},)")
        except (ValueError, SyntaxError, MemoryError, RecursionError):
            continue
        return [value if isinstance(value, str) else str(value) for value in values]
    return None


def _split_args_leniently(args_str: str) -> List[str]:
    """Split argument text on commas that are outside quotes.

    Fallback for malformed calls (e.g. unquoted text). Surrounding quotes are
    stripped from each piece; escape sequences are left as written.
    """
    pieces: List[str] = []
    current: List[str] = []
    quote = None
    escaped = False
    for ch in args_str:
        if escaped:
            current.append(ch)
            escaped = False
        elif ch == "\\":
            current.append(ch)
            escaped = True
        elif quote is not None:
            current.append(ch)
            if ch == quote:
                quote = None
        elif ch in "'\"":
            current.append(ch)
            quote = ch
        elif ch == ",":
            pieces.append("".join(current))
            current = []
        else:
            current.append(ch)
    pieces.append("".join(current))
    return [piece.strip().strip("'\"") for piece in pieces]


def _extract_tool_call(text: str) -> Optional[Dict[str, Any]]:
    """Extract a tool call such as `tool_name("arg1", "arg2")` from model output.

    The whole response is tried first so that multi-line code arguments work;
    if its arguments are not valid literals, the first line containing a call
    is parsed instead, falling back to a quote-aware comma split.

    Returns:
        `{"tool_name": str, "args": list of str}`, or None when no call is
        found. A call without arguments yields an empty `args` list.
    """
    cleaned = _strip_reasoning(text)
    if not cleaned:
        return None

    match = _TOOL_CALL_RE.search(cleaned)
    if match is None:
        return None
    args = _parse_literal_args(match.group(2))
    if args is not None:
        return {"tool_name": match.group(1), "args": args}

    for line in cleaned.splitlines():
        match = _TOOL_CALL_RE.search(line)
        if match is None:
            continue
        args = _parse_literal_args(match.group(2))
        if args is None:
            args = _split_args_leniently(match.group(2))
        return {"tool_name": match.group(1), "args": args}
    return None


def _extract_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Return the JSON object contained in model output, or None.

    Accepts a bare JSON object as well as one surrounded by extra text
    (reasoning preamble, code fences) by retrying on the outermost braces.
    """
    cleaned = _strip_reasoning(text)
    candidates = [cleaned]
    start, end = cleaned.find("{"), cleaned.rfind("}")
    if 0 <= start < end:
        candidates.append(cleaned[start:end + 1])
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except ValueError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return None


# ------------------------------ FILE SYSTEM & AI TOOLS ------------------------------
def get_project_dir(user_id, project_id):
    """Return (creating it if needed) the directory for a user's project."""
    path = os.path.join(PROJECT_ROOT, str(user_id), str(project_id))
    os.makedirs(path, exist_ok=True)
    return path


def _resolve_in_project(project_dir, path):
    """Resolve `path` under `project_dir`, refusing anything outside it.

    Raises:
        ValueError: if `path` is empty, absolute, or uses `..` to leave the
            project directory.
    """
    root = os.path.realpath(project_dir)
    full_path = os.path.realpath(os.path.join(root, path))
    if full_path == root or os.path.commonpath([root, full_path]) != root:
        raise ValueError(f"Path is outside the project directory: {path!r}")
    return full_path


def create_file(project_dir, path, content):
    """Write `content` (UTF-8) to `path` inside `project_dir`.

    Parent directories are created as needed.

    Raises:
        ValueError: if `path` would land outside `project_dir` (paths come
            from model output and must not be trusted).
        OSError: if the file cannot be written.
    """
    full_path = _resolve_in_project(project_dir, path)
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    with open(full_path, "w", encoding="utf-8") as f:
        f.write(content)


def read_file(project_dir, path):
    """Return the UTF-8 text of `path` inside `project_dir`.

    Returns an `"Error: ..."` string instead of raising when the file does
    not exist or the path points outside the project directory.
    """
    try:
        full_path = _resolve_in_project(project_dir, path)
    except ValueError:
        return "Error: Path is outside the project directory."
    try:
        with open(full_path, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        return "Error: File not found."


def zip_project(project_dir, project_id):
    """Zip the project directory and return the archive path.

    The archive is written next to `project_dir` (in its parent directory),
    with entries stored relative to `project_dir`.
    """
    zip_filename = f"project_{project_id}.zip"
    zip_path = os.path.join(os.path.dirname(project_dir), zip_filename)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _, files in os.walk(project_dir):
            for name in files:
                file_path = os.path.join(root, name)
                zf.write(file_path, os.path.relpath(file_path, project_dir))
    return zip_path


def scrape_web(url: str) -> str:
    """Scrapes a URL and returns the clean text content, limited to 5000 chars.

    Errors are returned as an `"Error: ..."` string rather than raised.
    """
    try:
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
            )
        }
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")
        for script_or_style in soup(["script", "style"]):
            script_or_style.decompose()
        text = soup.get_text()
        lines = (line.strip() for line in text.splitlines())
        # Break on runs of two spaces (layout gaps), not on every single
        # space, so sentences stay on one line instead of one word per line.
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = "\n".join(chunk for chunk in chunks if chunk)
        return text[:5000]
    except requests.exceptions.RequestException as e:
        return f"Error: Could not retrieve content from URL. {e}"
    except Exception as e:
        return f"Error: An unexpected error occurred during web scraping. {e}"


def generate_with_model(role: str, prompt: str) -> str:
    """Generate a reply for `role` given a user `prompt`.

    The role's system prompt from `ROLE_PROMPTS` is prepended via the
    tokenizer's chat template. Only the newly generated tokens are decoded.

    Returns:
        The stripped model output, or the string `error(<message>)` when
        loading or generation raises.
    """
    try:
        model_name = MODEL_REGISTRY[role]
        tokenizer, model = load_model(model_name)
        messages = [
            {"role": "system", "content": ROLE_PROMPTS[role]},
            {"role": "user", "content": prompt},
        ]
        input_text = tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        inputs = tokenizer(input_text, return_tensors="pt").to(model.device)
        outputs = model.generate(
            **inputs,
            max_new_tokens=3072,
            pad_token_id=tokenizer.eos_token_id,
            use_cache=True,
        )
        # Skip the prompt tokens so only the completion is returned.
        new_tokens = outputs[0][len(inputs.input_ids[0]):]
        return tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
    except Exception as e:
        print(f"Error during model generation for role {role}: {e}")
        return f"error({e})"


# ------------------------------ HIERARCHICAL AGENT ORCHESTRATOR ------------------------------
_NEXT_STEP_TASK = "Worker finished task. What is the next high-level step?"

# Upper bound on file text fed back into a prompt (mirrors scrape_web's cap).
_MAX_TOOL_OUTPUT_CHARS = 5000


def _build_turn_prompt(context):
    """Render the per-turn prompt describing the current project state."""
    return (
        "\nProject context:\n"
        f"- User's Goal: {context['initial_prompt']}\n"
        f"- File Structure: {context['file_structure']}\n"
        f"- Current Task: {context['current_task']}\n"
        f"- Result of last action: {context['last_action_result']}\n"
        "Based on the context, decide your next action.\n"
    )


def _apply_action(agent, action, args, context, project_dir, log_step):
    """Execute one agent action and return the agent that acts next.

    Updates `context["last_action_result"]` (and `context["current_task"]`
    where the action changes it). On a malformed or failed action the same
    agent is returned so it can retry with the error in its context.
    `finish_project` is handled by the caller, not here.
    """
    if action in ("delegate_coder", "delegate_tester"):
        if not args or not args[0].strip():
            context["last_action_result"] = f"Error: Missing task for {action}."
            return "manager"
        worker = "worker_coder" if action == "delegate_coder" else "worker_tester"
        context["current_task"] = args[0]
        context["last_action_result"] = (
            f"Task delegated to {worker.replace('_', ' ')}."
        )
        return worker

    if action in ("write_code", "write_test"):
        if len(args) < 2:
            context["last_action_result"] = "Error: Missing file_path or code in data."
            return agent
        file_path, code = args[0], args[1]
        try:
            create_file(project_dir, file_path, code)
        except (ValueError, OSError) as exc:
            context["last_action_result"] = f"Error: Could not write {file_path}. {exc}"
            return agent
        written = "code" if action == "write_code" else "tests"
        context["last_action_result"] = f"Successfully wrote {written} to {file_path}."
        context["current_task"] = _NEXT_STEP_TASK
        return "manager"

    if action == "read_file":
        if not args:
            context["last_action_result"] = "Error: Missing path for read_file action."
            return agent
        try:
            content = read_file(project_dir, args[0])
        except (OSError, UnicodeDecodeError) as exc:
            content = f"Error: Could not read file. {exc}"
        context["last_action_result"] = (
            f"Contents of {args[0]}:\n\n{content[:_MAX_TOOL_OUTPUT_CHARS]}"
        )
        return agent

    if action == "scrape_web":
        if not args:
            context["last_action_result"] = "Error: Missing URL for scrape_web action."
            return agent
        url = args[0]
        log_step(agent.replace("_", " "), f"Scraping {url}...")
        scraped_content = scrape_web(url)
        context["last_action_result"] = f"Scraping result from {url}:\n\n{scraped_content}"
        # Same agent continues, now with the scraped text in its context.
        return agent

    if action == "ask_manager":
        if not args:
            context["last_action_result"] = "Error: Missing question for ask_manager."
            return agent
        context["last_action_result"] = "A worker has a question."
        context["current_task"] = args[0]
        return "manager"

    if action == "answer_worker":
        if not args:
            context["last_action_result"] = "Error: Missing answer for answer_worker."
            return "manager"
        context["last_action_result"] = f"Manager answered: {args[0]}"
        context["current_task"] = (
            "The worker's question has been answered. What is the next task for a worker?"
        )
        return "manager"

    if action == "ask_ceo":
        if not args:
            context["last_action_result"] = "Error: Missing question for ask_ceo."
            return agent
        log_step("manager", f"Asking the CEO: {args[0]}")
        ceo_answer = _strip_reasoning(generate_with_model("ceo", args[0]))
        context["last_action_result"] = f"CEO answered: {ceo_answer}"
        return "manager"

    if action == "task_complete":
        context["last_action_result"] = "Worker reported task is complete."
        context["current_task"] = _NEXT_STEP_TASK
        return "manager"

    context["last_action_result"] = f"Error: Unknown action '{action}'."
    log_step("ORCHESTRATOR", context["last_action_result"])
    return agent


def run_agent_chain(project_id, user_id, initial_prompt):
    """Run the CEO -> Manager -> Worker loop for one project.

    The CEO produces a JSON plan, then agents take turns emitting tool calls
    until the Manager calls `finish_project` or `max_turns` is exhausted.
    Every step is appended to the project's log in the database. On success
    the project directory is zipped and the project marked "completed"; any
    exception marks it "failed" with the error appended to the log.
    """
    log_entries: List[str] = []

    def log_step(agent, thought, action_result=""):
        """Append a formatted entry and persist the full log as 'running'."""
        log_entry = f"**[{agent.upper()}]**\n> {thought}\n"
        if action_result:
            log_entry += f"_{action_result}_\n"
        log_entry += "\n---\n"
        log_entries.append(log_entry)
        update_project_status(project_id, "running", logs="".join(log_entries))

    try:
        # Inside the try so a filesystem error marks the project as failed
        # instead of vanishing inside the executor's future.
        project_dir = get_project_dir(user_id, project_id)
        project_context = {
            "initial_prompt": initial_prompt,
            "file_structure": [],
            "current_task": "Start the project.",
            "last_action_result": None,
            "turn": 0,
            "max_turns": 30,
        }

        log_step("ORCHESTRATOR", "Briefing the CEO with the user's request.")
        ceo_response_text = generate_with_model("ceo", initial_prompt)
        # The CEO still uses JSON for its specific output.
        ceo_action = _extract_json_object(ceo_response_text)
        if not ceo_action or not isinstance(ceo_action.get("plan"), dict):
            raise ValueError("CEO failed to provide an initial plan.")
        if not ceo_action.get("initial_task"):
            raise ValueError("CEO failed to provide an initial task.")

        project_context["file_structure"] = ceo_action["plan"].get("files", [])
        project_context["current_task"] = ceo_action["initial_task"]
        log_step(
            "CEO",
            f"Initial plan created. First task for Manager: {project_context['current_task']}",
            f"Files to create: {project_context['file_structure']}",
        )

        next_agent = "manager"
        finished = False
        while project_context["turn"] < project_context["max_turns"]:
            # Every iteration counts as a turn, including retries and scrapes,
            # so the loop is guaranteed to terminate.
            project_context["turn"] += 1

            prompt = _build_turn_prompt(project_context)
            response_text = generate_with_model(next_agent, prompt)
            tool_call = _extract_tool_call(response_text)

            if not tool_call:
                project_context["last_action_result"] = (
                    f"Error: Agent {next_agent} returned invalid response. Retrying."
                )
                log_step(
                    "ORCHESTRATOR",
                    f"Invalid response from {next_agent}. Retrying task.",
                    response_text,
                )
                continue

            action = tool_call.get("tool_name")
            args = tool_call.get("args", [])
            log_step(
                next_agent.replace("_", " "),
                f"Decided to perform action: `{action}` with args: `{args}`",
            )

            if action == "finish_project":
                log_step("MANAGER", "Project is ready for final packaging.")
                finished = True
                break

            next_agent = _apply_action(
                next_agent, action, args, project_context, project_dir, log_step
            )

        if not finished:
            raise ValueError("Project failed to complete within the maximum number of turns.")

        log_step("SYSTEM", "Packaging project...")
        zip_path = zip_project(project_dir, project_id)
        log_step("SYSTEM", "Project completed successfully!")
        # Single final write: log_step leaves the status at "running".
        update_project_status(
            project_id, "completed", logs="".join(log_entries), zip_path=zip_path
        )

    except Exception as e:
        tb_str = traceback.format_exc()
        print(f"--- AGENT CHAIN FAILED ---\n{tb_str}\n---")
        error_log = "".join(log_entries) + f"\n\n❌ **CRITICAL ERROR:**\n```{str(e)}```"
        update_project_status(project_id, "failed", logs=error_log)


# ------------------------------ JOB QUEUE ------------------------------
# One worker: jobs run strictly one at a time.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)


def queue_job(project_id, user_id, prompt):
    """Schedule `run_agent_chain` for a project on the background executor."""
    print(f"Queuing job for project: {project_id}")
    executor.submit(run_agent_chain, project_id, user_id, prompt)