# backend.py — FINAL HARDENED VERSION
import sqlite3
import os
import json
import re
import concurrent.futures
import traceback
import zipfile
import hashlib
import hmac
from typing import Optional, Dict, Any

import torch
import psutil
from transformers import AutoTokenizer, AutoModelForCausalLM

# --- CONFIGURATION ---
DB_PATH = "code_agents_pro.db"
PROJECT_ROOT = "./projects"
os.makedirs(PROJECT_ROOT, exist_ok=True)


# ------------------------------ DATABASE (ROBUST) ------------------------------

def init_db():
    """Create the users/projects tables and supporting index if they don't exist."""
    with sqlite3.connect(DB_PATH) as conn:
        cursor = conn.cursor()
        cursor.executescript("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                username TEXT UNIQUE,
                password_hash TEXT);
            CREATE TABLE IF NOT EXISTS projects (
                id INTEGER PRIMARY KEY,
                user_id INTEGER,
                title TEXT,
                description TEXT,
                status TEXT DEFAULT 'queued',
                zip_path TEXT,
                logs TEXT DEFAULT '',
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users(id));
            CREATE INDEX IF NOT EXISTS idx_user_status ON projects(user_id, status);
        """)


init_db()


def _db_execute(query, params=(), fetchone=False, fetchall=False, commit=False):
    """Run one parameterized statement against DB_PATH.

    Returns lastrowid when commit=True, a Row when fetchone=True, a list of
    Rows when fetchall=True, and None on any sqlite3 error (errors are logged,
    not raised — callers must treat None as failure).
    """
    try:
        with sqlite3.connect(DB_PATH) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute(query, params)
            if commit:
                conn.commit()
                return cursor.lastrowid
            if fetchone:
                return cursor.fetchone()
            if fetchall:
                return cursor.fetchall()
    except sqlite3.Error as e:
        print(f"Database error: {e}")
        return None


def hash_password(password):
    """Return the hex SHA-256 digest of *password*.

    NOTE(review): unsalted SHA-256 is weak for password storage; a migration
    to hashlib.pbkdf2_hmac (or scrypt) with per-user salts is recommended.
    Kept as-is so existing stored hashes remain valid.
    """
    return hashlib.sha256(password.encode()).hexdigest()


def verify_password(password, stored_hash):
    """Check *password* against *stored_hash* in constant time.

    BUG FIX: uses hmac.compare_digest instead of `==` to avoid leaking
    hash-prefix information through comparison timing.
    """
    return hmac.compare_digest(hash_password(password), stored_hash)


def create_user(username, password):
    """Insert a new user; returns the new row id, or None if the name is taken."""
    try:
        return _db_execute(
            "INSERT INTO users (username, password_hash) VALUES (?, ?)",
            (username, hash_password(password)),
            commit=True,
        )
    except sqlite3.IntegrityError:
        # Defensive: _db_execute already swallows sqlite3.Error (IntegrityError
        # included), so this normally won't fire — kept in case that changes.
        return None


def get_user_by_username(username):
    """Return the user Row for *username*, or None if absent/on error."""
    return _db_execute(
        "SELECT * FROM users WHERE username = ?", (username,), fetchone=True
    )


def get_user_projects(user_id, limit=20):
    """Return up to *limit* most-recent projects for *user_id*."""
    return _db_execute(
        "SELECT * FROM projects WHERE user_id = ? ORDER BY created_at DESC LIMIT ?",
        (user_id, limit),
        fetchall=True,
    )


def create_project(user_id, title, description):
    """Insert a new project (status defaults to 'queued'); returns its row id."""
    return _db_execute(
        "INSERT INTO projects (user_id, title, description) VALUES (?, ?, ?)",
        (user_id, title, description),
        commit=True,
    )


def update_project_status(project_id, status, logs=None, zip_path=None):
    """Update a project's status; logs/zip_path are only overwritten when non-None."""
    _db_execute(
        "UPDATE projects SET status=?, logs=COALESCE(?, logs), zip_path=COALESCE(?, zip_path) WHERE id=?",
        (status, logs, zip_path, project_id),
        commit=True,
    )


def get_project(project_id):
    """Return the project Row for *project_id*, or None."""
    return _db_execute(
        "SELECT * FROM projects WHERE id = ?", (project_id,), fetchone=True
    )


