# Code-agent-team / backend.py
import sqlite3
import os
import json
import re
import concurrent.futures
import traceback
import zipfile
import hashlib
from typing import Optional, Dict, Any, List
# --- New dependencies for Web Scraping ---
import requests
from bs4 import BeautifulSoup
# ---
import torch
import psutil
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
# --- CONFIGURATION ---
DB_PATH = "code_agents_pro.db"
PROJECT_ROOT = "./projects"
os.makedirs(PROJECT_ROOT, exist_ok=True)
# ------------------------------ DATABASE (ROBUST) ------------------------------
def init_db():
with sqlite3.connect(DB_PATH) as conn:
cursor = conn.cursor()
cursor.executescript("""
CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, username TEXT UNIQUE, password_hash TEXT);
CREATE TABLE IF NOT EXISTS projects (id INTEGER PRIMARY KEY, user_id INTEGER, title TEXT, description TEXT, status TEXT DEFAULT 'queued', zip_path TEXT, logs TEXT DEFAULT '', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id));
CREATE INDEX IF NOT EXISTS idx_user_status ON projects(user_id, status);
""")
init_db()
def _db_execute(query, params=(), fetchone=False, fetchall=False, commit=False):
try:
with sqlite3.connect(DB_PATH) as conn:
conn.row_factory = sqlite3.Row; cursor = conn.cursor(); cursor.execute(query, params)
if commit: conn.commit(); return cursor.lastrowid
if fetchone: return cursor.fetchone()
if fetchall: return cursor.fetchall()
except sqlite3.Error as e: print(f"Database error: {e}"); return None
def hash_password(password): return hashlib.sha256(password.encode()).hexdigest()
def verify_password(password, stored_hash): return hash_password(password) == stored_hash
def create_user(username, password):
try: return _db_execute("INSERT INTO users (username, password_hash) VALUES (?, ?)", (username, hash_password(password)), commit=True)
except sqlite3.IntegrityError: return None
def get_user_by_username(username): return _db_execute("SELECT * FROM users WHERE username = ?", (username,), fetchone=True)
def get_user_projects(user_id, limit=20): return _db_execute("SELECT * FROM projects WHERE user_id = ? ORDER BY created_at DESC LIMIT ?", (user_id, limit), fetchall=True)
def create_project(user_id, title, description): return _db_execute("INSERT INTO projects (user_id, title, description) VALUES (?, ?, ?)", (user_id, title, description), commit=True)
def update_project_status(project_id, status, logs=None, zip_path=None): _db_execute("UPDATE projects SET status=?, logs=COALESCE(?, logs), zip_path=COALESCE(?, zip_path) WHERE id=?", (status, logs, zip_path, project_id), commit=True)
def get_project(project_id): return _db_execute("SELECT * FROM projects WHERE id = ?", (project_id,), fetchone=True)
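# Project status lifecycle: 'queued' (schema default) -> 'running' (while the agent
# chain streams log entries) -> 'completed' or 'failed'.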
# ------------------------------ MODEL LOADING & CACHING ------------------------------
MODEL_REGISTRY = {
"ceo": "Qwen/Qwen3-0.6B",
"manager": "Qwen/Qwen3-0.6B",
"worker_coder": "Qwen/Qwen3-0.6B",
"worker_tester": "Qwen/Qwen3-0.6B",
}
_MODEL_CACHE = {}
def load_model(model_name):
if model_name in _MODEL_CACHE: return _MODEL_CACHE[model_name]
model_kwargs = {"device_map": "auto", "trust_remote_code": True, "attn_implementation": "eager"}
if torch.cuda.is_available():
print(f"CUDA found. Loading '{model_name}' in 4-bit.")
bnb_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_use_double_quant=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16)
model_kwargs["quantization_config"] = bnb_config
else:
print(f"CUDA not found. Loading '{model_name}' on CPU.")
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs)
_MODEL_CACHE[model_name] = (tokenizer, model)
print(f"Model {model_name} loaded and cached.")
return tokenizer, model
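# Note: the cache is keyed by checkpoint name, so with the registry above all four
# roles currently resolve to the same Qwen3-0.6B checkpoint and share one model
# instance in memory.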
# ------------------------------ AGENT PROMPTS FOR DYNAMIC FLOW ------------------------------
ROLE_PROMPTS = {
"ceo": """You are the CEO. Your job is to create a high-level plan and delegate the first task to your Manager.
Analyze the user's request and provide a list of files to be created.
Then, create the very first task for the Manager to execute.
Respond ONLY with a JSON object.
Example: {"plan": {"files": ["app.py", "backend.py", "tests/test_app.py"]}, "initial_task": "Start by writing the backend.py file with database functions."}""",
"manager": """You are the Manager. You receive tasks and questions. Your job is to:
1. Break down work into small steps for workers
2. After a worker completes a task, assign the NEXT logical task (code another file OR write tests)
3. Only finish when ALL code and tests are complete
4. If unsure, ask the CEO
Available actions: `delegate_coder`, `delegate_tester`, `answer_worker`, `ask_ceo`, `finish_project`
Respond ONLY with a single tool call on a new line.
Example Actions:
To delegate a task to a Coder:
`delegate_coder("Write the full Python code for app.py")`
To delegate a task to a Tester:
`delegate_tester("Write unit tests for backend.py")`
To answer a worker's question:
`answer_worker("The main file is main.py, and it should use Pygame.")`
To ask a question to the CEO (e.g., if you are confused):
`ask_ceo("Should we use a relational database or a NoSQL database for this project?")`
To finish the entire project:
`finish_project()`""",
"worker_coder": """You are a Coder. Your only job is to write code based on the Manager's task.
If you need more information or documentation to complete a task, you can use the `scrape_web` tool.
After completing a task, report back using the `task_complete` tool. DO NOT decide what to do next.
The Manager will assign the next file to code.
Available actions: `scrape_web`, `write_code`, `read_file`, `ask_manager`, `task_complete`
Respond ONLY with a single tool call on a new line.
Example Actions:
To scrape a website for information:
`scrape_web("https://docs.python.org/3/library/sqlite3.html")`
To write code to a file. The second argument is a raw string of the code:
`write_code("backend.py", "import sqlite3\\n\\ndef init_db():\\n # ...")`
To read the contents of an existing file:
`read_file("main.py")`
To ask the Manager a question:
`ask_manager("What testing framework should I use for this project?")`
To report that you have finished your task:
`task_complete()`""",
"worker_tester": """You are a Tester. Your only job is to write tests for existing code.
If you need to look up testing libraries or best practices, use the `scrape_web` tool.
After completing test writing, report `task_complete`.
The Manager will decide if more tests are needed or if the project can finish.
Available actions: `scrape_web`, `write_test`, `read_file`, `ask_manager`, `task_complete`
Respond ONLY with a single tool call on a new line.
Example Actions:
To scrape a website for information:
`scrape_web("https://docs.pytest.org/en/stable/")`
To write tests to a file. The second argument is a raw string of the test code:
`write_test("tests/test_backend.py", "import pytest\\n\\ndef test_db_connection():\\n # ...")`
To read the contents of an existing file:
`read_file("app.py")`
To ask the Manager a question:
`ask_manager("Are there any specific edge cases I should focus on for the user login tests?")`
To report that you have finished your task:
`task_complete()`"""
}
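# Every role except the CEO is expected to reply with a single tool call on one line,
# which _extract_tool_call() below turns into {"tool_name": ..., "args": [...]}.
# The CEO alone replies with a JSON plan, parsed with json.loads() in run_agent_chain().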
# --------------------------------- NEW PARSER ---------------------------------
def _extract_tool_call(text: str) -> Optional[Dict[str, Any]]:
    """
    Parses a string for a tool call like `tool_name("arg1", "arg2")` and extracts
    the function name and its arguments.
    """
    text = text.strip().split("\n")[0]  # Only consider the first line
    match = re.search(r'(\w+)\((.*)\)', text, re.DOTALL)
    if not match:
        return None
    tool_name = match.group(1)
    args_str = match.group(2)
    # Pull out quoted arguments first, so commas inside quoted strings
    # (e.g. inside code passed to write_code) do not break the parse.
    double_quoted = r'"((?:\\.|[^"\\])*)"'
    single_quoted = r"'((?:\\.|[^'\\])*)'"
    quoted = re.findall(double_quoted + "|" + single_quoted, args_str)
    args: List[str] = []
    if quoted:
        args = [dq if dq else sq for dq, sq in quoted]
    elif args_str.strip():
        # Fallback: simple comma split for unquoted arguments. This is a basic
        # approach and may still fail on complex, unquoted argument lists.
        args = [arg.strip().strip("'\"") for arg in args_str.split(',')]
    return {"tool_name": tool_name, "args": args}
# ------------------------------ FILE SYSTEM & AI TOOLS ------------------------------
def get_project_dir(user_id, project_id):
path = os.path.join(PROJECT_ROOT, str(user_id), str(project_id)); os.makedirs(path, exist_ok=True); return path
def create_file(project_dir, path, content):
full_path = os.path.join(project_dir, path); os.makedirs(os.path.dirname(full_path), exist_ok=True)
with open(full_path, 'w', encoding='utf-8') as f: f.write(content)
def read_file(project_dir, path):
full_path = os.path.join(project_dir, path)
try:
with open(full_path, 'r', encoding='utf-8') as f: return f.read()
except FileNotFoundError: return "Error: File not found."
def zip_project(project_dir, project_id):
zip_filename = f"project_{project_id}.zip"; zip_path = os.path.join(os.path.dirname(project_dir), zip_filename)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for root, _, files in os.walk(project_dir):
for file in files: zf.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), project_dir))
return zip_path
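# The archive is written next to the project folder (projects/<user_id>/project_<id>.zip)
# and its path is saved on the project row by the orchestrator.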
def scrape_web(url: str) -> str:
"""Scrapes a URL and returns the clean text content, limited to 5000 chars."""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=15)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
for script_or_style in soup(["script", "style"]): script_or_style.decompose()
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
text = '\n'.join(chunk for chunk in chunks if chunk)
return text[:5000]
except requests.exceptions.RequestException as e:
return f"Error: Could not retrieve content from URL. {e}"
except Exception as e:
return f"Error: An unexpected error occurred during web scraping. {e}"
def generate_with_model(role: str, prompt: str) -> str:
try:
model_name = MODEL_REGISTRY[role]; tokenizer, model = load_model(model_name)
messages = [{"role": "system", "content": ROLE_PROMPTS[role]}, {"role": "user", "content": prompt}]
input_text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
inputs = tokenizer(input_text, return_tensors="pt").to(model.device)
outputs = model.generate(**inputs, max_new_tokens=3072, pad_token_id=tokenizer.eos_token_id, use_cache=True)
return tokenizer.decode(outputs[0][len(inputs.input_ids[0]):], skip_special_tokens=True).strip()
except Exception as e:
print(f"Error during model generation for role {role}: {e}"); return f"error({e})"
# ------------------------------ HIERARCHICAL AGENT ORCHESTRATOR ------------------------------
def run_agent_chain(project_id, user_id, initial_prompt):
project_dir = get_project_dir(user_id, project_id)
log_entries = []
def log_step(agent, thought, action_result=""):
log_entry = f"**[{agent.upper()}]**\n> {thought}\n"
if action_result:
log_entry += f"_{action_result}_\n"
log_entry += "\n---\n"
log_entries.append(log_entry)
update_project_status(project_id, "running", logs="".join(log_entries))
try:
project_context = {
"initial_prompt": initial_prompt,
"file_structure": [],
"current_task": "Start the project.",
"last_action_result": None,
"turn": 0,
"max_turns": 30,
}
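        # Turn loop protocol: each turn, exactly one agent sees this shared context and
        # replies with a single tool call; after a worker writes a file, asks a question,
        # or reports task_complete, control returns to the manager for the next decision.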
log_step("ORCHESTRATOR", "Briefing the CEO with the user's request.")
ceo_response_text = generate_with_model("ceo", initial_prompt)
        # The CEO replies in JSON for its specific output; pull out the first {...}
        # block in case the model wraps the JSON in extra text.
        json_match = re.search(r'\{.*\}', ceo_response_text, re.DOTALL)
        ceo_action = json.loads(json_match.group(0)) if json_match else None
if not ceo_action or "plan" not in ceo_action: raise ValueError("CEO failed to provide an initial plan.")
project_context["file_structure"] = ceo_action["plan"].get("files", [])
project_context["current_task"] = ceo_action["initial_task"]
log_step("CEO", f"Initial plan created. First task for Manager: {project_context['current_task']}", f"Files to create: {project_context['file_structure']}")
next_agent = "manager"
while project_context["turn"] < project_context["max_turns"]:
prompt = f"""
Project context:
- User's Goal: {project_context['initial_prompt']}
- File Structure: {project_context['file_structure']}
- Current Task: {project_context['current_task']}
- Result of last action: {project_context['last_action_result']}
Based on the context, decide your next action.
"""
response_text = generate_with_model(next_agent, prompt)
tool_call = _extract_tool_call(response_text)
if not tool_call:
project_context["last_action_result"] = f"Error: Agent {next_agent} returned invalid response. Retrying."
log_step("ORCHESTRATOR", f"Invalid response from {next_agent}. Retrying task.", response_text)
project_context["turn"] += 1
continue
action = tool_call.get("tool_name")
args = tool_call.get("args", [])
log_step(next_agent.replace("_", " "), f"Decided to perform action: `{action}` with args: `{args}`")
if action == "delegate_coder":
if not args:
project_context["last_action_result"] = "Error: Missing task for delegate_coder."
next_agent = "manager"
continue
next_agent = "worker_coder"
project_context["current_task"] = args[0]
project_context["last_action_result"] = f"Task delegated to {next_agent.replace('_', ' ')}."
elif action == "delegate_tester":
if not args:
project_context["last_action_result"] = "Error: Missing task for delegate_tester."
next_agent = "manager"
continue
next_agent = "worker_tester"
project_context["current_task"] = args[0]
project_context["last_action_result"] = f"Task delegated to {next_agent.replace('_', ' ')}."
elif action == "write_code":
if len(args) < 2:
project_context["last_action_result"] = "Error: Missing file_path or code in data."
else:
file_path = args[0]; code = args[1]
create_file(project_dir, file_path, code)
project_context["last_action_result"] = f"Successfully wrote code to {file_path}."
next_agent = "manager"
project_context["current_task"] = f"Worker finished task. What is the next high-level step?"
elif action == "write_test":
if len(args) < 2:
project_context["last_action_result"] = "Error: Missing file_path or code in data."
else:
file_path = args[0]; code = args[1]
create_file(project_dir, file_path, code)
project_context["last_action_result"] = f"Successfully wrote tests to {file_path}."
next_agent = "manager"
project_context["current_task"] = f"Worker finished task. What is the next high-level step?"
elif action == "scrape_web":
if not args:
project_context["last_action_result"] = "Error: Missing URL for scrape_web action."
else:
url = args[0]
log_step(next_agent.replace("_", " "), f"Scraping {url}...")
scraped_content = scrape_web(url)
project_context["last_action_result"] = f"Scraping result from {url}:\n\n{scraped_content}"
continue # Do not increment turn or change agent yet
elif action == "ask_manager":
if not args:
project_context["last_action_result"] = "Error: Missing question for ask_manager."
else:
project_context["last_action_result"] = f"A worker has a question."
project_context["current_task"] = args[0]
next_agent = "manager"
elif action == "answer_worker":
if not args:
project_context["last_action_result"] = "Error: Missing answer for answer_worker."
else:
project_context["last_action_result"] = f"Manager answered: {args[0]}"
project_context["current_task"] = "The worker's question has been answered. What is the next task for a worker?"
next_agent = "manager"
elif action == "task_complete":
project_context["last_action_result"] = "Worker reported task is complete."
project_context["current_task"] = f"Worker finished task. What is the next high-level step?"
next_agent = "manager"
elif action == "finish_project":
log_step("MANAGER", "Project is ready for final packaging.")
break
else:
project_context["last_action_result"] = f"Error: Unknown action '{action}'."
log_step("ORCHESTRATOR", project_context['last_action_result'])
project_context["turn"] += 1
if project_context["turn"] >= project_context["max_turns"]:
raise ValueError("Project failed to complete within the maximum number of turns.")
log_step("SYSTEM", "Packaging project..."); zip_path = zip_project(project_dir, project_id)
update_project_status(project_id, "completed", logs="".join(log_entries), zip_path=zip_path)
log_step("SYSTEM", "Project completed successfully!")
update_project_status(project_id, "completed", logs="".join(log_entries), zip_path=zip_path)
except Exception as e:
tb_str = traceback.format_exc(); print(f"--- AGENT CHAIN FAILED ---\n{tb_str}\n---")
error_log = "".join(log_entries) + f"\n\n❌ **CRITICAL ERROR:**\n```{str(e)}```"
update_project_status(project_id, "failed", logs=error_log)
# ------------------------------ JOB QUEUE ------------------------------
# Size the worker pool to the number of CPUs available in the Hugging Face Space.
cpu_count = os.cpu_count() or 1
executor = concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count)
def queue_job(project_id, user_id, prompt):
print(f"Queuing job for project: {project_id}"); executor.submit(run_agent_chain, project_id, user_id, prompt)