| import copy | |
| import json | |
| import os | |
| import shutil | |
| from datetime import datetime | |
| from typing import Any, Dict, List, Optional | |
| import time | |
| import pandas as pd | |
| from cuga.backend.cuga_graph.nodes.api.code_agent.model import CodeAgentOutput | |
| from cuga.backend.tools_env.registry.utils.types import AppDefinition | |
| from cuga.backend.utils.id_utils import mask_with_timestamp, random_id_with_timestamp | |
| from cuga.config import TRAJECTORY_DATA_DIR, settings | |
| from langchain_core.tools import StructuredTool | |
| from loguru import logger | |
| from mcp.types import CallToolResult, TextContent | |
| from pydantic import BaseModel, Field | |
| AGENT_ANALYTICS = True | |
| try: | |
| from agent_analytics.instrumentation.utils import AIEventRecorder | |
| from agent_analytics_core.interfaces.annotations import DataAnnotation | |
| except Exception: | |
| AGENT_ANALYTICS = False | |
| logger.warning("Ignoring agent analytics") | |
| class MergeResult(BaseModel): | |
| folder_name: str | |
| merged_task_ids: List[str] | |
| class Prompt(BaseModel): | |
| role: str | |
| value: str | |
| class Step(BaseModel): | |
| name: Optional[str] = "" | |
| plan: Optional[str] = "" | |
| prompts: List[Prompt] = Field(default_factory=list) | |
| data: Optional[str] = "" | |
| task_decomposition: Optional[str] = "" | |
| current_url: Optional[str] = "" | |
| action_formatted: Optional[str] = "" | |
| action_type: Optional[str] = "" | |
| action_args: Optional[Any] = "" | |
| observation_before: Optional[str] = "" | |
| image_before: Optional[str] = "" | |
| class TasksMetadata(BaseModel): | |
| task_ids: List[str] | |
| description: Optional[str] = "" | |
| experiment_name: str | |
| experiment_folder: str | |
| created_at: str | |
| class ActivityTracker(object): | |
| _instance = None | |
| start_time: float = 0 | |
| user_id: str = "" | |
| intent: str = "" | |
| session_id: str = "" | |
| dataset_name: str = "" | |
| prompts: List[Prompt] = [] | |
| current_date: Optional[str] = None | |
| pi: Optional[str] = None | |
| eval: Any = None | |
| final_answer: Optional[str] = None | |
| task_id: str = "default" | |
| actions_count: int = 0 | |
| token_usage: int = 0 | |
| steps: List[Step] = [] | |
| images: List[str] = [] | |
| score: float = 0.0 | |
| tools: Dict[str, List[StructuredTool]] = {} | |
| apps: List[AppDefinition] = [] | |
| # Task management attributes | |
| tasks: Dict[str, Dict[str, Any]] = {} | |
| experiment_folder: Optional[str] = None | |
| tasks_metadata: Optional[TasksMetadata] = None | |
| if settings.advanced_features.enable_memory: | |
| from cuga.backend.memory.memory import Memory | |
| memory = Memory() | |
| # Base directory configuration | |
| _base_dir: str = TRAJECTORY_DATA_DIR | |
| def __new__(cls, *args, **kwargs): | |
| if not cls._instance: | |
| cls._instance = super(ActivityTracker, cls).__new__(cls) | |
| return cls._instance | |
| async def invoke_tool(self, server_name: str, tool_name: str, args: dict): | |
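| """ | |
| Invoke a registered tool by server and tool name, returning parsed JSON when the output allows it. | |
| Example (a minimal sketch; the server and tool names are hypothetical, not part of this module): | |
| result = await tracker.invoke_tool("GMAIL", "gmail_list_messages", {"query": "is:unread"}) | |
| """ | |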
| if server_name not in self.tools: | |
| raise ValueError(f"Server '{server_name}' not found") | |
| # Find the tool by name | |
| for tool in self.tools[server_name]: | |
| if tool.name == tool_name: | |
| result = await tool.ainvoke(args) | |
| logger.debug(f"type of {type(result)}") | |
| # logger.debug(f"Tool output call {result.con}") | |
| # Check if result is JSON parseable | |
| if isinstance(result, CallToolResult): | |
| result = result.content[0] | |
| if isinstance(result, TextContent): | |
| result = result.text | |
| if isinstance(result, str): | |
| try: | |
| res = json.loads(result) | |
| logger.debug("json res worked!") | |
| return res | |
| except (json.JSONDecodeError, TypeError): | |
| logger.debug("no json tool output !!") | |
| # Not valid JSON, return original result | |
| return result | |
| else: | |
| logger.debug(f"answer is not str answer is of type {type(result)}") | |
| # Result is not a string, return as-is | |
| return result | |
| # Tool not found | |
| available_tools = [tool.name for tool in self.tools[server_name]] | |
| raise ValueError( | |
| f"Tool '{tool_name}' not found in server '{server_name}'. Available tools: {available_tools}" | |
| ) | |
| def invoke_tool_sync(self, server_name: str, tool_name: str, args: dict): | |
| """Synchronous version of invoke_tool to avoid async/sync context issues""" | |
| import asyncio | |
| import concurrent.futures | |
| if server_name not in self.tools: | |
| raise ValueError(f"Server '{server_name}' not found") | |
| # Find the tool by name | |
| for tool in self.tools[server_name]: | |
| if tool.name == tool_name: | |
| # Try synchronous invoke first | |
| try: | |
| result = tool.invoke(args) # Use synchronous invoke | |
| except RuntimeError as e: | |
| if "event loop is already running" in str(e): | |
| # We're in an async context, need to handle this differently | |
| try: | |
| # Check if we have a running loop | |
| asyncio.get_running_loop() | |
| # We're in an async context, create a new thread to run the async function | |
| def run_in_new_loop(): | |
| new_loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(new_loop) | |
| try: | |
| # Use async invoke in the new loop | |
| async def async_invoke(): | |
| return await tool.ainvoke(args) | |
| return new_loop.run_until_complete(async_invoke()) | |
| finally: | |
| new_loop.close() | |
| with concurrent.futures.ThreadPoolExecutor() as executor: | |
| future = executor.submit(run_in_new_loop) | |
| result = future.result() | |
| except RuntimeError: | |
| # No running loop, use asyncio.run | |
| async def async_invoke(): | |
| return await tool.ainvoke(args) | |
| result = asyncio.run(async_invoke()) | |
| else: | |
| raise | |
| # logger.debug(f"type of {type(result)}") | |
| # logger.debug(f"Tool output call {result}") | |
| # Check if result is JSON parseable | |
| if isinstance(result, CallToolResult): | |
| result = result.content[0] | |
| if isinstance(result, TextContent): | |
| result = result.text | |
| if isinstance(result, str): | |
| try: | |
| res = json.loads(result) | |
| logger.debug("json res worked!") | |
| return res | |
| except (json.JSONDecodeError, TypeError): | |
| logger.debug("no json tool output !!") | |
| # Not valid JSON, return original result | |
| return result | |
| else: | |
| logger.debug(f"answer is not str answer is of type {type(result)}") | |
| # Result is not a string, return as-is | |
| return result | |
| # Tool not found | |
| available_tools = [tool.name for tool in self.tools[server_name]] | |
| raise ValueError( | |
| f"Tool '{tool_name}' not found in server '{server_name}'. Available tools: {available_tools}" | |
| ) | |
| def get_tools_by_server(self, server_name: str) -> Dict[str, Dict]: | |
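| """ | |
| Return a per-tool config dict for every tool registered under the given server name. | |
| Example (illustrative; the server name is hypothetical): | |
| configs = tracker.get_tools_by_server("GMAIL")  # returns {} if the server is unknown | |
| """ | |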
| tools = self.tools | |
| if server_name not in tools: | |
| return {} | |
| server_tools = {} | |
| for tool in tools[server_name]: | |
| tool_config = { | |
| "app_name": server_name, | |
| "secure": False, | |
| "api_name": tool.name, | |
| "path": '', | |
| "method": '', | |
| "description": tool.description or '', | |
| "parameters": tool.args_schema.model_json_schema(), | |
| "response_schemas": 'Any', | |
| "canary_string": '', | |
| } | |
| server_tools[tool.name] = tool_config | |
| return server_tools | |
| def set_tools(self, tools: List[StructuredTool]): | |
| """ | |
| Detects application prefixes in tool names and assigns a server_name to each tool's metadata. | |
| Groups the tools by server_name into self.tools and stores an AppDefinition per detected | |
| application (with a tools description) in self.apps. | |
| - For tools with metadata=None OR server_name=None: assigns the detected app name or 'default_app' | |
| - For tools with an existing server_name: leaves them unchanged | |
| Args: | |
| tools (list): List of StructuredTool objects with .name and .metadata attributes | |
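| Example (illustrative; the tool names are hypothetical): | |
| Tools named "gmail_send_email" and "gmail_list_messages" share the "gmail" prefix, so both | |
| are grouped under server_name "GMAIL"; a lone "search" tool with no repeated prefix is | |
| assigned to "default_app". | |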
| """ | |
| self.tools = {} | |
| # logger.debug(f"tools: {tools}") | |
| # Common prefixes to exclude (HTTP methods, etc.) | |
| excluded_prefixes = {'get', 'post', 'put', 'delete', 'patch', 'head', 'options', 'trace'} | |
| # Step 1: Extract tool names for analysis (only for tools that need server_name assignment) | |
| tools_to_process = [ | |
| tool for tool in tools if tool.metadata is None or tool.metadata.get("server_name", None) is None | |
| ] | |
| tool_names = [tool.name for tool in tools_to_process] | |
| # Step 2: Find potential prefixes and count occurrences | |
| prefix_candidates = {} | |
| for tool_name in tool_names: | |
| # Split by underscore and take the first part as potential prefix | |
| if '_' in tool_name: | |
| potential_prefix = tool_name.split('_')[0].lower() | |
| # Skip if it's an excluded prefix | |
| if potential_prefix not in excluded_prefixes: | |
| if potential_prefix not in prefix_candidates: | |
| prefix_candidates[potential_prefix] = [] | |
| prefix_candidates[potential_prefix].append(tool_name) | |
| # Step 3: Filter prefixes that appear in multiple tools (consistency check) | |
| detected_applications = {} | |
| for prefix, tool_list in prefix_candidates.items(): | |
| if len(tool_list) > 1: # Prefix appears in multiple tools - consistent! | |
| detected_applications[prefix.upper()] = tool_list | |
| # Step 4: Assign server_name to metadata for tools that need it | |
| for tool in tools: | |
| # Only process tools with metadata=None OR server_name=None | |
| if tool.metadata is None or tool.metadata.get("server_name", None) is None: | |
| tool_name = tool.name | |
| server_name = 'default_app' # Default assignment | |
| # Check if this tool belongs to any detected application | |
| for app_name, app_tools in detected_applications.items(): | |
| if tool_name in app_tools: | |
| server_name = app_name | |
| break | |
| # Initialize metadata if it's None, otherwise just update server_name | |
| if tool.metadata is None: | |
| tool.metadata = {"server_name": server_name} | |
| else: | |
| tool.metadata["server_name"] = server_name | |
| # Step 5: Fill self.tools dictionary if provided | |
| for tool in tools: | |
| # Get server_name from tool metadata | |
| server_name = tool.metadata.get('server_name') | |
| # Skip tools without server_name metadata | |
| if server_name is None: | |
| raise Exception("Tool server name is none!") | |
| # Initialize list for this server if it doesn't exist | |
| if server_name not in self.tools: | |
| self.tools[server_name] = [] | |
| # Add tool to the appropriate server group | |
| self.tools[server_name].append(tool) | |
| # Step 6: Collect all unique server_names and their associated tools | |
| app_tools_map = {} | |
| for tool in tools: | |
| if tool.metadata is not None: | |
| server_name = tool.metadata.get("server_name") | |
| if server_name: | |
| if server_name not in app_tools_map: | |
| app_tools_map[server_name] = [] | |
| app_tools_map[server_name].append(tool) | |
| # Step 7: Create AppDefinition objects | |
| app_definitions = [] | |
| for app_name, tool_list in app_tools_map.items(): | |
| tools_description = "Available tools:\n" + "\n".join( | |
| f"{tool.name}: {tool.description}" if tool.description else f"{tool.name}:" | |
| for tool in sorted(tool_list, key=lambda x: x.name) | |
| ) | |
| app_def = AppDefinition(name=app_name, description=tools_description, url=None) | |
| app_definitions.append(app_def) | |
| self.apps = app_definitions | |
| def set_base_dir(self, base_dir: str) -> None: | |
| """ | |
| Set the base directory for logging trajectory data. | |
| Args: | |
| base_dir (str): The base directory path for storing experiment data | |
| """ | |
| self._base_dir = base_dir | |
| logger.info(f"Base directory set to: {self._base_dir}") | |
| def get_base_dir(self) -> str: | |
| """ | |
| Get the current base directory for logging trajectory data. | |
| Returns: | |
| str: The current base directory path | |
| """ | |
| return self._base_dir | |
| def get_current_trajectory_path(self) -> Optional[str]: | |
| """ | |
| Get the full path of the current task's trajectory JSON file inside the experiment folder. | |
| Returns: | |
| Optional[str]: The full path of the task trajectory file, or None if no experiment is active. | |
| """ | |
| if self.experiment_folder: | |
| return os.path.join(self._base_dir, self.experiment_folder, self.task_id + ".json") | |
| return None | |
| def generate_session_id(self): | |
| self.session_id = random_id_with_timestamp(full_date=True) | |
| def reset(self, intent, task_id="default"): | |
| self.token_usage = 0 | |
| self.start_time = time.time() | |
| self.current_date = None | |
| self.pi = None | |
| self.prompts = [] | |
| self.steps = [] | |
| self.images = [] | |
| self.actions_count = 0 | |
| self.final_answer = None | |
| self.task_id = task_id | |
| self.intent = intent | |
| self.user_id = None | |
| def reload_steps(self, task_id: Optional[str] = None) -> bool: | |
| """ | |
| Reload steps from the current experiment's task JSON file. | |
| Args: | |
| task_id (str, optional): Task ID to reload. If None, uses current task_id. | |
| Returns: | |
| bool: True if steps were successfully reloaded, False otherwise. | |
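| Example (illustrative; the task ID is hypothetical): | |
| tracker.reload_steps("task_42")  # re-reads <base_dir>/<experiment_folder>/task_42.json | |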
| """ | |
| # Use provided task_id or fall back to current task_id | |
| target_task_id = task_id if task_id is not None else self.task_id | |
| if not target_task_id or target_task_id == "default": | |
| logger.error("No valid task_id provided for reloading steps") | |
| return False | |
| # Get the trajectory path for the specified task | |
| self.task_id = target_task_id | |
| trajectory_path = self.get_current_trajectory_path() | |
| if not trajectory_path: | |
| logger.error(f"No trajectory path found for task_id: {target_task_id}") | |
| return False | |
| if not os.path.exists(trajectory_path): | |
| logger.error(f"Trajectory file does not exist: {trajectory_path}") | |
| return False | |
| try: | |
| # Read the JSON file | |
| with open(trajectory_path, 'r', encoding='utf-8') as f: | |
| trajectory_data = json.load(f) | |
| # Extract steps from the JSON | |
| steps_data = trajectory_data.get('steps', []) | |
| # Convert dictionaries back to Step objects | |
| reloaded_steps = [] | |
| for step_dict in steps_data: | |
| try: | |
| step = Step(**step_dict) | |
| reloaded_steps.append(step) | |
| except Exception as e: | |
| logger.warning(f"Failed to convert step data to Step object: {e}") | |
| continue | |
| # Update current steps | |
| self.steps = reloaded_steps | |
| logger.info(f"Successfully reloaded {len(reloaded_steps)} steps for task_id: {target_task_id}") | |
| return True | |
| except (json.JSONDecodeError, IOError) as e: | |
| logger.error(f"Error reading trajectory file {trajectory_path}: {e}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Unexpected error while reloading steps: {e}") | |
| return False | |
| def start_experiment( | |
| self, task_ids: List[str], experiment_name: str, description: Optional[str] = "" | |
| ) -> str: | |
| """ | |
| Start a new experiment with given task IDs. | |
| Args: | |
| task_ids (List[str]): List of task IDs for this experiment | |
| experiment_name (str): Name of the experiment | |
| description (str, optional): Description of the experiment | |
| Returns: | |
| str: The experiment folder name | |
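| Example (a minimal sketch; the task IDs and experiment name are hypothetical): | |
| tracker = ActivityTracker() | |
| folder = tracker.start_experiment(["task_1", "task_2"], "baseline_run", "First baseline") | |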
| """ | |
| # Generate experiment folder name using mask_with_timestamp | |
| self.experiment_folder = mask_with_timestamp(experiment_name, full_date=True) | |
| # Create metadata | |
| self.tasks_metadata = TasksMetadata( | |
| task_ids=task_ids, | |
| description=description, | |
| experiment_name=experiment_name, | |
| experiment_folder=self.experiment_folder, | |
| created_at=datetime.now().isoformat(), | |
| ) | |
| # Only create files and directories if tracker is enabled | |
| if settings.advanced_features.tracker_enabled: | |
| # Create directory structure | |
| experiment_dir = os.path.join(self._base_dir, self.experiment_folder) | |
| os.makedirs(experiment_dir, exist_ok=True) | |
| # Save metadata to file | |
| metadata_path = os.path.join(experiment_dir, "metadata.json") | |
| with open(metadata_path, 'w', encoding='utf-8') as f: | |
| json.dump(self.tasks_metadata.model_dump(), f, indent=2, ensure_ascii=False) | |
| # Initialize empty files | |
| self._initialize_experiment_files(experiment_dir) | |
| # Reset tasks dictionary | |
| self.tasks = {} | |
| if settings.advanced_features.enable_memory: | |
| from cuga.backend.memory.agentic_memory.client.exceptions import NamespaceNotFoundException | |
| try: | |
| self.memory.get_namespace_details(namespace_id="memory") | |
| except NamespaceNotFoundException: | |
| self.memory.create_namespace(namespace_id="memory") | |
| self.memory.create_run(namespace_id="memory", run_id=self.experiment_folder) | |
| # Start timer | |
| self.start_time = time.time() | |
| return self.experiment_folder | |
| def _initialize_experiment_files(self, experiment_dir: str) -> None: | |
| """Initialize empty result files for the experiment.""" | |
| # Define column order for CSV | |
| columns = [ | |
| 'task_id', | |
| 'site', | |
| 'intent', | |
| 'agent_answer', | |
| 'eval', | |
| 'score', | |
| 'exception', | |
| 'num_steps', | |
| 'fail_category', | |
| 'agent_v', | |
| ] | |
| # Create empty results.csv | |
| results_csv_path = os.path.join(experiment_dir, "results.csv") | |
| df = pd.DataFrame(columns=columns) | |
| df.to_csv(results_csv_path, index=False, encoding='utf-8') | |
| # Create empty results.json | |
| results_json_path = os.path.join(experiment_dir, "results.json") | |
| with open(results_json_path, 'w', encoding='utf-8') as f: | |
| json.dump({}, f, indent=2, ensure_ascii=False) | |
| # Create empty .progress file | |
| progress_path = os.path.join(experiment_dir, ".progress") | |
| with open(progress_path, 'w', encoding='utf-8') as f: | |
| f.write("") | |
| def collect_prompt(self, role: str, value: str): | |
| self.prompts.append(Prompt(role=role, value=value)) | |
| def collect_tokens_usage(self, count: int) -> None: | |
| """ | |
| Increases the number of tokens used. | |
| Args: | |
| count (int): The number of tokens to add to the running total. | |
| """ | |
| self.token_usage += count | |
| def collect_image(self, img: str) -> None: | |
| if not img: | |
| return | |
| # Ensure the image string is compatible with OpenAI vision API: must be a valid URL or a data URL. | |
| if img.startswith("data:image") or img.startswith("http://") or img.startswith("https://"): | |
| self.images.append(img) | |
| else: | |
| # Assume raw base64 PNG data; prepend appropriate data URL header. | |
| self.images.append(f"data:image/png;base64,{img}") | |
| def collect_step(self, step: Step) -> None: | |
| """ | |
| Collects a step, adding it to the steps list. | |
| Args: | |
| step (Step): The step to collect. | |
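| Example (illustrative; the data payload is hypothetical): | |
| tracker.collect_step(Step(name="TaskAnalyzerAgent", data='{"thoughts": "Analyze the intent"}')) | |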
| """ | |
| data_json = None | |
| try: | |
| data_json = json.loads(step.data) | |
| except Exception: | |
| pass | |
| # Attach any collected prompts to this step so they are persisted | |
| if getattr(self, "prompts", None): | |
| try: | |
| step.prompts = list(self.prompts) | |
| except Exception: | |
| # Ensure prompts never break logging | |
| step.prompts = [] | |
| # Attach the most recent captured image (if any) to the step | |
| if getattr(self, "images", None): | |
| try: | |
| # Use the last captured screenshot as the "before" image | |
| step.image_before = self.images[-1] | |
| except Exception: | |
| step.image_before = None | |
| if AGENT_ANALYTICS: | |
| if step.name == "TaskAnalyzerAgent": | |
| AIEventRecorder.record_data_annotation( | |
| name=step.name, | |
| annotation_type=DataAnnotation.Type.RAW_TEXT, | |
| annotation_title="Intent", | |
| annotation_content=self.intent, | |
| ) | |
| if step.name == "CodeAgent": | |
| res_obj = CodeAgentOutput(**json.loads(step.data)) | |
| AIEventRecorder.record_data_annotation( | |
| name="CodeAgent", | |
| annotation_type=DataAnnotation.Type.CODE_GENERATION, | |
| annotation_title="Generated Code", | |
| annotation_content="\n" + res_obj.code, | |
| ) | |
| AIEventRecorder.record_data_annotation( | |
| name="CodeAgent", | |
| annotation_type=DataAnnotation.Type.CODE_SNIPPET, | |
| annotation_title="Code output", | |
| annotation_content="\n" + res_obj.execution_output, | |
| ) | |
| AIEventRecorder.record_data_annotation( | |
| name="CodeAgent", | |
| annotation_type=DataAnnotation.Type.RAW_TEXT, | |
| annotation_title="Output summary", | |
| annotation_content="\n" + res_obj.summary, | |
| ) | |
| else: | |
| if data_json and isinstance(data_json, dict): | |
| if data_json.get('thoughts', None): | |
| AIEventRecorder.record_data_annotation( | |
| name=step.name, | |
| annotation_type=DataAnnotation.Type.THOUGHT, | |
| annotation_title=step.name, | |
| annotation_content=f"{data_json.get('thoughts', None)}", | |
| ) | |
| if len(list(data_json.keys())) == 1 and isinstance( | |
| data_json[list(data_json.keys())[0]], str | |
| ): | |
| AIEventRecorder.record_data_annotation( | |
| name=step.name, | |
| annotation_type=DataAnnotation.Type.RAW_TEXT, | |
| annotation_title=step.name, | |
| annotation_content=f"\n\n{data_json[list(data_json.keys())[0]]}", | |
| ) | |
| else: | |
| AIEventRecorder.record_data_annotation( | |
| name=step.name, | |
| annotation_type=DataAnnotation.Type.RAW_TEXT, | |
| annotation_title=step.name, | |
| annotation_content=json.dumps(data_json), | |
| ) | |
| else: | |
| AIEventRecorder.record_data_annotation( | |
| name=step.name, | |
| annotation_type=DataAnnotation.Type.RAW_TEXT, | |
| annotation_title=step.name, | |
| annotation_content=f"{step.data}", | |
| ) | |
| if step.image_before: | |
| AIEventRecorder.record_data_annotation( | |
| name=step.name, | |
| annotation_type=DataAnnotation.Type.MULTIMODAL_DATA, | |
| annotation_title="Image", | |
| annotation_content=f"{step.image_before}", | |
| ) | |
| if settings.advanced_features.enable_memory: | |
| from cuga.backend.memory.agentic_memory.utils.prompts import prompts | |
| # Include intent in step metadata so it's available during tip extraction | |
| step_data = step.model_dump() | |
| step_data['intent'] = self.intent # Add the user's task intent | |
| self.memory.add_step( | |
| namespace_id='memory', | |
| run_id=self.experiment_folder, | |
| step=step_data, | |
| prompt=prompts[step.name], | |
| ) | |
| step.prompts = copy.deepcopy(self.prompts) | |
| self.prompts = [] | |
| self.steps.append(step) | |
| if settings.advanced_features.enable_memory and step.name == "FinalAnswerAgent": | |
| # End run and execute any background processing. | |
| self.memory.end_run(namespace_id="memory", run_id=self.experiment_folder) | |
| if settings.advanced_features.tracker_enabled: | |
| self.to_file() | |
| self.prompts = [] | |
| def collect_step_external(self, step: Step, full_path: Optional[str] = None) -> None: | |
| """ | |
| Collects a step and appends it to an external JSON log file at the given full_path. | |
| The step is skipped if the tracker is disabled, full_path is None, or the target directory does not exist. | |
| Args: | |
| step (Step): The Step object to collect. | |
| full_path (Optional[str]): The full file path to save to. If None, the step is skipped. | |
| TODO: Properly handle None full_path case - either provide a default path or make the | |
| calling code always provide a valid path. Currently returns early if None to avoid errors. | |
| """ | |
| try: | |
| if not settings.advanced_features.tracker_enabled: | |
| return | |
| # TODO: Handle None full_path properly - either use a default path or require callers to provide one | |
| if not full_path: | |
| logger.debug("Skipping external step collection: full_path is None") | |
| return | |
| if not os.path.exists(os.path.dirname(full_path)): | |
| logger.error( | |
| f"External path directory not found or does not exist: {os.path.dirname(full_path)}" | |
| ) | |
| return | |
| step.prompts = copy.deepcopy(self.prompts) | |
| self.prompts = [] | |
| self.steps.append(step) | |
| self._to_file_external_append(full_path, step) | |
| logger.info(f"Step appended to external file: {full_path}") | |
| except Exception as e: | |
| logger.error(f"Failed to collect and save external step: {e}") | |
| def _to_file_external_append(self, full_path: str, new_step: Step): | |
| """ | |
| Append a new step to an existing JSON file or create a new file if it doesn't exist. | |
| This method reads the existing file, appends the new step, and saves it back. | |
| Args: | |
| full_path (str): The full file path to save/append to. | |
| new_step (Step): The new step to append. | |
| """ | |
| try: | |
| # Check if file exists and read existing data | |
| if os.path.exists(full_path): | |
| with open(full_path, 'r', encoding='utf-8') as f: | |
| try: | |
| existing_data = json.load(f) | |
| # Ensure the existing data has the expected structure | |
| if not isinstance(existing_data, dict) or 'steps' not in existing_data: | |
| logger.warning(f"Invalid JSON structure in {full_path}, creating new file") | |
| existing_data = None | |
| except json.JSONDecodeError as e: | |
| logger.warning(f"Invalid JSON in {full_path}, creating new file: {e}") | |
| existing_data = None | |
| else: | |
| existing_data = None | |
| # If no valid existing data, create new structure | |
| if existing_data is None: | |
| data_to_save = { | |
| "intent": self.intent, | |
| "dataset_name": self.dataset_name, | |
| "actions_count": self.actions_count, | |
| "task_id": self.task_id, | |
| "eval": self.eval, | |
| "steps": [new_step.model_dump()], | |
| "score": self.score, | |
| } | |
| else: | |
| # Update existing data with new step | |
| existing_data["steps"].append(new_step.model_dump()) | |
| # Update other fields that might have changed | |
| existing_data.update( | |
| { | |
| "intent": self.intent, | |
| "dataset_name": self.dataset_name, | |
| "actions_count": self.actions_count, | |
| "task_id": self.task_id, | |
| "eval": self.eval, | |
| "score": self.score, | |
| } | |
| ) | |
| data_to_save = existing_data | |
| # Write the updated data back to file | |
| with open(full_path, 'w', encoding='utf-8') as f: | |
| json.dump( | |
| data_to_save, | |
| f, | |
| ensure_ascii=False, | |
| indent=4, | |
| ) | |
| except Exception as e: | |
| logger.error(f"Failed to append step to file {full_path}: {e}") | |
| raise | |
| def collect_score(self, score: float) -> None: | |
| """ | |
| Records the final score for the current task and persists it if tracking is enabled. | |
| Args: | |
| score (float): The score to record. | |
| """ | |
| self.score = score | |
| if settings.advanced_features.tracker_enabled: | |
| self.to_file() | |
| def collect_step_with_pass(self) -> None: | |
| """ | |
| Placeholder for collecting a step. | |
| """ | |
| pass | |
| def to_file(self): | |
| """Save current task data to file in the experiment directory.""" | |
| if self.experiment_folder: | |
| # Save to experiment directory | |
| source_dir = os.path.join(self._base_dir, self.experiment_folder) | |
| else: | |
| # Fallback to original behavior | |
| source_dir = "logging{}".format("_" + self.dataset_name if self.dataset_name else "") | |
| os.makedirs(source_dir, exist_ok=True) | |
| filename = self.task_id if self.task_id != "default" else self.session_id | |
| filepath = os.path.join(source_dir, f"{filename}.json") | |
| with open(filepath, 'w', encoding='utf-8') as f: | |
| json.dump( | |
| { | |
| "intent": self.intent, | |
| "dataset_name": self.dataset_name, | |
| "actions_count": self.actions_count, | |
| "task_id": self.task_id, | |
| "eval": self.eval, | |
| "steps": [d.model_dump() for d in self.steps], | |
| "score": self.score, | |
| }, | |
| f, | |
| ensure_ascii=False, | |
| indent=4, | |
| ) | |
| def finish_task( | |
| self, | |
| task_id: str, | |
| site: str, | |
| intent: str, | |
| agent_answer: Optional[str] = None, | |
| eval: Optional[str] = None, | |
| score: Optional[float] = None, | |
| exception: Optional[bool] = None, | |
| num_steps: Optional[int] = None, | |
| fail_category: Optional[str] = None, | |
| agent_v: Optional[str] = None, | |
| duration: Optional[int] = None, | |
| total_llm_calls: Optional[int] = None, | |
| total_tokens: Optional[int] = None, | |
| total_cost: Optional[float] = None, | |
| total_cache_input_tokens: Optional[int] = None, | |
| ) -> str: | |
| """ | |
| Mark a task as finished and update result files. | |
| Args: | |
| task_id (str): Required unique identifier for the task | |
| site (str): Required site name | |
| intent (str): Task intent/description | |
| agent_answer (str, optional): Agent's answer | |
| eval (str, optional): Evaluation details | |
| score (float, optional): Task score | |
| exception (bool, optional): Whether an exception occurred | |
| num_steps (int, optional): Number of steps taken | |
| fail_category (str, optional): Category of failure if applicable | |
| agent_v (str, optional): Agent version | |
| duration (int, optional): Task duration in seconds; defaults to time since start_time | |
| total_llm_calls (int, optional): Total number of LLM calls | |
| total_tokens (int, optional): Total tokens used; defaults to the tracked token_usage | |
| total_cost (float, optional): Total cost of the task | |
| total_cache_input_tokens (int, optional): Total cached input tokens | |
| Returns: | |
| str: The ID of the finished task | |
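| Example (illustrative; the field values are hypothetical): | |
| tracker.finish_task(task_id="task_1", site="gitlab", intent="Create an issue", | |
| agent_answer="Issue #12 created", score=1.0) | |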
| """ | |
| if not self.experiment_folder: | |
| raise ValueError("No experiment started. Call start_experiment() first.") | |
| # Calculate number of api calls | |
| api_calls_num = len([step for step in self.steps if "api_call" in step.name]) | |
| # Add task to internal storage | |
| self.tasks[task_id] = { | |
| "site": site, | |
| "intent": intent, | |
| "agent_answer": agent_answer, | |
| "eval": eval, | |
| "score": score, | |
| "exception": exception, | |
| "num_steps": num_steps if num_steps is not None else len(self.steps), | |
| "fail_category": fail_category, | |
| "agent_v": agent_v, | |
| "duration": duration if duration is not None else time.time() - self.start_time, | |
| "total_llm_calls": total_llm_calls, | |
| "total_tokens": self.token_usage if not total_tokens else total_tokens, | |
| "api_calls": api_calls_num, | |
| "total_cost": total_cost, | |
| "total_cache_input_tokens": total_cache_input_tokens, | |
| } | |
| # Update result files only if tracker is enabled | |
| if settings.advanced_features.tracker_enabled: | |
| self._update_result_files() | |
| self._add_to_progress_file(task_id) | |
| return task_id | |
| def _update_result_files(self) -> None: | |
| """Update both JSON and CSV result files.""" | |
| if not self.experiment_folder: | |
| return | |
| experiment_dir = os.path.join(self._base_dir, self.experiment_folder) | |
| # Update results.json | |
| results_json_path = os.path.join(experiment_dir, "results.json") | |
| with open(results_json_path, 'w', encoding='utf-8') as f: | |
| json.dump(self.tasks, f, indent=2, ensure_ascii=False) | |
| # Update results.csv | |
| self._save_csv(experiment_dir) | |
| def _save_csv(self, experiment_dir: str) -> None: | |
| """Save current tasks to CSV file using pandas.""" | |
| # Define the column order | |
| columns = [ | |
| 'task_id', | |
| 'site', | |
| 'intent', | |
| 'agent_answer', | |
| 'eval', | |
| 'score', | |
| 'exception', | |
| 'num_steps', | |
| 'fail_category', | |
| 'agent_v', | |
| 'duration', | |
| 'total_llm_calls', | |
| 'total_tokens', | |
| 'api_calls', | |
| 'total_cost', | |
| 'total_cache_input_tokens', | |
| ] | |
| if not self.tasks: | |
| # Create empty DataFrame with headers if no tasks | |
| df = pd.DataFrame(columns=columns) | |
| else: | |
| # Convert tasks dictionary to list of dictionaries for DataFrame | |
| data = [] | |
| for task_id, task_data in self.tasks.items(): | |
| row = {'task_id': task_id} | |
| row.update(task_data) | |
| data.append(row) | |
| # Create DataFrame | |
| df = pd.DataFrame(data) | |
| # Reorder columns to match the desired order | |
| df = df.reindex(columns=columns) | |
| # Save to CSV | |
| results_csv_path = os.path.join(experiment_dir, "results.csv") | |
| df.to_csv(results_csv_path, index=False, encoding='utf-8') | |
| def _add_to_progress_file(self, task_id: str) -> None: | |
| """Add a task ID to the .progress file.""" | |
| if not self.experiment_folder: | |
| return | |
| progress_path = os.path.join(self._base_dir, self.experiment_folder, ".progress") | |
| with open(progress_path, 'a', encoding='utf-8') as f: | |
| f.write(task_id + '\n') | |
| def update_task( | |
| self, | |
| task_id: str, | |
| site: Optional[str] = None, | |
| intent: Optional[str] = None, | |
| agent_answer: Optional[str] = None, | |
| eval: Optional[str] = None, | |
| score: Optional[float] = None, | |
| exception: Optional[bool] = None, | |
| num_steps: Optional[int] = None, | |
| fail_category: Optional[str] = None, | |
| agent_v: Optional[str] = None, | |
| ) -> bool: | |
| """ | |
| Update an existing task. | |
| Args: | |
| task_id (str): ID of the task to update | |
| site (str, optional): New site | |
| intent (str, optional): New intent | |
| agent_answer (str, optional): New agent answer | |
| eval (str, optional): New evaluation | |
| score (float, optional): New score | |
| exception (bool, optional): New exception status | |
| num_steps (int, optional): New number of steps | |
| fail_category (str, optional): New fail category | |
| agent_v (str, optional): New agent version | |
| Returns: | |
| bool: True if task was updated, False if task not found | |
| """ | |
| if task_id not in self.tasks: | |
| return False | |
| # Update only provided fields | |
| if site is not None: | |
| self.tasks[task_id]["site"] = site | |
| if intent is not None: | |
| self.tasks[task_id]["intent"] = intent | |
| if agent_answer is not None: | |
| self.tasks[task_id]["agent_answer"] = agent_answer | |
| if eval is not None: | |
| self.tasks[task_id]["eval"] = eval | |
| if score is not None: | |
| self.tasks[task_id]["score"] = score | |
| if exception is not None: | |
| self.tasks[task_id]["exception"] = exception | |
| if num_steps is not None: | |
| self.tasks[task_id]["num_steps"] = num_steps | |
| if fail_category is not None: | |
| self.tasks[task_id]["fail_category"] = fail_category | |
| if agent_v is not None: | |
| self.tasks[task_id]["agent_v"] = agent_v | |
| if settings.advanced_features.tracker_enabled: | |
| self._update_result_files() | |
| return True | |
| def remove_task(self, task_id: str) -> bool: | |
| """ | |
| Remove a task from the results. | |
| Args: | |
| task_id (str): ID of the task to remove | |
| Returns: | |
| bool: True if task was removed, False if task not found | |
| """ | |
| if task_id in self.tasks: | |
| del self.tasks[task_id] | |
| if settings.advanced_features.tracker_enabled: | |
| self._update_result_files() | |
| return True | |
| return False | |
| def get_task(self, task_id: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Get a specific task by ID. | |
| Args: | |
| task_id (str): ID of the task to retrieve | |
| Returns: | |
| Dict containing task data or None if not found | |
| """ | |
| return self.tasks.get(task_id) | |
| def get_all_tasks(self) -> Dict[str, Dict[str, Any]]: | |
| """ | |
| Get all tasks. | |
| Returns: | |
| Dict containing all tasks | |
| """ | |
| return self.tasks.copy() | |
| def find_tasks_by_score(self, score: float) -> Dict[str, Dict[str, Any]]: | |
| """ | |
| Find all tasks with a specific score. | |
| Args: | |
| score (float): Score to search for | |
| Returns: | |
| Dict containing matching tasks | |
| """ | |
| return {task_id: task for task_id, task in self.tasks.items() if task.get("score") == score} | |
| def find_tasks_by_site(self, site: str) -> Dict[str, Dict[str, Any]]: | |
| """ | |
| Find all tasks with a specific site. | |
| Args: | |
| site (str): Site to search for | |
| Returns: | |
| Dict containing matching tasks | |
| """ | |
| return {task_id: task for task_id, task in self.tasks.items() if task.get("site") == site} | |
| def find_tasks_by_exception(self, exception: bool) -> Dict[str, Dict[str, Any]]: | |
| """ | |
| Find all tasks with specific exception status. | |
| Args: | |
| exception (bool): Exception status to search for | |
| Returns: | |
| Dict containing matching tasks | |
| """ | |
| return {task_id: task for task_id, task in self.tasks.items() if task.get("exception") == exception} | |
| def find_tasks_by_agent_version(self, agent_v: str) -> Dict[str, Dict[str, Any]]: | |
| """ | |
| Find all tasks with a specific agent version. | |
| Args: | |
| agent_v (str): Agent version to search for | |
| Returns: | |
| Dict containing matching tasks | |
| """ | |
| return {task_id: task for task_id, task in self.tasks.items() if task.get("agent_v") == agent_v} | |
| def clear_all_tasks(self) -> None: | |
| """Remove all tasks from result files.""" | |
| self.tasks = {} | |
| if self.experiment_folder and settings.advanced_features.tracker_enabled: | |
| self._update_result_files() | |
| # Clear progress file | |
| progress_path = os.path.join(self._base_dir, self.experiment_folder, ".progress") | |
| with open(progress_path, 'w', encoding='utf-8') as f: | |
| f.truncate(0) | |
| def get_task_count(self) -> int: | |
| """ | |
| Get the total number of tasks. | |
| Returns: | |
| int: Number of tasks | |
| """ | |
| return len(self.tasks) | |
| def get_statistics(self) -> Dict[str, Any]: | |
| """ | |
| Get basic statistics about the tasks. | |
| Returns: | |
| Dict containing task statistics | |
| """ | |
| if not self.tasks: | |
| return {"total_tasks": 0} | |
| stats = { | |
| "total_tasks": len(self.tasks), | |
| "tasks_with_exceptions": len([t for t in self.tasks.values() if t.get("exception") is True]), | |
| "tasks_without_exceptions": len([t for t in self.tasks.values() if t.get("exception") is False]), | |
| "unique_sites": len(set(t.get("site") for t in self.tasks.values() if t.get("site"))), | |
| "unique_agent_versions": len( | |
| set(t.get("agent_v") for t in self.tasks.values() if t.get("agent_v")) | |
| ), | |
| } | |
| # Score statistics | |
| scores = [t.get("score") for t in self.tasks.values() if t.get("score") is not None] | |
| if scores: | |
| stats["average_score"] = sum(scores) / len(scores) | |
| stats["min_score"] = min(scores) | |
| stats["max_score"] = max(scores) | |
| return stats | |
| def get_dataframe(self) -> pd.DataFrame: | |
| """ | |
| Get all tasks as a pandas DataFrame. | |
| Returns: | |
| pd.DataFrame: DataFrame containing all tasks | |
| """ | |
| columns = [ | |
| 'task_id', | |
| 'site', | |
| 'intent', | |
| 'agent_answer', | |
| 'eval', | |
| 'score', | |
| 'exception', | |
| 'num_steps', | |
| 'fail_category', | |
| 'agent_v', | |
| ] | |
| if not self.tasks: | |
| return pd.DataFrame(columns=columns) | |
| data = [] | |
| for task_id, task_data in self.tasks.items(): | |
| row = {'task_id': task_id} | |
| row.update(task_data) | |
| data.append(row) | |
| df = pd.DataFrame(data) | |
| return df.reindex(columns=columns) | |
| def _copy_task_json_files( | |
| self, | |
| source_folders: List[str], | |
| target_folder: str, | |
| selected_task_ids: List[str], | |
| base_dir: Optional[str] = None, | |
| ) -> None: | |
| """ | |
| Copy individual task JSON files from source folders to target folder. | |
| Args: | |
| source_folders (List[str]): List of source experiment folder names | |
| target_folder (str): Target experiment folder name | |
| selected_task_ids (List[str]): List of task IDs to copy | |
| base_dir (str, optional): Base directory. If None, uses instance base_dir | |
| """ | |
| if base_dir is None: | |
| base_dir = self._base_dir | |
| target_dir = os.path.join(base_dir, target_folder) | |
| copied_files = 0 | |
| skipped_files = 0 | |
| for task_id in selected_task_ids: | |
| file_found = False | |
| # Look for the task JSON file in each source folder | |
| for folder_name in source_folders: | |
| source_dir = os.path.join(base_dir, folder_name) | |
| source_file = os.path.join(source_dir, f"{task_id}.json") | |
| if os.path.exists(source_file): | |
| target_file = os.path.join(target_dir, f"{task_id}.json") | |
| try: | |
| # Copy the file | |
| shutil.copy2(source_file, target_file) | |
| logger.debug(f"Copied {task_id}.json from {folder_name}") | |
| copied_files += 1 | |
| file_found = True | |
| break # Found and copied, move to next task | |
| except Exception as e: | |
| logger.error(f"Failed to copy {task_id}.json from {folder_name}: {e}") | |
| if not file_found: | |
| logger.warning(f"Task JSON file {task_id}.json not found in any source folder") | |
| skipped_files += 1 | |
| logger.info(f"Task JSON files - Copied: {copied_files}, Skipped: {skipped_files}") | |
| def merge_experiments( | |
| self, | |
| experiment_folders: List[str], | |
| output_experiment_name: str, | |
| description: Optional[str] = "Merged experiments", | |
| output_folder: Optional[str] = None, | |
| ) -> MergeResult: | |
| """ | |
| Merge multiple experiment folders, preferring tasks with score 1.0 over 0.0. | |
| Also copies individual task JSON files from source experiments. | |
| Args: | |
| experiment_folders (List[str]): List of experiment folder names to merge | |
| output_experiment_name (str): Name for the merged experiment | |
| description (str, optional): Description for the merged experiment | |
| output_folder (str, optional): Currently unused | |
| Returns: | |
| MergeResult: Contains folder_name and merged_task_ids | |
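| Example (illustrative; the folder names are hypothetical): | |
| result = tracker.merge_experiments(["run_a_20240101", "run_b_20240102"], "merged_runs") | |
| print(result.folder_name, len(result.merged_task_ids)) | |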
| """ | |
| logger.info(f"Starting merge of {len(experiment_folders)} experiments") | |
| # Create new experiment for merged results | |
| merged_folder = self.start_experiment( | |
| task_ids=[], # Will be populated with merged task IDs | |
| experiment_name=output_experiment_name, | |
| description=description, | |
| ) | |
| merged_tasks = {} | |
| all_task_ids = set() | |
| task_source_mapping = {} # Track which folder each task came from | |
| # First pass: collect all tasks and identify duplicates | |
| for folder_name in experiment_folders: | |
| folder_path = os.path.join(self._base_dir, folder_name) | |
| results_json_path = os.path.join(folder_path, "results.json") | |
| if not os.path.exists(results_json_path): | |
| logger.warning(f"Results file not found in {folder_name}, skipping") | |
| continue | |
| try: | |
| with open(results_json_path, 'r', encoding='utf-8') as f: | |
| folder_tasks = json.load(f) | |
| logger.info(f"Processing {len(folder_tasks)} tasks from {folder_name}") | |
| for task_id, task_data in folder_tasks.items(): | |
| all_task_ids.add(task_id) | |
| if task_id not in merged_tasks: | |
| # First occurrence of this task | |
| merged_tasks[task_id] = {**task_data, 'source_experiment': folder_name} | |
| task_source_mapping[task_id] = folder_name | |
| logger.debug(f"Added new task {task_id} from {folder_name}") | |
| else: | |
| # Task already exists, apply preference logic | |
| existing_score = merged_tasks[task_id].get('score', 0.0) | |
| new_score = task_data.get('score', 0.0) | |
| if existing_score == 1.0 and new_score != 1.0: | |
| # Keep existing (perfect score) | |
| should_replace = False | |
| elif existing_score != 1.0 and new_score == 1.0: | |
| # Replace with perfect score | |
| should_replace = True | |
| elif existing_score == new_score: | |
| # Same score, keep existing (first found) | |
| should_replace = False | |
| else: | |
| # Different scores, prefer higher | |
| should_replace = new_score > existing_score | |
| if should_replace: | |
| merged_tasks[task_id] = {**task_data, 'source_experiment': folder_name} | |
| task_source_mapping[task_id] = folder_name | |
| logger.debug( | |
| f"Replaced task {task_id}: {existing_score} -> {new_score} from {folder_name}" | |
| ) | |
| else: | |
| logger.debug( | |
| f"Kept existing task {task_id}: score {existing_score} vs {new_score}" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error processing {folder_name}: {e}") | |
| continue | |
| # Update the merged experiment with final task list | |
| self.tasks = merged_tasks | |
| # Update metadata with actual task IDs | |
| if self.tasks_metadata: | |
| self.tasks_metadata.task_ids = list(all_task_ids) | |
| # Save updated metadata | |
| experiment_dir = os.path.join(self._base_dir, merged_folder) | |
| metadata_path = os.path.join(experiment_dir, "metadata.json") | |
| with open(metadata_path, 'w', encoding='utf-8') as f: | |
| json.dump(self.tasks_metadata.model_dump(), f, indent=2, ensure_ascii=False) | |
| # Update result files with merged data only if tracker is enabled | |
| if settings.advanced_features.tracker_enabled: | |
| self._update_result_files() | |
| # Update progress file with all task IDs | |
| for task_id in merged_tasks.keys(): | |
| self._add_to_progress_file(task_id) | |
| # Copy individual task JSON files only if tracker is enabled | |
| if settings.advanced_features.tracker_enabled: | |
| logger.info("Copying individual task JSON files...") | |
| selected_task_ids = list(merged_tasks.keys()) | |
| self._copy_task_json_files(experiment_folders, merged_folder, selected_task_ids) | |
| logger.success(f"Successfully merged {len(merged_tasks)} tasks into {merged_folder}") | |
| logger.info(f"Source experiments: {experiment_folders}") | |
| score_distribution = {} | |
| source_distribution = {} | |
| for task_data in merged_tasks.values(): | |
| score = task_data.get('score', 0.0) | |
| source = task_data.get('source_experiment', 'unknown') | |
| score_distribution[score] = score_distribution.get(score, 0) + 1 | |
| source_distribution[source] = source_distribution.get(source, 0) + 1 | |
| logger.info(f"Score distribution in merged results: {score_distribution}") | |
| logger.info(f"Source distribution in merged results: {source_distribution}") | |
| # Return MergeResult | |
| return MergeResult(folder_name=merged_folder, merged_task_ids=list(merged_tasks.keys())) | |
| def list_experiment_folders(self, base_path: Optional[str] = None) -> List[str]: | |
| """ | |
| List all available experiment folders. | |
| Args: | |
| base_path (str, optional): Base directory to search for experiments. | |
| If None, uses instance base_dir | |
| Returns: | |
| List[str]: List of experiment folder names | |
| """ | |
| if base_path is None: | |
| base_path = self._base_dir | |
| if not os.path.exists(base_path): | |
| logger.warning(f"Base path {base_path} does not exist") | |
| return [] | |
| folders = [] | |
| for item in os.listdir(base_path): | |
| item_path = os.path.join(base_path, item) | |
| if os.path.isdir(item_path): | |
| # Check if it looks like an experiment folder (has metadata.json) | |
| metadata_path = os.path.join(item_path, "metadata.json") | |
| if os.path.exists(metadata_path): | |
| folders.append(item) | |
| logger.info(f"Found {len(folders)} experiment folders") | |
| return sorted(folders) | |
| @staticmethod | |
| def list_experiment_folders_static(base_path: str = "./logging/trajectory_data") -> List[str]: | |
| """ | |
| Static method to list all available experiment folders. | |
| Args: | |
| base_path (str): Base directory to search for experiments | |
| Returns: | |
| List[str]: List of experiment folder names | |
| """ | |
| if not os.path.exists(base_path): | |
| logger.warning(f"Base path {base_path} does not exist") | |
| return [] | |
| folders = [] | |
| for item in os.listdir(base_path): | |
| item_path = os.path.join(base_path, item) | |
| if os.path.isdir(item_path): | |
| # Check if it looks like an experiment folder (has metadata.json) | |
| metadata_path = os.path.join(item_path, "metadata.json") | |
| if os.path.exists(metadata_path): | |
| folders.append(item) | |
| logger.info(f"Found {len(folders)} experiment folders") | |
| return sorted(folders) | |
| def get_experiment_progress(self, experiment_folder_name: str) -> Dict[str, Any]: | |
| """ | |
| Get the progress of a specific experiment. | |
| Args: | |
| experiment_folder_name (str): The name of the experiment folder. | |
| Returns: | |
| Dict[str, Any]: A dictionary containing 'total_tasks', 'completed_tasks', and 'uncompleted_task_ids'. | |
| Returns default values if files are not found or errors occur. | |
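| Example (illustrative; the folder name and counts are hypothetical): | |
| progress = tracker.get_experiment_progress("baseline_run_20240101") | |
| # e.g. {"total_tasks": 10, "completed_tasks": 7, "uncompleted_task_ids": ["task_8", ...]} | |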
| """ | |
| experiment_dir = os.path.join(self._base_dir, experiment_folder_name) | |
| metadata_path = os.path.join(experiment_dir, "metadata.json") | |
| progress_path = os.path.join(experiment_dir, ".progress") | |
| total_tasks = 0 | |
| completed_tasks = 0 | |
| all_task_ids = set() | |
| completed_task_ids = set() | |
| # Read total tasks from metadata.json | |
| if os.path.exists(metadata_path): | |
| try: | |
| with open(metadata_path, 'r', encoding='utf-8') as f: | |
| metadata = json.load(f) | |
| all_task_ids = set(metadata.get('task_ids', [])) | |
| total_tasks = len(all_task_ids) | |
| except (json.JSONDecodeError, IOError) as e: | |
| logger.error(f"Error reading metadata.json for {experiment_folder_name}: {e}") | |
| else: | |
| logger.warning(f"metadata.json not found for experiment: {experiment_folder_name}") | |
| # Read completed tasks from .progress | |
| if os.path.exists(progress_path): | |
| try: | |
| with open(progress_path, 'r', encoding='utf-8') as f: | |
| completed_task_ids = set(line.strip() for line in f if line.strip()) | |
| completed_tasks = len(completed_task_ids) | |
| except IOError as e: | |
| logger.error(f"Error reading .progress file for {experiment_folder_name}: {e}") | |
| else: | |
| logger.info(f".progress file not found for experiment: {experiment_folder_name}") | |
| uncompleted_task_ids = sorted(all_task_ids - completed_task_ids) | |
| return { | |
| "total_tasks": total_tasks, | |
| "completed_tasks": completed_tasks, | |
| "uncompleted_task_ids": uncompleted_task_ids, | |
| } | |