# Code-agent-team / backend.py
# Keeby-smilyai's picture
# Update backend.py
# a3961a8 verified
import sqlite3
import os
import json
import re
import concurrent.futures
import traceback
import zipfile
import hashlib
from typing import Optional, Dict, Any, List
# --- New dependencies for Web Scraping ---
import requests
from bs4 import BeautifulSoup
# ---
import torch
import psutil
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import os
import concurrent.futures
# Detect the number of CPUs available to the process (falls back to 1 when
# os.cpu_count() returns None).
cpu_count = os.cpu_count() or 1
# Thread pool sized to the CPU count.
# NOTE(review): the `executor` name is rebound to a single-worker pool in the
# JOB QUEUE section at the bottom of this file, so this pool appears unused.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count)
# --- CONFIGURATION ---
# SQLite database file, relative to the current working directory.
DB_PATH = "code_agents_pro.db"
# Root directory under which per-user / per-project folders are created.
PROJECT_ROOT = "./projects"
# Create the projects root at import time; a pre-existing directory is fine.
os.makedirs(PROJECT_ROOT, exist_ok=True)
# ------------------------------ DATABASE (ROBUST) ------------------------------
def init_db():
    """Create the ``users`` and ``projects`` tables and the status index.

    Every statement uses ``IF NOT EXISTS``, so calling this repeatedly is
    harmless. The connection is closed explicitly: sqlite3's connection
    context manager only commits or rolls back, it does not close.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        with conn:  # commit on success, roll back on error
            conn.executescript("""
            CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, username TEXT UNIQUE, password_hash TEXT);
            CREATE TABLE IF NOT EXISTS projects (id INTEGER PRIMARY KEY, user_id INTEGER, title TEXT, description TEXT, status TEXT DEFAULT 'queued', zip_path TEXT, logs TEXT DEFAULT '', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id));
            CREATE INDEX IF NOT EXISTS idx_user_status ON projects(user_id, status);
            """)
    finally:
        conn.close()
# Make sure the schema exists as soon as this module is imported.
init_db()
def _db_execute(query, params=(), fetchone=False, fetchall=False, commit=False):
    """Run one parameterized SQL statement against the application database.

    Args:
        query: SQL text using ``?`` placeholders.
        params: Values bound to the placeholders.
        fetchone: Return the first result row (a ``sqlite3.Row``) or None.
        fetchall: Return all result rows as a list of ``sqlite3.Row``.
        commit: Commit and return ``cursor.lastrowid``. Checked first, so it
            wins over the fetch flags.

    Returns:
        The value selected by the flags above, or None when no flag is set
        or when a ``sqlite3.Error`` occurs (the error is printed, not raised).
    """
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        conn.row_factory = sqlite3.Row
        with conn:  # commits on success, rolls back on error; does NOT close
            cursor = conn.cursor()
            cursor.execute(query, params)
            if commit:
                conn.commit()
                return cursor.lastrowid
            if fetchone:
                return cursor.fetchone()
            if fetchall:
                return cursor.fetchall()
            return None
    except sqlite3.Error as e:
        print(f"Database error: {e}")
        return None
    finally:
        # The connection context manager leaves the connection open, so
        # close it here to avoid leaking one handle per query.
        if conn is not None:
            conn.close()
def hash_password(password):
    """Return the hex-encoded SHA-256 digest of ``password``.

    The password is encoded with the default ``str.encode()`` codec (UTF-8)
    before hashing. The digest is unsalted.
    """
    digest = hashlib.sha256()
    digest.update(password.encode())
    return digest.hexdigest()
def verify_password(password, stored_hash):
    """Check ``password`` against a digest produced by ``hash_password``.

    Args:
        password: Candidate plaintext password.
        stored_hash: Hex digest previously stored for the user.

    Returns:
        True when the digests match, False otherwise (including when
        ``stored_hash`` is not a string, e.g. None for a missing value).
    """
    import hmac  # local import keeps this block self-contained

    if not isinstance(stored_hash, str):
        return False
    # Constant-time comparison so the check does not leak, through timing,
    # how many leading characters of the digest matched.
    candidate = hash_password(password).encode("utf-8")
    return hmac.compare_digest(candidate, stored_hash.encode("utf-8", "replace"))
def create_user(username, password):
    """Insert a new user row and return its row id.

    The password is stored as the digest returned by ``hash_password``.
    Returns None if the insert raises ``sqlite3.IntegrityError``; the value
    returned by ``_db_execute`` is passed through otherwise.
    """
    row = (username, hash_password(password))
    try:
        new_id = _db_execute(
            "INSERT INTO users (username, password_hash) VALUES (?, ?)",
            row,
            commit=True,
        )
    except sqlite3.IntegrityError:
        new_id = None
    return new_id
def get_user_by_username(username):
    """Return the ``users`` row whose username equals ``username``.

    The result is whatever ``_db_execute`` returns for a ``fetchone`` query.
    """
    sql = "SELECT * FROM users WHERE username = ?"
    bound = (username,)
    return _db_execute(sql, bound, fetchone=True)
def get_user_projects(user_id, limit=20):
    """Return up to ``limit`` projects owned by ``user_id``, newest first.

    Rows are ordered by ``created_at`` descending; the result is whatever
    ``_db_execute`` returns for a ``fetchall`` query.
    """
    sql = "SELECT * FROM projects WHERE user_id = ? ORDER BY created_at DESC LIMIT ?"
    bound = (user_id, limit)
    return _db_execute(sql, bound, fetchall=True)
def create_project(user_id, title, description):
    """Insert a project row for ``user_id`` and return ``_db_execute``'s result.

    Only ``user_id``, ``title`` and ``description`` are supplied; the other
    columns take their schema defaults.
    """
    sql = "INSERT INTO projects (user_id, title, description) VALUES (?, ?, ?)"
    bound = (user_id, title, description)
    return _db_execute(sql, bound, commit=True)
def update_project_status(project_id, status, logs=None, zip_path=None):
    """Set a project's status and, optionally, its logs and zip path.

    ``logs`` and ``zip_path`` go through SQL ``COALESCE``, so passing None
    keeps the value already stored in that column. Returns None.
    """
    sql = "UPDATE projects SET status=?, logs=COALESCE(?, logs), zip_path=COALESCE(?, zip_path) WHERE id=?"
    bound = (status, logs, zip_path, project_id)
    _db_execute(sql, bound, commit=True)
def get_project(project_id):
    """Return the ``projects`` row with id ``project_id``.

    The result is whatever ``_db_execute`` returns for a ``fetchone`` query.
    """
    sql = "SELECT * FROM projects WHERE id = ?"
    bound = (project_id,)
    return _db_execute(sql, bound, fetchone=True)
# ------------------------------ MODEL LOADING & CACHING ------------------------------
# Maps each agent role to the model identifier that load_model() is called
# with for that role. All four roles currently share the same identifier.
MODEL_REGISTRY = {
"ceo": "Qwen/Qwen3-0.6B",
"manager": "Qwen/Qwen3-0.6B",
"worker_coder": "Qwen/Qwen3-0.6B",
"worker_tester": "Qwen/Qwen3-0.6B",
}
# Module-level cache filled by load_model(): model name -> (tokenizer, model).
_MODEL_CACHE = {}
def load_model(model_name):
    """Return the ``(tokenizer, model)`` pair for ``model_name``, cached.

    The first call for a name loads the tokenizer and causal-LM weights and
    stores the pair in ``_MODEL_CACHE``; later calls return the cached pair.
    When CUDA is available the model is loaded with a 4-bit NF4
    ``BitsAndBytesConfig``; otherwise no quantization config is passed.
    """
    try:
        return _MODEL_CACHE[model_name]
    except KeyError:
        pass

    model_kwargs = dict(
        device_map="auto",
        trust_remote_code=True,
        attn_implementation="eager",
    )
    if not torch.cuda.is_available():
        print(f"CUDA not found. Loading '{model_name}' on CPU.")
    else:
        print(f"CUDA found. Loading '{model_name}' in 4-bit.")
        model_kwargs["quantization_config"] = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16,
        )

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs)
    loaded = (tokenizer, model)
    _MODEL_CACHE[model_name] = loaded
    print(f"Model {model_name} loaded and cached.")
    return loaded
# ------------------------------ AGENT PROMPTS FOR DYNAMIC FLOW ------------------------------
# System prompts keyed by agent role (same keys as MODEL_REGISTRY).
# generate_with_model() sends ROLE_PROMPTS[role] as the "system" message.
# The CEO is asked for a JSON object; every other role is asked for a single
# tool call of the form tool_name("arg", ...), which _extract_tool_call parses.
ROLE_PROMPTS = {
"ceo": """You are the CEO. Your job is to create a high-level plan and delegate the first task to your Manager.
Analyze the user's request and provide a list of files to be created.
Then, create the very first task for the Manager to execute.
Respond ONLY with a JSON object.
Example: {"plan": {"files": ["app.py", "backend.py", "tests/test_app.py"]}, "initial_task": "Start by writing the backend.py file with database functions."}""",
# Manager: delegates work to workers, answers questions, ends the project.
"manager": """You are the Manager. You receive tasks and questions. Your job is to:
1. Break down work into small steps for workers
2. After a worker completes a task, assign the NEXT logical task (code another file OR write tests)
3. Only finish when ALL code and tests are complete
4. If unsure, ask the CEO
Available actions: `delegate_coder`, `delegate_tester`, `answer_worker`, `ask_ceo`, `finish_project`
Respond ONLY with a single tool call on a new line.
Example Actions:
To delegate a task to a Coder:
`delegate_coder("Write the full Python code for app.py")`
To delegate a task to a Tester:
`delegate_tester("Write unit tests for backend.py")`
To answer a worker's question:
`answer_worker("The main file is main.py, and it should use Pygame.")`
To ask a question to the CEO (e.g., if you are confused):
`ask_ceo("Should we use a relational database or a NoSQL database for this project?")`
To finish the entire project:
`finish_project()`""",
# Coder worker: writes source files for the task the Manager assigned.
"worker_coder": """You are a Coder. Your only job is to write code based on the Manager's task.
If you need more information or documentation to complete a task, you can use the `scrape_web` tool.
After completing a task, report back using the `task_complete` tool. DO NOT decide what to do next.
The Manager will assign the next file to code.
Available actions: `scrape_web`, `write_code`, `read_file`, `ask_manager`, `task_complete`
Respond ONLY with a single tool call on a new line.
Example Actions:
To scrape a website for information:
`scrape_web("https://docs.python.org/3/library/sqlite3.html")`
To write code to a file. The second argument is a raw string of the code:
`write_code("backend.py", "import sqlite3\\n\\ndef init_db():\\n # ...")`
To read the contents of an existing file:
`read_file("main.py")`
To ask the Manager a question:
`ask_manager("What testing framework should I use for this project?")`
To report that you have finished your task:
`task_complete()`""",
# Tester worker: writes test files for code that already exists.
"worker_tester": """You are a Tester. Your only job is to write tests for existing code.
If you need to look up testing libraries or best practices, use the `scrape_web` tool.
After completing test writing, report `task_complete`.
The Manager will decide if more tests are needed or if the project can finish.
Available actions: `scrape_web`, `write_test`, `read_file`, `ask_manager`, `task_complete`
Respond ONLY with a single tool call on a new line.
Example Actions:
To scrape a website for information:
`scrape_web("https://docs.pytest.org/en/stable/")`
To write tests to a file. The second argument is a raw string of the test code:
`write_test("tests/test_backend.py", "import pytest\\n\\ndef test_db_connection():\\n # ...")`
To read the contents of an existing file:
`read_file("app.py")`
To ask the Manager a question:
`ask_manager("Are there any specific edge cases I should focus on for the user login tests?")`
To report that you have finished your task:
`task_complete()`"""
}
# --------------------------------- NEW PARSER ---------------------------------
def _extract_tool_call(text: str) -> Optional[Dict[str, Any]]:
"""
Parses a string for a tool-call like `tool_name("arg1", "arg2")` and extracts
the function name and its arguments.
"""
text = text.strip().split("\n")[0] # Only consider the first line
match = re.search(r'(\w+)\((.*)\)', text, re.DOTALL)
if not match:
return None
tool_name = match.group(1)
args_str = match.group(2)
# Simple regex to split arguments, handling quoted strings
# This is a basic approach and might fail on complex arguments
args = []
# Handle single string arguments
if re.fullmatch(r'\s*"(.*?)"\s*', args_str, re.DOTALL) or re.fullmatch(r"\s*'(.*?)'\s*", args_str, re.DOTALL):
args.append(args_str.strip().strip("'\""))
else:
# Simple split by comma for multiple args
for arg in args_str.split(','):
args.append(arg.strip().strip("'\""))
# Convert known numerical args if needed
try:
if tool_name in ["delegate_coder", "delegate_tester"]:
args = [str(arg) for arg in args]
except (ValueError, IndexError):
return None # Return None if parsing fails
return {"tool_name": tool_name, "args": args}
# ------------------------------ FILE SYSTEM & AI TOOLS ------------------------------
def get_project_dir(user_id, project_id):
    """Return ``PROJECT_ROOT/<user_id>/<project_id>``, creating it if needed."""
    segments = (str(user_id), str(project_id))
    project_path = os.path.join(PROJECT_ROOT, *segments)
    os.makedirs(project_path, exist_ok=True)
    return project_path
def create_file(project_dir, path, content):
    """Write ``content`` to ``path`` inside ``project_dir`` as UTF-8 text.

    Missing parent directories are created. ``path`` comes from model output,
    so it is resolved and checked to lie inside ``project_dir``.

    Args:
        project_dir: Directory that must contain the written file.
        path: File path relative to ``project_dir``.
        content: Text to write; an existing file is overwritten.

    Raises:
        ValueError: If ``path`` resolves outside ``project_dir`` (for example
            ``../x`` or an absolute path).
        OSError: If the directories or the file cannot be created.
    """
    root = os.path.realpath(project_dir)
    full_path = os.path.realpath(os.path.join(root, path))
    try:
        inside = os.path.commonpath([root, full_path]) == root
    except ValueError:
        # commonpath raises when the paths cannot be compared (e.g. different drives).
        inside = False
    if not inside:
        raise ValueError(f"Refusing to write outside the project directory: {path!r}")
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    with open(full_path, 'w', encoding='utf-8') as f:
        f.write(content)
def read_file(project_dir, path):
    """Return the UTF-8 text of ``path`` inside ``project_dir``.

    Failures are reported as strings starting with ``"Error:"`` rather than
    raised, so the result can be handed straight back to an agent.

    Args:
        project_dir: Directory the file must live in.
        path: File path relative to ``project_dir``.

    Returns:
        The file contents, ``"Error: File not found."`` when the file does
        not exist, or another ``"Error: ..."`` message when the path resolves
        outside ``project_dir`` or cannot be read as UTF-8 text.
    """
    root = os.path.realpath(project_dir)
    full_path = os.path.realpath(os.path.join(root, path))
    try:
        inside = os.path.commonpath([root, full_path]) == root
    except ValueError:
        # commonpath raises when the paths cannot be compared (e.g. different drives).
        inside = False
    if not inside:
        return "Error: Path is outside the project directory."
    try:
        with open(full_path, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        return "Error: File not found."
    except (OSError, UnicodeDecodeError) as e:
        # e.g. the path is a directory, is unreadable, or is not UTF-8 text.
        return f"Error: Could not read file. {e}"
def zip_project(project_dir, project_id):
    """Zip every file under ``project_dir`` and return the archive path.

    The archive is named ``project_<project_id>.zip`` and is written to the
    parent directory of ``project_dir``. Entries are stored with paths
    relative to ``project_dir`` using DEFLATE compression.
    """
    parent_dir = os.path.dirname(project_dir)
    archive_path = os.path.join(parent_dir, f"project_{project_id}.zip")
    with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as archive:
        for folder, _subdirs, filenames in os.walk(project_dir):
            for filename in filenames:
                source = os.path.join(folder, filename)
                archive.write(source, os.path.relpath(source, project_dir))
    return archive_path
def scrape_web(url: str) -> str:
    """Fetch ``url`` and return its visible text, limited to 5000 chars.

    ``<script>`` and ``<style>`` elements are removed before text extraction.
    The text is then split into lines and, within a line, into phrases
    separated by two spaces; blank pieces are dropped and the rest are joined
    with newlines. Splitting on a double space rather than a single one keeps
    each sentence together instead of putting every word on its own line.

    Args:
        url: Address to download (15 second timeout).

    Returns:
        The cleaned text, or a string starting with ``"Error:"`` when the
        request or the parsing fails.
    """
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        for script_or_style in soup(["script", "style"]):
            script_or_style.decompose()
        text = soup.get_text()
        lines = (line.strip() for line in text.splitlines())
        # Break multi-phrase lines apart on double spaces only.
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = '\n'.join(chunk for chunk in chunks if chunk)
        return text[:5000]
    except requests.exceptions.RequestException as e:
        return f"Error: Could not retrieve content from URL. {e}"
    except Exception as e:
        return f"Error: An unexpected error occurred during web scraping. {e}"
def generate_with_model(role: str, prompt: str) -> str:
    """Generate one completion for ``prompt`` using the model for ``role``.

    The role's entry in ``ROLE_PROMPTS`` is sent as the system message and
    ``prompt`` as the user message. Only the newly generated tokens are
    decoded (the prompt tokens are sliced off) and the text is stripped.

    Returns:
        The generated text, or ``"error(<message>)"`` if any exception is
        raised while loading the model or generating.
    """
    try:
        tokenizer, model = load_model(MODEL_REGISTRY[role])
        chat = [
            {"role": "system", "content": ROLE_PROMPTS[role]},
            {"role": "user", "content": prompt},
        ]
        rendered = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
        encoded = tokenizer(rendered, return_tensors="pt").to(model.device)
        generated = model.generate(
            **encoded,
            max_new_tokens=3072,
            pad_token_id=tokenizer.eos_token_id,
            use_cache=True,
        )
        prompt_length = len(encoded.input_ids[0])
        completion_ids = generated[0][prompt_length:]
        return tokenizer.decode(completion_ids, skip_special_tokens=True).strip()
    except Exception as e:
        print(f"Error during model generation for role {role}: {e}")
        return f"error({e})"
# ------------------------------ HIERARCHICAL AGENT ORCHESTRATOR ------------------------------
def _parse_ceo_response(response_text):
    """Decode the CEO's JSON reply, tolerating extra text around the object.

    Tries the whole reply first; if that fails, retries on the span from the
    first ``{`` to the last ``}`` (covers code fences and stray commentary).
    Raises ``json.JSONDecodeError`` when neither attempt succeeds.
    """
    try:
        return json.loads(response_text)
    except json.JSONDecodeError:
        start = response_text.find("{")
        end = response_text.rfind("}")
        if start == -1 or end < start:
            raise
        return json.loads(response_text[start:end + 1])


def run_agent_chain(project_id, user_id, initial_prompt):
    """Run the CEO -> Manager -> Worker loop for one project.

    The CEO produces a JSON plan and a first task. The Manager and workers
    then take turns; each turn the active agent emits one tool call, which is
    executed here. The loop ends when the Manager calls ``finish_project`` or
    after ``max_turns`` turns. On success the project directory is zipped and
    the project is marked ``completed``; on any exception it is marked
    ``failed`` and the error is appended to the stored logs.

    Args:
        project_id: Id of the ``projects`` row to update.
        user_id: Owner id, used to locate the project directory.
        initial_prompt: The user's request, given to the CEO.

    Returns:
        None. Progress is reported through ``update_project_status``.
    """
    log_entries = []

    def log_step(agent, thought, action_result="", status="running", zip_path=None):
        # Append one Markdown entry and persist the whole log with `status`.
        log_entry = f"**[{agent.upper()}]**\n> {thought}\n"
        if action_result:
            log_entry += f"_{action_result}_\n"
        log_entry += "\n---\n"
        log_entries.append(log_entry)
        update_project_status(project_id, status, logs="".join(log_entries), zip_path=zip_path)

    try:
        # Inside the try so a directory-creation failure marks the project failed.
        project_dir = get_project_dir(user_id, project_id)
        project_context = {
            "initial_prompt": initial_prompt,
            "file_structure": [],
            "current_task": "Start the project.",
            "last_action_result": None,
            "turn": 0,
            "max_turns": 30,
        }
        # Scrapes do not cost a turn, but only up to this budget; without a
        # cap an agent that keeps scraping would loop forever.
        free_scrapes_left = project_context["max_turns"]

        log_step("ORCHESTRATOR", "Briefing the CEO with the user's request.")
        ceo_response_text = generate_with_model("ceo", initial_prompt)
        ceo_action = _parse_ceo_response(ceo_response_text)  # CEO still uses JSON for its specific output
        if (
            not isinstance(ceo_action, dict)
            or not isinstance(ceo_action.get("plan"), dict)
            or "initial_task" not in ceo_action
        ):
            raise ValueError("CEO failed to provide an initial plan.")
        project_context["file_structure"] = ceo_action["plan"].get("files", [])
        project_context["current_task"] = ceo_action["initial_task"]
        log_step("CEO", f"Initial plan created. First task for Manager: {project_context['current_task']}", f"Files to create: {project_context['file_structure']}")

        next_agent = "manager"
        finished = False
        while project_context["turn"] < project_context["max_turns"]:
            prompt = (
                "\nProject context:\n"
                f"- User's Goal: {project_context['initial_prompt']}\n"
                f"- File Structure: {project_context['file_structure']}\n"
                f"- Current Task: {project_context['current_task']}\n"
                f"- Result of last action: {project_context['last_action_result']}\n"
                "Based on the context, decide your next action.\n"
            )
            response_text = generate_with_model(next_agent, prompt)
            tool_call = _extract_tool_call(response_text)
            if not tool_call:
                project_context["last_action_result"] = f"Error: Agent {next_agent} returned invalid response. Retrying."
                log_step("ORCHESTRATOR", f"Invalid response from {next_agent}. Retrying task.", response_text)
                project_context["turn"] += 1
                continue

            action = tool_call.get("tool_name")
            args = tool_call.get("args", [])
            # Used only to detect a missing or blank first argument.
            first_arg = str(args[0]).strip() if args else ""
            log_step(next_agent.replace("_", " "), f"Decided to perform action: `{action}` with args: `{args}`")

            if action in ("delegate_coder", "delegate_tester"):
                if not first_arg:
                    # Falls through to the turn counter so repeated empty
                    # delegations cannot loop forever.
                    project_context["last_action_result"] = f"Error: Missing task for {action}."
                    next_agent = "manager"
                else:
                    next_agent = "worker_coder" if action == "delegate_coder" else "worker_tester"
                    project_context["current_task"] = args[0]
                    project_context["last_action_result"] = f"Task delegated to {next_agent.replace('_', ' ')}."
            elif action in ("write_code", "write_test"):
                if len(args) < 2:
                    project_context["last_action_result"] = "Error: Missing file_path or code in data."
                else:
                    file_path, code = args[0], args[1]
                    written = "code" if action == "write_code" else "tests"
                    try:
                        create_file(project_dir, file_path, code)
                    except (OSError, ValueError) as exc:
                        # A bad path from the model should not abort the whole run.
                        project_context["last_action_result"] = f"Error: Could not write {file_path}. {exc}"
                    else:
                        project_context["last_action_result"] = f"Successfully wrote {written} to {file_path}."
                next_agent = "manager"
                project_context["current_task"] = "Worker finished task. What is the next high-level step?"
            elif action == "scrape_web":
                if not first_arg:
                    project_context["last_action_result"] = "Error: Missing URL for scrape_web action."
                else:
                    url = args[0]
                    log_step(next_agent.replace("_", " "), f"Scraping {url}...")
                    scraped_content = scrape_web(url)
                    project_context["last_action_result"] = f"Scraping result from {url}:\n\n{scraped_content}"
                if free_scrapes_left > 0:
                    free_scrapes_left -= 1
                    continue  # Do not increment turn or change agent yet
            elif action == "read_file":
                # Offered to workers in their prompts; the same agent continues
                # with the file contents as the last action result.
                if not first_arg:
                    project_context["last_action_result"] = "Error: Missing file path for read_file."
                else:
                    try:
                        file_text = read_file(project_dir, args[0])
                    except (OSError, ValueError) as exc:
                        file_text = f"Error: Could not read file. {exc}"
                    # Same 5000-character cap that scrape_web applies.
                    project_context["last_action_result"] = f"Contents of {args[0]}:\n\n{file_text[:5000]}"
            elif action == "ask_manager":
                if not first_arg:
                    project_context["last_action_result"] = "Error: Missing question for ask_manager."
                else:
                    project_context["last_action_result"] = "A worker has a question."
                    project_context["current_task"] = args[0]
                next_agent = "manager"
            elif action == "ask_ceo":
                # Offered to the Manager in its prompt; the CEO's raw reply is
                # handed back to the Manager as the last action result.
                if not first_arg:
                    project_context["last_action_result"] = "Error: Missing question for ask_ceo."
                else:
                    ceo_answer = generate_with_model(
                        "ceo",
                        f"The Manager asks: {args[0]}\nUser's Goal: {project_context['initial_prompt']}",
                    )
                    log_step("CEO", f"Answered the Manager's question: {args[0]}", ceo_answer)
                    project_context["last_action_result"] = f"CEO answered: {ceo_answer}"
                next_agent = "manager"
            elif action == "answer_worker":
                if not first_arg:
                    project_context["last_action_result"] = "Error: Missing answer for answer_worker."
                else:
                    project_context["last_action_result"] = f"Manager answered: {args[0]}"
                    project_context["current_task"] = "The worker's question has been answered. What is the next task for a worker?"
                next_agent = "manager"
            elif action == "task_complete":
                project_context["last_action_result"] = "Worker reported task is complete."
                project_context["current_task"] = "Worker finished task. What is the next high-level step?"
                next_agent = "manager"
            elif action == "finish_project":
                log_step("MANAGER", "Project is ready for final packaging.")
                finished = True
                break
            else:
                project_context["last_action_result"] = f"Error: Unknown action '{action}'."
                log_step("ORCHESTRATOR", project_context['last_action_result'])

            project_context["turn"] += 1

        if not finished:
            raise ValueError("Project failed to complete within the maximum number of turns.")

        log_step("SYSTEM", "Packaging project...")
        zip_path = zip_project(project_dir, project_id)
        # Record the last entry together with the final status so the row is
        # never flipped back to "running" after completion.
        log_step("SYSTEM", "Project completed successfully!", status="completed", zip_path=zip_path)
    except Exception as e:
        tb_str = traceback.format_exc()
        print(f"--- AGENT CHAIN FAILED ---\n{tb_str}\n---")
        error_log = "".join(log_entries) + f"\n\n❌ **CRITICAL ERROR:**\n```{str(e)}```"
        update_project_status(project_id, "failed", logs=error_log)
# ------------------------------ JOB QUEUE ------------------------------
# Single-worker pool used by queue_job(): submitted jobs run one at a time.
# This assignment rebinds the module-level `executor` name that was first
# bound to a CPU-count-sized pool near the top of the file.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
def queue_job(project_id, user_id, prompt):
    """Submit ``run_agent_chain`` for this project to the module executor.

    Returns None; the future returned by ``executor.submit`` is discarded.
    """
    print(f"Queuing job for project: {project_id}")
    executor.submit(run_agent_chain, project_id, user_id, prompt)