# ------------------------------ MODEL LOADING & CACHING ------------------------------

MODEL_REGISTRY = {
    "planner": "microsoft/Phi-3-mini-4k-instruct",
    "architect": "Qwen/Qwen2.5-Coder-0.6B-Instruct",
    "coder": "Qwen/Qwen2.5-Coder-0.6B-Instruct",
    "reviewer": "microsoft/Phi-3-mini-4k-instruct",
    "tester": "Qwen/Qwen2.5-Coder-0.6B-Instruct",
    "publisher": "microsoft/Phi-3-mini-4k-instruct",
}

# In-process cache: model name -> (tokenizer, model). Not thread-safe, but the
# job executor below runs with max_workers=1 so loads are serialized.
_MODEL_CACHE = {}


def load_model(model_name):
    """Load (or fetch from cache) the tokenizer/model pair for *model_name*."""
    if model_name in _MODEL_CACHE:
        return _MODEL_CACHE[model_name]
    print(f"Loading model: {model_name}...")
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype="auto",
        device_map="auto",
        trust_remote_code=True,
        attn_implementation="eager",
    )
    _MODEL_CACHE[model_name] = (tokenizer, model)
    print(f"Model {model_name} loaded and cached.")
    return tokenizer, model


# ------------------------------ AGENT PROMPTS (SIMPLIFIED & ROBUST) ------------------------------

ROLE_PROMPTS = {
    "planner": """You are an expert software planner. Break down the user's request into a detailed plan. Output ONLY a single JSON object with the keys: "purpose", "features", "dependencies", and "files". 
The "files" key MUST be an array of strings representing complete file paths (e.g., ["src/main.py", "tests/test_main.py", "requirements.txt"]).""",
    "architect": """You are a software architect. Create initial placeholder content for a list of files. Output ONLY a single JSON object where keys are file paths and values are the initial content (e.g., a comment like '# Main application logic here').""",
    "coder": "You are a professional programmer. Your ONLY job is to write the complete, clean, and functional code for the single file requested. Do NOT add any explanations, introductions, or markdown formatting. Output ONLY the raw source code.",
    "reviewer": """You are a meticulous code reviewer. Analyze the given code for bugs, style issues, and security vulnerabilities. Output ONLY a single JSON object with two keys: "has_issues" (boolean) and "suggestions" (a string containing a bulleted list of required changes).""",
    "tester": "You are a QA engineer. Write a complete pytest test file for the given source code. Cover main functionality and edge cases. Output ONLY the raw source code for the test file.",
    "publisher": """You are a release manager. Create final documentation and configuration files. 
Output ONLY a single JSON object where keys are the filenames ("README.md", ".gitignore", "Dockerfile") and values are their complete string content.""",
}


# ------------------------------ FILE SYSTEM & AI TOOLS ------------------------------

def get_project_dir(user_id, project_id):
    """Return (creating if needed) the working directory for one project."""
    path = os.path.join(PROJECT_ROOT, str(user_id), str(project_id))
    os.makedirs(path, exist_ok=True)
    return path


def _safe_join(base_dir, rel_path):
    """Join *rel_path* under *base_dir*, rejecting escapes.

    SECURITY FIX: file paths originate from LLM output and are untrusted; a
    path like '../../etc/x' would otherwise read/write outside the project
    directory. Raises ValueError when the resolved path leaves base_dir.
    """
    base = os.path.abspath(base_dir)
    full_path = os.path.abspath(os.path.join(base, rel_path))
    if os.path.commonpath([base, full_path]) != base:
        raise ValueError(f"Unsafe path outside project dir: {rel_path}")
    return full_path


def create_file(project_dir, path, content):
    """Write *content* to *path* under *project_dir*, creating parent dirs."""
    full_path = _safe_join(project_dir, path)
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    with open(full_path, 'w', encoding='utf-8') as f:
        f.write(content)


