import copy
import json
import os
import shutil
from datetime import datetime
from typing import Any, Dict, List, Optional
import time
import pandas as pd
from cuga.backend.cuga_graph.nodes.api.code_agent.model import CodeAgentOutput
from cuga.backend.tools_env.registry.utils.types import AppDefinition
from cuga.backend.utils.id_utils import mask_with_timestamp, random_id_with_timestamp
from cuga.config import TRAJECTORY_DATA_DIR, settings
from langchain_core.tools import StructuredTool
from loguru import logger
from mcp.types import CallToolResult, TextContent
from pydantic import BaseModel, Field
AGENT_ANALYTICS = True
try:
from agent_analytics.instrumentation.utils import AIEventRecorder
from agent_analytics_core.interfaces.annotations import DataAnnotation
except Exception:
AGENT_ANALYTICS = False
logger.warning("Ignoring agent analytics")
class MergeResult(BaseModel):
folder_name: str
merged_task_ids: List[str]
class Prompt(BaseModel):
role: str
value: str
class Step(BaseModel):
name: Optional[str] = ""
plan: Optional[str] = ""
prompts: List[Prompt] = Field(default_factory=list)
data: Optional[str] = ""
task_decomposition: Optional[str] = ""
current_url: Optional[str] = ""
action_formatted: Optional[str] = ""
action_type: Optional[str] = ""
action_args: Optional[Any] = ""
observation_before: Optional[str] = ""
image_before: Optional[str] = ""
class TasksMetadata(BaseModel):
task_ids: List[str]
description: Optional[str] = ""
experiment_name: str
experiment_folder: str
created_at: str
class ActivityTracker(object):
_instance = None
start_time: float = 0
user_id: str = ""
intent: str = ""
session_id: str = ""
dataset_name: str = ""
prompts: List[Prompt] = []
current_date: Optional[str] = None
pi: Optional[str] = None
eval: Any = None
final_answer: Optional[str] = None
task_id: str = "default"
actions_count: int = 0
token_usage: int = 0
steps: List[Step] = []
images: List[str] = []
score: float = 0.0
tools: Dict[str, List[StructuredTool]] = {}
apps: List[AppDefinition] = []
# Task management attributes
tasks: Dict[str, Dict[str, Any]] = {}
experiment_folder: Optional[str] = None
tasks_metadata: Optional[TasksMetadata] = None
if settings.advanced_features.enable_memory:
from cuga.backend.memory.memory import Memory
memory = Memory()
# Base directory configuration
_base_dir: str = TRAJECTORY_DATA_DIR
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(ActivityTracker, cls).__new__(cls)
return cls._instance
async def invoke_tool(self, server_name: str, tool_name: str, args: dict):
if server_name not in self.tools:
raise ValueError(f"Server '{server_name}' not found")
# Find the tool by name
for tool in self.tools[server_name]:
if tool.name == tool_name:
result = await tool.ainvoke(args)
logger.debug(f"type of {type(result)}")
# logger.debug(f"Tool output call {result.con}")
# Check if result is JSON parseable
if isinstance(result, CallToolResult):
result = result.content[0]
if isinstance(result, TextContent):
result = result.text
if isinstance(result, str):
try:
res = json.loads(result)
logger.debug("json res worked!")
return res
except (json.JSONDecodeError, TypeError):
logger.debug("no json tool output !!")
# Not valid JSON, return original result
return result
else:
logger.debug(f"answer is not str answer is of type {type(result)}")
# Result is not a string, return as-is
return result
# Tool not found
available_tools = [tool.name for tool in self.tools[server_name]]
raise ValueError(
f"Tool '{tool_name}' not found in server '{server_name}'. Available tools: {available_tools}"
)
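# Usage sketch (hypothetical server and tool names; assumes tools were registered via set_tools):
#   result = await tracker.invoke_tool("GMAIL", "gmail_list_messages", {"limit": 5})
# JSON-encoded string outputs are decoded into Python objects; any other output is returned as-is.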
def invoke_tool_sync(self, server_name: str, tool_name: str, args: dict):
"""Synchronous version of invoke_tool to avoid async/sync context issues"""
import asyncio
import concurrent.futures
if server_name not in self.tools:
raise ValueError(f"Server '{server_name}' not found")
# Find the tool by name
for tool in self.tools[server_name]:
if tool.name == tool_name:
# Try synchronous invoke first
try:
result = tool.invoke(args) # Use synchronous invoke
except RuntimeError as e:
if "event loop is already running" in str(e):
# We're in an async context, need to handle this differently
try:
# Check if we have a running loop
asyncio.get_running_loop()
# We're in an async context, create a new thread to run the async function
def run_in_new_loop():
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
# Use async invoke in the new loop
async def async_invoke():
return await tool.ainvoke(args)
return new_loop.run_until_complete(async_invoke())
finally:
new_loop.close()
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(run_in_new_loop)
result = future.result()
except RuntimeError:
# No running loop, use asyncio.run
async def async_invoke():
return await tool.ainvoke(args)
result = asyncio.run(async_invoke())
else:
raise
# logger.debug(f"type of {type(result)}")
# logger.debug(f"Tool output call {result}")
# Check if result is JSON parseable
if isinstance(result, CallToolResult):
result = result.content[0]
if isinstance(result, TextContent):
result = result.text
if isinstance(result, str):
try:
res = json.loads(result)
logger.debug("json res worked!")
return res
except (json.JSONDecodeError, TypeError):
logger.debug("no json tool output !!")
# Not valid JSON, return original result
return result
else:
logger.debug(f"answer is not str answer is of type {type(result)}")
# Result is not a string, return as-is
return result
# Tool not found
available_tools = [tool.name for tool in self.tools[server_name]]
raise ValueError(
f"Tool '{tool_name}' not found in server '{server_name}'. Available tools: {available_tools}"
)
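# Note: invoke_tool_sync mirrors invoke_tool for synchronous callers. If it is invoked while an
# event loop is already running (e.g. from code that itself runs inside an event loop), the
# fallback above executes tool.ainvoke in a dedicated thread with its own loop; otherwise it
# falls back to asyncio.run.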
def get_tools_by_server(self, server_name: str) -> Dict[str, Dict]:
tools = self.tools
if server_name not in tools:
return {}
server_tools = {}
for tool in tools[server_name]:
tool_config = {
"app_name": server_name,
"secure": False,
"api_name": tool.name,
"path": '',
"method": '',
"description": tool.description or '',
"parameters": tool.args_schema.model_json_schema(),
"response_schemas": 'Any',
"canary_string": '',
}
server_tools[tool.name] = tool_config
return server_tools
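# The returned mapping is keyed by tool name and mirrors a registry-style API spec, e.g.
# (hypothetical tool): {"gmail_send_email": {"app_name": "GMAIL", "api_name": "gmail_send_email",
#                        "description": "...", "parameters": <JSON schema>, ...}}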
def set_tools(self, tools: List[StructuredTool]):
"""
Detects application prefixes and assigns server_name to tool metadata.
Returns list of AppDefinition objects for all detected applications.
Optionally fills self.tools dictionary with server_name grouped tools.
- For tools with metadata=None OR server_name=None: assigns detected app name or 'default'
- For tools with existing server_name: leaves unchanged
Args:
tools (list): List of tool objects with .name and .metadata attributes
self_tools (dict, optional): Dictionary to fill with server_name grouped tools
Returns:
List[AppDefinition]: List of app definitions with tools description
"""
self.tools = {}
# logger.debug(f"tools: {tools}")
# Common prefixes to exclude (HTTP methods, etc.)
excluded_prefixes = {'get', 'post', 'put', 'delete', 'patch', 'head', 'options', 'trace'}
# Step 1: Extract tool names for analysis (only for tools that need server_name assignment)
tools_to_process = [
tool for tool in tools if tool.metadata is None or tool.metadata.get("server_name", None) is None
]
tool_names = [tool.name for tool in tools_to_process]
# Step 2: Find potential prefixes and count occurrences
prefix_candidates = {}
for tool_name in tool_names:
# Split by underscore and take the first part as potential prefix
if '_' in tool_name:
potential_prefix = tool_name.split('_')[0].lower()
# Skip if it's an excluded prefix
if potential_prefix not in excluded_prefixes:
if potential_prefix not in prefix_candidates:
prefix_candidates[potential_prefix] = []
prefix_candidates[potential_prefix].append(tool_name)
# Step 3: Filter prefixes that appear in multiple tools (consistency check)
detected_applications = {}
for prefix, tool_list in prefix_candidates.items():
if len(tool_list) > 1: # Prefix appears in multiple tools - consistent!
detected_applications[prefix.upper()] = tool_list
# Step 4: Assign server_name to metadata for tools that need it
for tool in tools:
# Only process tools with metadata=None OR server_name=None
if tool.metadata is None or tool.metadata.get("server_name", None) is None:
tool_name = tool.name
server_name = 'default_app' # Default assignment
# Check if this tool belongs to any detected application
for app_name, app_tools in detected_applications.items():
if tool_name in app_tools:
server_name = app_name
break
# Initialize metadata if it's None, otherwise just update server_name
if tool.metadata is None:
tool.metadata = {"server_name": server_name}
else:
tool.metadata["server_name"] = server_name
# Step 5: Fill self.tools dictionary if provided
for tool in tools:
# Get server_name from tool metadata
server_name = tool.metadata.get('server_name')
# This should not happen after Step 4; fail loudly if a tool still lacks a server_name
if server_name is None:
raise Exception("Tool server_name is None after prefix assignment")
# Initialize list for this server if it doesn't exist
if server_name not in self.tools:
self.tools[server_name] = []
# Add tool to the appropriate server group
self.tools[server_name].append(tool)
# Step 6: Collect all unique server_names and their associated tools
app_tools_map = {}
for tool in tools:
if tool.metadata is not None:
server_name = tool.metadata.get("server_name")
if server_name:
if server_name not in app_tools_map:
app_tools_map[server_name] = []
app_tools_map[server_name].append(tool)
# Step 7: Create AppDefinition objects
app_definitions = []
for app_name, tool_list in app_tools_map.items():
tools_description = "Available tools:\n" + "\n".join(
f"{tool.name}: {tool.description}" if tool.description else f"{tool.name}:"
for tool in sorted(tool_list, key=lambda x: x.name)
)
app_def = AppDefinition(name=app_name, description=tools_description, url=None)
app_definitions.append(app_def)
self.apps = app_definitions
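# Illustrative sketch (hypothetical tool names): given tools "gmail_send_email",
# "gmail_list_messages" and a bare "search", the shared "gmail" prefix appears in more than
# one name, so set_tools produces:
#   self.tools -> {"GMAIL": [gmail_send_email, gmail_list_messages], "default_app": [search]}
#   self.apps  -> [AppDefinition(name="GMAIL", ...), AppDefinition(name="default_app", ...)]
# Tools that already carry metadata={"server_name": ...} keep their existing assignment.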
def set_base_dir(self, base_dir: str) -> None:
"""
Set the base directory for logging trajectory data.
Args:
base_dir (str): The base directory path for storing experiment data
"""
self._base_dir = base_dir
logger.info(f"Base directory set to: {self._base_dir}")
def get_base_dir(self) -> str:
"""
Get the current base directory for logging trajectory data.
Returns:
str: The current base directory path
"""
return self._base_dir
def get_current_trajectory_path(self) -> Optional[str]:
"""
Get the full path of the current experiment folder.
Returns:
Optional[str]: The full path of the experiment folder, or None if no experiment is active.
"""
if self.experiment_folder:
return os.path.join(self._base_dir, self.experiment_folder, self.task_id + ".json")
return ""
def generate_session_id(self):
self.session_id = random_id_with_timestamp(full_date=True)
def reset(self, intent, task_id="default"):
self.token_usage = 0
self.start_time = time.time()
self.current_date = None
self.pi = None
self.prompts = []
self.steps = []
self.images = []
self.actions_count = 0
self.final_answer = None
self.task_id = task_id
self.intent = intent
self.user_id = None
def reload_steps(self, task_id: Optional[str] = None) -> bool:
"""
Reload steps from the current experiment's task JSON file.
Args:
task_id (str, optional): Task ID to reload. If None, uses current task_id.
Returns:
bool: True if steps were successfully reloaded, False otherwise.
"""
# Use provided task_id or fall back to current task_id
target_task_id = task_id if task_id is not None else self.task_id
if not target_task_id or target_task_id == "default":
logger.error("No valid task_id provided for reloading steps")
return False
# Get the trajectory path for the specified task
self.task_id = target_task_id
trajectory_path = self.get_current_trajectory_path()
if not trajectory_path:
logger.error(f"No trajectory path found for task_id: {target_task_id}")
return False
if not os.path.exists(trajectory_path):
logger.error(f"Trajectory file does not exist: {trajectory_path}")
return False
try:
# Read the JSON file
with open(trajectory_path, 'r', encoding='utf-8') as f:
trajectory_data = json.load(f)
# Extract steps from the JSON
steps_data = trajectory_data.get('steps', [])
# Convert dictionaries back to Step objects
reloaded_steps = []
for step_dict in steps_data:
try:
step = Step(**step_dict)
reloaded_steps.append(step)
except Exception as e:
logger.warning(f"Failed to convert step data to Step object: {e}")
continue
# Update current steps
self.steps = reloaded_steps
logger.info(f"Successfully reloaded {len(reloaded_steps)} steps for task_id: {target_task_id}")
return True
except (json.JSONDecodeError, IOError) as e:
logger.error(f"Error reading trajectory file {trajectory_path}: {e}")
return False
except Exception as e:
logger.error(f"Unexpected error while reloading steps: {e}")
return False
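# Example (hypothetical task id): tracker.reload_steps("task_1") reads
# <base_dir>/<experiment_folder>/task_1.json and replaces tracker.steps with the persisted steps.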
def start_experiment(
self, task_ids: List[str], experiment_name: str, description: Optional[str] = ""
) -> str:
"""
Start a new experiment with given task IDs.
Args:
task_ids (List[str]): List of task IDs for this experiment
experiment_name (str): Name of the experiment
description (str, optional): Description of the experiment
Returns:
str: The experiment folder name
"""
# Generate experiment folder name using mask_with_timestamp
self.experiment_folder = mask_with_timestamp(experiment_name, full_date=True)
# Create metadata
self.tasks_metadata = TasksMetadata(
task_ids=task_ids,
description=description,
experiment_name=experiment_name,
experiment_folder=self.experiment_folder,
created_at=datetime.now().isoformat(),
)
# Only create files and directories if tracker is enabled
if settings.advanced_features.tracker_enabled:
# Create directory structure
experiment_dir = os.path.join(self._base_dir, self.experiment_folder)
os.makedirs(experiment_dir, exist_ok=True)
# Save metadata to file
metadata_path = os.path.join(experiment_dir, "metadata.json")
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(self.tasks_metadata.model_dump(), f, indent=2, ensure_ascii=False)
# Initialize empty files
self._initialize_experiment_files(experiment_dir)
# Reset tasks dictionary
self.tasks = {}
if settings.advanced_features.enable_memory:
from cuga.backend.memory.agentic_memory.client.exceptions import NamespaceNotFoundException
try:
self.memory.get_namespace_details(namespace_id="memory")
except NamespaceNotFoundException:
self.memory.create_namespace(namespace_id="memory")
self.memory.create_run(namespace_id="memory", run_id=self.experiment_folder)
# Start timer
self.start_time = time.time()
return self.experiment_folder
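# Example (hypothetical task ids): start_experiment(["task_1", "task_2"], "baseline_run") creates
# <base_dir>/<timestamped folder derived from "baseline_run">/ containing metadata.json,
# results.csv, results.json and .progress (when the tracker is enabled), and returns the folder name.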
def _initialize_experiment_files(self, experiment_dir: str) -> None:
"""Initialize empty result files for the experiment."""
# Define column order for CSV
columns = [
'task_id',
'site',
'intent',
'agent_answer',
'eval',
'score',
'exception',
'num_steps',
'fail_category',
'agent_v',
]
# Create empty results.csv
results_csv_path = os.path.join(experiment_dir, "results.csv")
df = pd.DataFrame(columns=columns)
df.to_csv(results_csv_path, index=False, encoding='utf-8')
# Create empty results.json
results_json_path = os.path.join(experiment_dir, "results.json")
with open(results_json_path, 'w', encoding='utf-8') as f:
json.dump({}, f, indent=2, ensure_ascii=False)
# Create empty .progress file
progress_path = os.path.join(experiment_dir, ".progress")
with open(progress_path, 'w', encoding='utf-8') as f:
f.write("")
def collect_prompt(self, role: str, value: str):
self.prompts.append(Prompt(role=role, value=value))
def collect_tokens_usage(self, count: int) -> None:
"""
Increases the number of tokens used.
Args:
count (int): The number of tokens to add to the running total.
"""
self.token_usage += count
def collect_image(self, img: str) -> None:
if not img:
return
# Ensure the image string is compatible with OpenAI vision API: must be a valid URL or a data URL.
if img.startswith("data:image") or img.startswith("http://") or img.startswith("https://"):
self.images.append(img)
else:
# Assume raw base64 PNG data; prepend appropriate data URL header.
self.images.append(f"data:image/png;base64,{img}")
def collect_step(self, step: Step) -> None:
"""
Collects a step, adding it to the steps list.
Args:
step (Step): The step to record.
"""
data_json = None
try:
data_json = json.loads(step.data)
except Exception:
pass
# Attach any collected prompts to this step so they are persisted
if getattr(self, "prompts", None):
try:
step.prompts = list(self.prompts)
except Exception:
# Ensure prompts never break logging
step.prompts = []
# Attach the most recent captured image (if any) to the step
if getattr(self, "images", None):
try:
# Use the last captured screenshot as the "before" image
step.image_before = self.images[-1]
except Exception:
step.image_before = None
if AGENT_ANALYTICS:
if step.name == "TaskAnalyzerAgent":
AIEventRecorder.record_data_annotation(
name=step.name,
annotation_type=DataAnnotation.Type.RAW_TEXT,
annotation_title="Intent",
annotation_content=self.intent,
)
if step.name == "CodeAgent":
res_obj = CodeAgentOutput(**json.loads(step.data))
AIEventRecorder.record_data_annotation(
name="CodeAgent",
annotation_type=DataAnnotation.Type.CODE_GENERATION,
annotation_title="Generated Code",
annotation_content="\n" + res_obj.code,
)
AIEventRecorder.record_data_annotation(
name="CodeAgent",
annotation_type=DataAnnotation.Type.CODE_SNIPPET,
annotation_title="Code output",
annotation_content="\n" + res_obj.execution_output,
)
AIEventRecorder.record_data_annotation(
name="CodeAgent",
annotation_type=DataAnnotation.Type.RAW_TEXT,
annotation_title="Output summary",
annotation_content="\n" + res_obj.summary,
)
else:
if data_json and isinstance(data_json, dict):
if data_json.get('thoughts', None):
AIEventRecorder.record_data_annotation(
name=step.name,
annotation_type=DataAnnotation.Type.THOUGHT,
annotation_title=step.name,
annotation_content=f"{data_json.get('thoughts', None)}",
)
if len(list(data_json.keys())) == 1 and isinstance(
data_json[list(data_json.keys())[0]], str
):
AIEventRecorder.record_data_annotation(
name=step.name,
annotation_type=DataAnnotation.Type.RAW_TEXT,
annotation_title=step.name,
annotation_content=f"\n\n{data_json[list(data_json.keys())[0]]}",
)
else:
AIEventRecorder.record_data_annotation(
name=step.name,
annotation_type=DataAnnotation.Type.RAW_TEXT,
annotation_title=step.name,
annotation_content=json.dumps(data_json),
)
else:
AIEventRecorder.record_data_annotation(
name=step.name,
annotation_type=DataAnnotation.Type.RAW_TEXT,
annotation_title=step.name,
annotation_content=f"{step.data}",
)
if step.image_before:
AIEventRecorder.record_data_annotation(
name=step.name,
annotation_type=DataAnnotation.Type.MULTIMODAL_DATA,
annotation_title="Image",
annotation_content=f"{step.image_before}",
)
if settings.advanced_features.enable_memory:
from cuga.backend.memory.agentic_memory.utils.prompts import prompts
# Include intent in step metadata so it's available during tip extraction
step_data = step.model_dump()
step_data['intent'] = self.intent # Add the user's task intent
self.memory.add_step(
namespace_id='memory',
run_id=self.experiment_folder,
step=step_data,
prompt=prompts[step.name],
)
step.prompts = copy.deepcopy(self.prompts)
self.prompts = []
self.steps.append(step)
if settings.advanced_features.enable_memory and step.name == "FinalAnswerAgent":
# End run and execute any background processing.
self.memory.end_run(namespace_id="memory", run_id=self.experiment_folder)
if settings.advanced_features.tracker_enabled:
self.to_file()
self.prompts = []
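# Each collected Step is persisted together with the prompts gathered since the previous step and,
# when screenshots were captured, the most recent image as image_before; analytics and memory
# recording only run when the corresponding advanced features are enabled.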
def collect_step_external(self, step: Step, full_path: Optional[str] = None) -> None:
"""
Collects a step and saves it to a separate log file in a directory
specified by an environment variable.
The path is retrieved from os.environ['current_folder_path'].
The steps are saved to a file named 'recordinglg.json' in that directory.
Args:
step (Step): The Step object to collect.
full_path (Optional[str]): The full file path to save to. If None, the step is skipped.
TODO: Properly handle None full_path case - either provide a default path or make the
calling code always provide a valid path. Currently returns early if None to avoid errors.
"""
try:
if not settings.advanced_features.tracker_enabled:
return
# TODO: Handle None full_path properly - either use a default path or require callers to provide one
if not full_path:
logger.debug("Skipping external step collection: full_path is None")
return
if not os.path.exists(os.path.dirname(full_path)):
logger.error(
f"External path directory not found or does not exist: {os.path.dirname(full_path)}"
)
return
step.prompts = copy.deepcopy(self.prompts)
self.prompts = []
self.steps.append(step)
self._to_file_external_append(full_path, step)
logger.info(f"Step appended to external file: {full_path}")
except Exception as e:
logger.error(f"Failed to collect and save external step: {e}")
def _to_file_external_append(self, full_path: str, new_step: Step):
"""
Append a new step to an existing JSON file or create a new file if it doesn't exist.
This method reads the existing file, appends the new step, and saves it back.
Args:
full_path (str): The full file path to save/append to.
new_step (Step): The new step to append.
"""
try:
# Check if file exists and read existing data
if os.path.exists(full_path):
with open(full_path, 'r', encoding='utf-8') as f:
try:
existing_data = json.load(f)
# Ensure the existing data has the expected structure
if not isinstance(existing_data, dict) or 'steps' not in existing_data:
logger.warning(f"Invalid JSON structure in {full_path}, creating new file")
existing_data = None
except json.JSONDecodeError as e:
logger.warning(f"Invalid JSON in {full_path}, creating new file: {e}")
existing_data = None
else:
existing_data = None
# If no valid existing data, create new structure
if existing_data is None:
data_to_save = {
"intent": self.intent,
"dataset_name": self.dataset_name,
"actions_count": self.actions_count,
"task_id": self.task_id,
"eval": self.eval,
"steps": [new_step.model_dump()],
"score": self.score,
}
else:
# Update existing data with new step
existing_data["steps"].append(new_step.model_dump())
# Update other fields that might have changed
existing_data.update(
{
"intent": self.intent,
"dataset_name": self.dataset_name,
"actions_count": self.actions_count,
"task_id": self.task_id,
"eval": self.eval,
"score": self.score,
}
)
data_to_save = existing_data
# Write the updated data back to file
with open(full_path, 'w', encoding='utf-8') as f:
json.dump(
data_to_save,
f,
ensure_ascii=False,
indent=4,
)
except Exception as e:
logger.error(f"Failed to append step to file {full_path}: {e}")
raise
def collect_score(self, score: float) -> None:
"""
Records the final task score and, if tracking is enabled, persists the trajectory to file.
Args:
score (float): The score to record for the current task.
"""
self.score = score
if settings.advanced_features.tracker_enabled:
self.to_file()
def collect_step_with_pass(self) -> None:
"""
Placeholder for collecting a step.
"""
pass
def to_file(self):
"""Save current task data to file in the experiment directory."""
if self.experiment_folder:
# Save to experiment directory
source_dir = os.path.join(self._base_dir, self.experiment_folder)
else:
# Fallback to original behavior
source_dir = "logging{}".format("_" + self.dataset_name if self.dataset_name else "")
os.makedirs(source_dir, exist_ok=True)
filename = self.task_id if self.task_id != "default" else self.session_id
filepath = os.path.join(source_dir, f"{filename}.json")
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(
{
"intent": self.intent,
"dataset_name": self.dataset_name,
"actions_count": self.actions_count,
"task_id": self.task_id,
"eval": self.eval,
"steps": [d.model_dump() for d in self.steps],
"score": self.score,
},
f,
ensure_ascii=False,
indent=4,
)
def finish_task(
self,
task_id: str,
site: str,
intent: str,
agent_answer: Optional[str] = None,
eval: Optional[str] = None,
score: Optional[float] = None,
exception: Optional[bool] = None,
num_steps: Optional[int] = None,
fail_category: Optional[str] = None,
agent_v: Optional[str] = None,
duration: Optional[int] = None,
total_llm_calls: Optional[int] = None,
total_tokens: Optional[int] = None,
total_cost: Optional[float] = None,
total_cache_input_tokens: Optional[int] = None,
) -> str:
"""
Mark a task as finished and update result files.
Args:
task_id (str): Required unique identifier for the task
site (str): Required site name
intent (str): Task intent/description
agent_answer (str, optional): Agent's answer
eval (str, optional): Evaluation details
score (float, optional): Task score
exception (bool, optional): Whether an exception occurred
num_steps (int, optional): Number of steps taken
fail_category (str, optional): Category of failure if applicable
agent_v (str, optional): Agent version
duration (int, optional): Task duration in seconds; defaults to elapsed time since start
total_llm_calls (int, optional): Total number of LLM calls
total_tokens (int, optional): Total tokens used; defaults to the tracked token_usage
total_cost (float, optional): Total cost of the task
total_cache_input_tokens (int, optional): Total cached input tokens
Returns:
str: The ID of the finished task
"""
if not self.experiment_folder:
raise ValueError("No experiment started. Call start_experiment() first.")
# Calculate number of api calls
api_calls_num = len([step for step in self.steps if "api_call" in step.name])
# Add task to internal storage
self.tasks[task_id] = {
"site": site,
"intent": intent,
"agent_answer": agent_answer,
"eval": eval,
"score": score,
"exception": exception,
"num_steps": num_steps if num_steps is not None else len(self.steps),
"fail_category": fail_category,
"agent_v": agent_v,
"duration": duration if duration is not None else time.time() - self.start_time,
"total_llm_calls": total_llm_calls,
"total_tokens": self.token_usage if not total_tokens else total_tokens,
"api_calls": api_calls_num,
"total_cost": total_cost,
"total_cache_input_tokens": total_cache_input_tokens,
}
# Update result files only if tracker is enabled
if settings.advanced_features.tracker_enabled:
self._update_result_files()
self._add_to_progress_file(task_id)
return task_id
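# Example (hypothetical values):
#   tracker.finish_task(task_id="task_1", site="gitlab", intent="Create an issue",
#                       agent_answer="Issue created", score=1.0, exception=False)
# adds the row to results.json/results.csv and appends the task id to .progress.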
def _update_result_files(self) -> None:
"""Update both JSON and CSV result files."""
if not self.experiment_folder:
return
experiment_dir = os.path.join(self._base_dir, self.experiment_folder)
# Update results.json
results_json_path = os.path.join(experiment_dir, "results.json")
with open(results_json_path, 'w', encoding='utf-8') as f:
json.dump(self.tasks, f, indent=2, ensure_ascii=False)
# Update results.csv
self._save_csv(experiment_dir)
def _save_csv(self, experiment_dir: str) -> None:
"""Save current tasks to CSV file using pandas."""
# Define the column order
columns = [
'task_id',
'site',
'intent',
'agent_answer',
'eval',
'score',
'exception',
'num_steps',
'fail_category',
'agent_v',
'duration',
'total_llm_calls',
'total_tokens',
'api_calls',
'total_cost',
'total_cache_input_tokens',
]
if not self.tasks:
# Create empty DataFrame with headers if no tasks
df = pd.DataFrame(columns=columns)
else:
# Convert tasks dictionary to list of dictionaries for DataFrame
data = []
for task_id, task_data in self.tasks.items():
row = {'task_id': task_id}
row.update(task_data)
data.append(row)
# Create DataFrame
df = pd.DataFrame(data)
# Reorder columns to match the desired order
df = df.reindex(columns=columns)
# Save to CSV
results_csv_path = os.path.join(experiment_dir, "results.csv")
df.to_csv(results_csv_path, index=False, encoding='utf-8')
def _add_to_progress_file(self, task_id: str) -> None:
"""Add a task ID to the .progress file."""
if not self.experiment_folder:
return
progress_path = os.path.join(self._base_dir, self.experiment_folder, ".progress")
with open(progress_path, 'a', encoding='utf-8') as f:
f.write(task_id + '\n')
def update_task(
self,
task_id: str,
site: Optional[str] = None,
intent: Optional[str] = None,
agent_answer: Optional[str] = None,
eval: Optional[str] = None,
score: Optional[float] = None,
exception: Optional[bool] = None,
num_steps: Optional[int] = None,
fail_category: Optional[str] = None,
agent_v: Optional[str] = None,
) -> bool:
"""
Update an existing task.
Args:
task_id (str): ID of the task to update
site (str, optional): New site
intent (str, optional): New intent
agent_answer (str, optional): New agent answer
eval (str, optional): New evaluation
score (float, optional): New score
exception (bool, optional): New exception status
num_steps (int, optional): New number of steps
fail_category (str, optional): New fail category
agent_v (str, optional): New agent version
Returns:
bool: True if task was updated, False if task not found
"""
if task_id not in self.tasks:
return False
# Update only provided fields
if site is not None:
self.tasks[task_id]["site"] = site
if intent is not None:
self.tasks[task_id]["intent"] = intent
if agent_answer is not None:
self.tasks[task_id]["agent_answer"] = agent_answer
if eval is not None:
self.tasks[task_id]["eval"] = eval
if score is not None:
self.tasks[task_id]["score"] = score
if exception is not None:
self.tasks[task_id]["exception"] = exception
if num_steps is not None:
self.tasks[task_id]["num_steps"] = num_steps
if fail_category is not None:
self.tasks[task_id]["fail_category"] = fail_category
if agent_v is not None:
self.tasks[task_id]["agent_v"] = agent_v
if settings.advanced_features.tracker_enabled:
self._update_result_files()
return True
def remove_task(self, task_id: str) -> bool:
"""
Remove a task from the results.
Args:
task_id (str): ID of the task to remove
Returns:
bool: True if task was removed, False if task not found
"""
if task_id in self.tasks:
del self.tasks[task_id]
if settings.advanced_features.tracker_enabled:
self._update_result_files()
return True
return False
def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
"""
Get a specific task by ID.
Args:
task_id (str): ID of the task to retrieve
Returns:
Dict containing task data or None if not found
"""
return self.tasks.get(task_id)
def get_all_tasks(self) -> Dict[str, Dict[str, Any]]:
"""
Get all tasks.
Returns:
Dict containing all tasks
"""
return self.tasks.copy()
def find_tasks_by_score(self, score: float) -> Dict[str, Dict[str, Any]]:
"""
Find all tasks with a specific score.
Args:
score (float): Score to search for
Returns:
Dict containing matching tasks
"""
return {task_id: task for task_id, task in self.tasks.items() if task.get("score") == score}
def find_tasks_by_site(self, site: str) -> Dict[str, Dict[str, Any]]:
"""
Find all tasks with a specific site.
Args:
site (str): Site to search for
Returns:
Dict containing matching tasks
"""
return {task_id: task for task_id, task in self.tasks.items() if task.get("site") == site}
def find_tasks_by_exception(self, exception: bool) -> Dict[str, Dict[str, Any]]:
"""
Find all tasks with specific exception status.
Args:
exception (bool): Exception status to search for
Returns:
Dict containing matching tasks
"""
return {task_id: task for task_id, task in self.tasks.items() if task.get("exception") == exception}
def find_tasks_by_agent_version(self, agent_v: str) -> Dict[str, Dict[str, Any]]:
"""
Find all tasks with a specific agent version.
Args:
agent_v (str): Agent version to search for
Returns:
Dict containing matching tasks
"""
return {task_id: task for task_id, task in self.tasks.items() if task.get("agent_v") == agent_v}
def clear_all_tasks(self) -> None:
"""Remove all tasks from result files."""
self.tasks = {}
if self.experiment_folder and settings.advanced_features.tracker_enabled:
self._update_result_files()
# Clear progress file
progress_path = os.path.join(self._base_dir, self.experiment_folder, ".progress")
with open(progress_path, 'w', encoding='utf-8') as f:
f.truncate(0)
def get_task_count(self) -> int:
"""
Get the total number of tasks.
Returns:
int: Number of tasks
"""
return len(self.tasks)
def get_statistics(self) -> Dict[str, Any]:
"""
Get basic statistics about the tasks.
Returns:
Dict containing task statistics
"""
if not self.tasks:
return {"total_tasks": 0}
stats = {
"total_tasks": len(self.tasks),
"tasks_with_exceptions": len([t for t in self.tasks.values() if t.get("exception") is True]),
"tasks_without_exceptions": len([t for t in self.tasks.values() if t.get("exception") is False]),
"unique_sites": len(set(t.get("site") for t in self.tasks.values() if t.get("site"))),
"unique_agent_versions": len(
set(t.get("agent_v") for t in self.tasks.values() if t.get("agent_v"))
),
}
# Score statistics
scores = [t.get("score") for t in self.tasks.values() if t.get("score") is not None]
if scores:
stats["average_score"] = sum(scores) / len(scores)
stats["min_score"] = min(scores)
stats["max_score"] = max(scores)
return stats
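# Example return value (hypothetical numbers):
#   {"total_tasks": 2, "tasks_with_exceptions": 1, "tasks_without_exceptions": 1,
#    "unique_sites": 2, "unique_agent_versions": 1,
#    "average_score": 0.5, "min_score": 0.0, "max_score": 1.0}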
def get_dataframe(self) -> pd.DataFrame:
"""
Get all tasks as a pandas DataFrame.
Returns:
pd.DataFrame: DataFrame containing all tasks
"""
columns = [
'task_id',
'site',
'intent',
'agent_answer',
'eval',
'score',
'exception',
'num_steps',
'fail_category',
'agent_v',
]
if not self.tasks:
return pd.DataFrame(columns=columns)
data = []
for task_id, task_data in self.tasks.items():
row = {'task_id': task_id}
row.update(task_data)
data.append(row)
df = pd.DataFrame(data)
return df.reindex(columns=columns)
def _copy_task_json_files(
self,
source_folders: List[str],
target_folder: str,
selected_task_ids: List[str],
base_dir: Optional[str] = None,
) -> None:
"""
Copy individual task JSON files from source folders to target folder.
Args:
source_folders (List[str]): List of source experiment folder names
target_folder (str): Target experiment folder name
selected_task_ids (List[str]): List of task IDs to copy
base_dir (str, optional): Base directory. If None, uses instance base_dir
"""
if base_dir is None:
base_dir = self._base_dir
target_dir = os.path.join(base_dir, target_folder)
copied_files = 0
skipped_files = 0
for task_id in selected_task_ids:
file_found = False
# Look for the task JSON file in each source folder
for folder_name in source_folders:
source_dir = os.path.join(base_dir, folder_name)
source_file = os.path.join(source_dir, f"{task_id}.json")
if os.path.exists(source_file):
target_file = os.path.join(target_dir, f"{task_id}.json")
try:
# Copy the file
shutil.copy2(source_file, target_file)
logger.debug(f"Copied {task_id}.json from {folder_name}")
copied_files += 1
file_found = True
break # Found and copied, move to next task
except Exception as e:
logger.error(f"Failed to copy {task_id}.json from {folder_name}: {e}")
if not file_found:
logger.warning(f"Task JSON file {task_id}.json not found in any source folder")
skipped_files += 1
logger.info(f"Task JSON files - Copied: {copied_files}, Skipped: {skipped_files}")
def merge_experiments(
self,
experiment_folders: List[str],
output_experiment_name: str,
description: Optional[str] = "Merged experiments",
output_folder: Optional[str] = None,
) -> MergeResult:
"""
Merge multiple experiment folders, preferring tasks with score 1.0 over 0.0.
Also copies individual task JSON files from source experiments.
Args:
experiment_folders (List[str]): List of experiment folder names to merge
output_experiment_name (str): Name for the merged experiment
description (str, optional): Description for the merged experiment
output_folder (str, optional): Optional output folder name (not used by the current implementation)
Returns:
MergeResult: Contains folder_name and merged_task_ids
"""
logger.info(f"Starting merge of {len(experiment_folders)} experiments")
# Create new experiment for merged results
merged_folder = self.start_experiment(
task_ids=[], # Will be populated with merged task IDs
experiment_name=output_experiment_name,
description=description,
)
merged_tasks = {}
all_task_ids = set()
task_source_mapping = {} # Track which folder each task came from
# First pass: collect all tasks and identify duplicates
for folder_name in experiment_folders:
folder_path = os.path.join(self._base_dir, folder_name)
results_json_path = os.path.join(folder_path, "results.json")
if not os.path.exists(results_json_path):
logger.warning(f"Results file not found in {folder_name}, skipping")
continue
try:
with open(results_json_path, 'r', encoding='utf-8') as f:
folder_tasks = json.load(f)
logger.info(f"Processing {len(folder_tasks)} tasks from {folder_name}")
for task_id, task_data in folder_tasks.items():
all_task_ids.add(task_id)
if task_id not in merged_tasks:
# First occurrence of this task
merged_tasks[task_id] = {**task_data, 'source_experiment': folder_name}
task_source_mapping[task_id] = folder_name
logger.debug(f"Added new task {task_id} from {folder_name}")
else:
# Task already exists, apply preference logic
existing_score = merged_tasks[task_id].get('score', 0.0)
new_score = task_data.get('score', 0.0)
if existing_score == 1.0 and new_score != 1.0:
# Keep existing (perfect score)
should_replace = False
elif existing_score != 1.0 and new_score == 1.0:
# Replace with perfect score
should_replace = True
elif existing_score == new_score:
# Same score, keep existing (first found)
should_replace = False
else:
# Different scores, prefer higher
should_replace = new_score > existing_score
if should_replace:
merged_tasks[task_id] = {**task_data, 'source_experiment': folder_name}
task_source_mapping[task_id] = folder_name
logger.debug(
f"Replaced task {task_id}: {existing_score} -> {new_score} from {folder_name}"
)
else:
logger.debug(
f"Kept existing task {task_id}: score {existing_score} vs {new_score}"
)
except Exception as e:
logger.error(f"Error processing {folder_name}: {e}")
continue
# Update the merged experiment with final task list
self.tasks = merged_tasks
# Update metadata with actual task IDs
if self.tasks_metadata:
self.tasks_metadata.task_ids = list(all_task_ids)
# Save updated metadata
experiment_dir = os.path.join(self._base_dir, merged_folder)
metadata_path = os.path.join(experiment_dir, "metadata.json")
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(self.tasks_metadata.model_dump(), f, indent=2, ensure_ascii=False)
# Update result files with merged data only if tracker is enabled
if settings.advanced_features.tracker_enabled:
self._update_result_files()
# Update progress file with all task IDs
for task_id in merged_tasks.keys():
self._add_to_progress_file(task_id)
# Copy individual task JSON files only if tracker is enabled
if settings.advanced_features.tracker_enabled:
logger.info("Copying individual task JSON files...")
selected_task_ids = list(merged_tasks.keys())
self._copy_task_json_files(experiment_folders, merged_folder, selected_task_ids)
logger.success(f"Successfully merged {len(merged_tasks)} tasks into {merged_folder}")
logger.info(f"Source experiments: {experiment_folders}")
score_distribution = {}
source_distribution = {}
for task_data in merged_tasks.values():
score = task_data.get('score', 0.0)
source = task_data.get('source_experiment', 'unknown')
score_distribution[score] = score_distribution.get(score, 0) + 1
source_distribution[source] = source_distribution.get(source, 0) + 1
logger.info(f"Score distribution in merged results: {score_distribution}")
logger.info(f"Source distribution in merged results: {source_distribution}")
# Return MergeResult
return MergeResult(folder_name=merged_folder, merged_task_ids=list(merged_tasks.keys()))
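# Example (hypothetical folder names): merging two runs, keeping the higher-scoring copy of any
# task that appears in both:
#   merged = tracker.merge_experiments(["run_a_folder", "run_b_folder"], "merged_run")
#   merged.folder_name, merged.merged_task_ids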
def list_experiment_folders(self, base_path: Optional[str] = None) -> List[str]:
"""
List all available experiment folders.
Args:
base_path (str, optional): Base directory to search for experiments.
If None, uses instance base_dir
Returns:
List[str]: List of experiment folder names
"""
if base_path is None:
base_path = self._base_dir
if not os.path.exists(base_path):
logger.warning(f"Base path {base_path} does not exist")
return []
folders = []
for item in os.listdir(base_path):
item_path = os.path.join(base_path, item)
if os.path.isdir(item_path):
# Check if it looks like an experiment folder (has metadata.json)
metadata_path = os.path.join(item_path, "metadata.json")
if os.path.exists(metadata_path):
folders.append(item)
logger.info(f"Found {len(folders)} experiment folders")
return sorted(folders)
@staticmethod
def list_experiment_folders_static(base_path: str = "./logging/trajectory_data") -> List[str]:
"""
Static method to list all available experiment folders.
Args:
base_path (str): Base directory to search for experiments
Returns:
List[str]: List of experiment folder names
"""
if not os.path.exists(base_path):
logger.warning(f"Base path {base_path} does not exist")
return []
folders = []
for item in os.listdir(base_path):
item_path = os.path.join(base_path, item)
if os.path.isdir(item_path):
# Check if it looks like an experiment folder (has metadata.json)
metadata_path = os.path.join(item_path, "metadata.json")
if os.path.exists(metadata_path):
folders.append(item)
logger.info(f"Found {len(folders)} experiment folders")
return sorted(folders)
def get_experiment_progress(self, experiment_folder_name: str) -> Dict[str, Any]:
"""
Get the progress of a specific experiment.
Args:
experiment_folder_name (str): The name of the experiment folder.
Returns:
Dict[str, Any]: A dictionary containing 'total_tasks', 'completed_tasks', and 'uncompleted_task_ids'.
Returns default values if files are not found or errors occur.
"""
experiment_dir = os.path.join(self._base_dir, experiment_folder_name)
metadata_path = os.path.join(experiment_dir, "metadata.json")
progress_path = os.path.join(experiment_dir, ".progress")
total_tasks = 0
completed_tasks = 0
all_task_ids = set()
completed_task_ids = set()
# Read total tasks from metadata.json
if os.path.exists(metadata_path):
try:
with open(metadata_path, 'r', encoding='utf-8') as f:
metadata = json.load(f)
all_task_ids = set(metadata.get('task_ids', []))
total_tasks = len(all_task_ids)
except (json.JSONDecodeError, IOError) as e:
logger.error(f"Error reading metadata.json for {experiment_folder_name}: {e}")
else:
logger.warning(f"metadata.json not found for experiment: {experiment_folder_name}")
# Read completed tasks from .progress
if os.path.exists(progress_path):
try:
with open(progress_path, 'r', encoding='utf-8') as f:
completed_task_ids = set(line.strip() for line in f if line.strip())
completed_tasks = len(completed_task_ids)
except IOError as e:
logger.error(f"Error reading .progress file for {experiment_folder_name}: {e}")
else:
logger.info(f".progress file not found for experiment: {experiment_folder_name}")
uncompleted_task_ids = sorted(all_task_ids - completed_task_ids)
return {
"total_tasks": total_tasks,
"completed_tasks": completed_tasks,
"uncompleted_task_ids": uncompleted_task_ids,
}
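
# Minimal end-to-end usage sketch (assumptions: a writable TRAJECTORY_DATA_DIR, and the optional
# memory/analytics features disabled or properly configured; the task id and intent below are
# hypothetical). ActivityTracker is a singleton, so repeated instantiation returns the same object.
if __name__ == "__main__":
    tracker = ActivityTracker()
    tracker.start_experiment(task_ids=["demo_task"], experiment_name="smoke_test")
    tracker.reset(intent="List open issues", task_id="demo_task")
    tracker.collect_step(Step(name="TaskAnalyzerAgent", data='{"thoughts": "analyze the intent"}'))
    tracker.finish_task(task_id="demo_task", site="demo", intent="List open issues", score=1.0)
    print(tracker.get_statistics())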