def read_file(project_dir, path):
    """Return the text of *path* under *project_dir*, or None if missing."""
    full_path = _safe_join(project_dir, path)
    try:
        with open(full_path, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        return None


def zip_project(project_dir, project_id):
    """Zip the project directory into its parent dir; returns the zip path."""
    zip_filename = f"project_{project_id}.zip"
    zip_path = os.path.join(os.path.dirname(project_dir), zip_filename)
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
        for root, _, files in os.walk(project_dir):
            for file in files:
                abs_path = os.path.join(root, file)
                zf.write(abs_path, os.path.relpath(abs_path, project_dir))
    return zip_path


def _extract_json(text: str) -> Optional[Dict[str, Any]]:
    """Pull the first JSON object out of LLM output (fenced or bare).

    Returns the parsed dict, or None when nothing parseable is found.
    """
    match = re.search(r"```json\s*([\s\S]*?)\s*```|(\{[\s\S]*\})", text)
    if not match:
        return None
    json_str = match.group(1) or match.group(2)
    try:
        return json.loads(json_str)
    except json.JSONDecodeError:
        print(f"Failed to decode JSON: {json_str[:200]}...")
        return None


def generate_with_model(role: str, prompt: str) -> str:
    """Run one chat-completion for *role* using its registered model.

    Returns the decoded completion, or a JSON error object string on failure
    (callers feed the result through _extract_json).
    """
    try:
        model_name = MODEL_REGISTRY[role]
        tokenizer, model = load_model(model_name)
        messages = [
            {"role": "system", "content": ROLE_PROMPTS[role]},
            {"role": "user", "content": prompt},
        ]
        input_text = tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        inputs = tokenizer(input_text, return_tensors="pt").to(model.device)
        # no_grad: inference only — avoids building an autograd graph.
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=2048,
                pad_token_id=tokenizer.eos_token_id,
            )
        # Slice off the prompt tokens so only the completion is decoded.
        return tokenizer.decode(
            outputs[0][len(inputs.input_ids[0]):], skip_special_tokens=True
        ).strip()
    except Exception as e:
        print(f"Error during model generation for role {role}: {e}")
        # BUG FIX: build the error payload with json.dumps — the old f-string
        # produced invalid JSON whenever str(e) contained a quote.
        return json.dumps({"error": f"Failed to generate response: {e}"})


# ------------------------------ THE AGENT CHAIN EXECUTOR (REWRITTEN FOR RELIABILITY) ------------------------------

def run_agent_chain(project_id, user_id, initial_prompt):
    """Run the full planner→architect→coder→reviewer→tester→publisher pipeline.

    Persists progress logs and status transitions to the projects table; on
    any exception the project is marked 'failed' with the error appended.
    """
    project_dir = get_project_dir(user_id, project_id)
    log_entries = []

    def log_step(agent, action, output=""):
        # Appends a markdown log entry and re-persists status as "running".
        log_entry = f"**[{agent.upper()}]**: {action}\n"
        if output:
            log_entry += f"```\n{output[:1000]}{'...' if len(output) > 1000 else ''}\n```\n---\n"
        log_entries.append(log_entry)
        update_project_status(project_id, "running", logs="".join(log_entries))

    try:
        log_step("SYSTEM", f"Initializing project directory...")
        update_project_status(project_id, "running", logs="Agent team is assembling...")

        # 1. PLANNER
        log_step("PLANNER", "Analyzing user request and creating a project plan...")
        plan_response = generate_with_model("planner", initial_prompt)
        plan_data = _extract_json(plan_response)
        if not plan_data or "files" not in plan_data:
            raise ValueError("Planner failed to create a valid JSON plan with a 'files' key.")
        ## ROBUSTNESS FIX: Handle cases where the LLM returns [{"file": "path"}] instead of ["path"]
        if plan_data["files"] and isinstance(plan_data["files"][0], dict):
            log_step("SYSTEM", "Planner returned a list of objects. Normalizing to a list of strings.")
            plan_data["files"] = [item["file"] for item in plan_data["files"] if "file" in item]
        log_step("PLANNER", "Plan created successfully.", json.dumps(plan_data, indent=2))

        # 2. ARCHITECT
        log_step("ARCHITECT", "Creating initial file skeletons...")
        arch_prompt = f"Create initial content for these files:\n{json.dumps(plan_data['files'])}"
        arch_response = generate_with_model("architect", arch_prompt)
        arch_data = _extract_json(arch_response)
        if not arch_data:
            raise ValueError("Architect failed to create valid JSON file structures.")
        for path, content in arch_data.items():
            create_file(project_dir, path, content)
        log_step("ARCHITECT", "File skeletons created.", "\n".join(arch_data.keys()))

        # 3. CODER
        source_files = [f for f in plan_data['files'] if f.startswith('src/') and f.endswith('.py')]
        # ROBUSTNESS FIX: only 'files' was validated above — 'purpose' may be
        # absent from the plan, so use .get instead of a KeyError-prone index.
        purpose = plan_data.get('purpose', 'Not specified')
        for file_path in source_files:
            log_step("CODER", f"Writing complete code for `{file_path}`...")
            coder_prompt = f"Project purpose: {purpose}. Write the full Python code for the file: `{file_path}`."
            code = generate_with_model("coder", coder_prompt)
            create_file(project_dir, file_path, code)
            log_step("CODER", f"Finished writing `{file_path}`.", code)

        # 4. REVIEWER
        log_step("REVIEWER", "Reviewing all generated source code...")
        for file_path in source_files:
            code_content = read_file(project_dir, file_path)
            if not code_content:
                continue
            review_prompt = f"Review this code from `{file_path}`:\n\n{code_content}"
            review_response = generate_with_model("reviewer", review_prompt)
            review_data = _extract_json(review_response)
            log_step("REVIEWER", f"Review of `{file_path}` complete.", json.dumps(review_data, indent=2))

        # 5. TESTER
        log_step("TESTER", "Writing unit tests for all source code...")
        for file_path in source_files:
            code_content = read_file(project_dir, file_path)
            if not code_content:
                continue
            test_file_path = os.path.join("tests", f"test_{os.path.basename(file_path)}")
            tester_prompt = f"Write a pytest test file (`{test_file_path}`) for this code from `{file_path}`:\n\n{code_content}"
            test_code = generate_with_model("tester", tester_prompt)
            create_file(project_dir, test_file_path, test_code)
            log_step("TESTER", f"Generated test `{test_file_path}`.", test_code)

        # 6. PUBLISHER
        log_step("PUBLISHER", "Generating final documentation and configuration...")
        # BUG FIX: use relpath instead of str.replace — the old
        # `.replace(project_dir, '', 1)` left a leading separator on every path.
        all_files = [
            os.path.relpath(os.path.join(r, f), project_dir)
            for r, d, fs in os.walk(project_dir)
            for f in fs
        ]
        pub_prompt = f"Project file structure: {json.dumps(all_files)}. Generate README.md, .gitignore, and Dockerfile."
        pub_response = generate_with_model("publisher", pub_prompt)
        pub_data = _extract_json(pub_response)
        if not pub_data:
            raise ValueError("Publisher failed to create valid final assets.")
        for path, content in pub_data.items():
            create_file(project_dir, path, content)
        log_step("PUBLISHER", "Final assets created.", json.dumps(pub_data, indent=2))

        # 7. FINALIZATION
        log_step("SYSTEM", "Packaging project into a ZIP file...")
        zip_path = zip_project(project_dir, project_id)
        # BUG FIX: log the success line BEFORE the final status write. The
        # original called log_step() after marking the project "completed",
        # and log_step() re-persists status as "running" — so finished
        # projects were left stuck in the "running" state.
        log_step("SYSTEM", "Project completed successfully!", f"Download available.")
        update_project_status(
            project_id, "completed", logs="".join(log_entries), zip_path=zip_path
        )

    except Exception as e:
        tb_str = traceback.format_exc()
        print(f"--- AGENT CHAIN FAILED for project {project_id} ---\n{tb_str}\n--------------------")
        error_log = "".join(log_entries) + f"\n\nāŒ **CRITICAL ERROR:**\nAn unexpected error occurred.\n\n**Details:**\n```{str(e)}```"
        update_project_status(project_id, "failed", logs=error_log)


# ------------------------------ JOB QUEUE ------------------------------

# Single worker: jobs run one at a time, which also serializes model loading.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)


def queue_job(project_id, user_id, prompt):
    """Submit a project build to the background worker (fire-and-forget)."""
    print(f"Queuing job for project_id: {project_id}, user_id: {user_id}")
    executor.submit(run_agent_chain, project_id, user_id, prompt)