from cuga.backend.activity_tracker.tracker import ActivityTracker
from cuga.backend.cuga_graph.utils.controller import AgentRunner, ExperimentResult
from cuga.evaluation.langfuse.get_langfuse_data import LangfuseTraceHandler
from loguru import logger
import traceback
from pydantic import BaseModel
from typing import List, Dict, Iterable, Any, Optional, Tuple
import json
import csv
from calculate_test_score import evaluate_test_and_details, TestScore, TestScoreDetails, ToolCall
from statistics import mean
from pathlib import Path
import os

tracker = ActivityTracker()


class ExpectedOutput(BaseModel):
    """
    The expected output of a test case.
    """

    response: str
    keywords: List[str]
    tool_calls: List[ToolCall]


class TestCase(BaseModel):
    """
    The model for a single test case, i.e. the input given to the evaluation loop.
    """

    app: str
    name: str
    description: str
    intent: str
    expected_output: ExpectedOutput


class TestResult(BaseModel):
    """
    The evaluation loop's output for a run on a single test case.
    """

    app: str
    index: int
    test_name: str
    score: TestScore
    details: TestScoreDetails


def dict_subset_with_reason(sup: Dict, sub: Dict, path="") -> List[str]:
    """Return list of reasons why sub is not a subset of sup."""
    reasons = []
    for k, v in sub.items():
        if k not in sup:
            reasons.append(f"Missing key '{path + k}'")
        else:
            sv = sup[k]
            if isinstance(v, dict) and isinstance(sv, dict):
                reasons.extend(dict_subset_with_reason(sv, v, path + k + "."))
            elif sv != v:
                reasons.append(f"Value mismatch at '{path + k}': expected {v}, got {sv}")
    return reasons
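
# Illustrative behaviour of dict_subset_with_reason (the values below are made up for the example):
#   dict_subset_with_reason({"id": 1, "limit": 10}, {"id": 1})  -> []
#   dict_subset_with_reason({"id": 2}, {"id": 1})               -> ["Value mismatch at 'id': expected 1, got 2"]
#   dict_subset_with_reason({}, {"id": 1})                      -> ["Missing key 'id'"]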


def compare_toolcalls(a_list: Iterable[ToolCall], b_list: Iterable[ToolCall]) -> List[str]:
    """Return reasons why calls in a_list are not matched by calls in b_list.

    A call matches when some b.name is a substring of a.name and b.args is a subset of a.args.
    """
    all_reasons = []
    for a in a_list:
        matched = False
        for b in b_list:
            if b.name in a.name:
                reasons = dict_subset_with_reason(a.args, b.args)
                if not reasons:  # perfect match
                    matched = True
                    break
        if not matched:
            if not any(b.name in a.name for b in b_list):
                all_reasons.append(f"No ToolCall in B has name substring matching '{a.name}'")
            else:
                all_reasons.append(f"Args mismatch for ToolCall '{a.name}'")
                for b in b_list:
                    if b.name in a.name:
                        mismatch = dict_subset_with_reason(a.args, b.args)
                        if mismatch:
                            all_reasons.extend([f" vs B({b.name}): {r}" for r in mismatch])
    return all_reasons
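
# Illustrative behaviour of compare_toolcalls (the ToolCall values below are invented for the example):
#   actual = [ToolCall(name="crm_search_contacts", args={"email": "jane@example.com", "limit": 5})]
#   expected = [ToolCall(name="search_contacts", args={"email": "jane@example.com"})]
#   compare_toolcalls(actual, expected)  -> []   (name is a substring match and args are a subset)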


def parse_test_cases(json_file_path: str) -> Dict[str, List[TestCase]]:
    """Parse JSON test cases into TestCase objects, grouped by app name."""
    # Resolve path: use absolute paths as-is, resolve relative paths from the user's terminal location
    path = Path(json_file_path)
    if not path.is_absolute():
        path = Path.cwd() / path

    with open(path, 'r') as f:
        data = json.load(f)

    test_cases = {}
    for app in data:
        for test_case_data in app['test_cases']:
            # Extract user input as intent (first user input)
            intent = test_case_data['intent'] if test_case_data['intent'] else ""

            # Parse tool calls
            tool_calls = [
                ToolCall(name=call['name'], args=call['args'])
                for call in test_case_data['expected_output']['tool_calls']
            ]

            # Parse expected output
            expected_output = ExpectedOutput(
                response=test_case_data['expected_output']['response'],
                keywords=test_case_data['expected_output']['keywords'],
                tool_calls=tool_calls,
            )

            # Create TestCase object
            test_case = TestCase(
                app=app['name'],
                name=test_case_data['name'],
                description=test_case_data['description'],
                intent=intent,
                expected_output=expected_output,
            )

            if app['name'] not in test_cases:
                test_cases[app['name']] = []
            test_cases[app['name']].append(test_case)
    return test_cases
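
# Expected shape of the test-case JSON file consumed by parse_test_cases
# (an illustrative sketch; the app/test names and values here are invented):
# [
#   {
#     "name": "crm",
#     "test_cases": [
#       {
#         "name": "lookup_contact",
#         "description": "Look up a contact by email",
#         "intent": "Find the contact with email jane@example.com",
#         "expected_output": {
#           "response": "Jane Doe was found.",
#           "keywords": ["Jane", "Doe"],
#           "tool_calls": [{"name": "search_contacts", "args": {"email": "jane@example.com"}}]
#         }
#       }
#     ]
#   }
# ]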


async def run_cuga(
    test_file_path: str, result_file_path: str
) -> Tuple[Dict[str, List[TestCase]], List[ExperimentResult]]:
    test_cases = parse_test_cases(test_file_path)
    print(
        f"test cases: {sum(len(cases) for cases in test_cases.values())}\napps: {list(test_cases.keys())}"
    )
    agent_runner = AgentRunner(browser_enabled=False)
    results = []
    for app in test_cases:
        task_ids = [f"{app}_{str(i)}" for i in range(len(test_cases[app]))]
        tracker.start_experiment(task_ids=task_ids, experiment_name=app, description="")
        for i, task in enumerate(test_cases[app]):
            try:
                tracker.reset(intent=task.intent, task_id=f"{app}_{str(i)}")
                result = await agent_runner.run_task_generic(
                    eval_mode=False, goal=task.intent, current_datetime=tracker.current_date
                )
                # Reset variables after task completion using the current state
                state = agent_runner.get_current_state()
                state.variables_manager.reset()
                results.append(result)
                parsed_results = parse_test_results([task], [result])
                save_test_results(parsed_results, result_file_path)
                # Extract langfuse trace ID (applicable only if `langfuse_tracing=true` in settings)
                langfuse_trace_id = agent_runner.agent_loop_obj.get_langfuse_trace_id()
                langfuse_handler = LangfuseTraceHandler(langfuse_trace_id)
                langfuse_data = await langfuse_handler.get_langfuse_data()
                tracker.finish_task(
                    intent=task.intent,
                    site="",
                    task_id=f"{app}_{str(i)}",
                    eval="",
                    score=mean(
                        [
                            parsed_results[0].score.keyword_score,
                            parsed_results[0].score.response_score,
                            parsed_results[0].score.tool_call_score,
                        ]
                    ),
                    agent_answer=result.answer,
                    exception=False,
                    agent_v="",
                    total_llm_calls=langfuse_data.total_llm_calls if langfuse_data else None,
                    total_tokens=langfuse_data.total_tokens if langfuse_data else None,
                    total_cost=langfuse_data.total_cost if langfuse_data else None,
                    total_cache_input_tokens=langfuse_data.total_cache_input_tokens
                    if langfuse_data
                    else None,
                )
            except Exception as e:
                results.append(ExperimentResult(answer=f"Error {e}", score=0, messages=[], steps=[]))
                tracker.finish_task(
                    intent=task.intent,
                    site="",
                    task_id=f"{app}_{str(i)}",
                    eval="",
                    score=0,
                    agent_answer=f"Error: {e}",
                    exception=True,
                    agent_v="",
                )
                logger.error(traceback.format_exc())
                logger.error(e)
    return test_cases, results


def parse_test_results(
    test_cases: List[TestCase], experiment_results: List[ExperimentResult]
) -> List[TestResult]:
    if len(test_cases) != len(experiment_results):
        raise ValueError(f"Mismatch: {len(test_cases)} test cases vs {len(experiment_results)} results")

    results = []
    for i, (test_case, experiment_result) in enumerate(zip(test_cases, experiment_results)):
        # Get answer text (handle None case)
        answer = experiment_result.answer or ""
        keywords = test_case.expected_output.keywords
        expected_tools = list(test_case.expected_output.tool_calls)

        # Collect the tool calls the agent actually made from the recorded api_call steps
        tool_calls = []
        for call in [step for step in experiment_result.steps if "api_call" in step.name]:
            call_json = json.loads(call.data)
            tool_calls.append(ToolCall(name=call_json['function_name'], args=call_json['args']))

        test_score, test_score_details = evaluate_test_and_details(
            keywords, tool_calls, expected_tools, answer, test_case.expected_output.response
        )
        result = TestResult(
            app=test_case.app,
            index=i,
            test_name=test_case.name,
            score=test_score,
            details=test_score_details,
        )
        results.append(result)
    return results


def save_test_results(
    results: List["TestResult"],
    json_path: str = "test_results.json",
    csv_path: Optional[str] = None,
) -> None:
    """
    Save test results to JSON (as a list) and CSV (append rows, no duplicate headers).
    """
    if csv_path is None:
        csv_path = json_path[:-5] + ".csv" if json_path.endswith(".json") else json_path + ".csv"

    # ---- JSON ----
    # Load existing results (list), append, then overwrite
    if os.path.exists(json_path) and os.path.getsize(json_path) > 0:
        with open(json_path, "r", encoding="utf-8") as f:
            try:
                existing = json.load(f)
                if not isinstance(existing, list):
                    existing = []
            except json.JSONDecodeError:
                existing = []
    else:
        existing = []

    existing.extend(r.model_dump() for r in results)
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(existing, f, indent=2, ensure_ascii=False)

    # ---- CSV ----
    def j(obj):
        return json.dumps(obj, ensure_ascii=False, separators=(",", ":"))

    rows = []
    for r in results:
        rows.append(
            {
                "app": r.app,
                "index": r.index,
                "test_name": r.test_name,
                "keyword_score": r.score.keyword_score,
                "tool_call_score": r.score.tool_call_score,
                "response_score": r.score.response_score,
                "expected_keywords": j(r.details.expected_keywords),
                "missing_keywords": j(r.details.missing_keywords),
                "tool_call_mismatches": j([m.model_dump() for m in r.details.tool_call_mismatches]),
                "response_expected": r.details.response_expected,
                "response_actual": r.details.response_actual,
            }
        )

    if rows:  # guard against an empty results list (rows[0] below would raise otherwise)
        write_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
        with open(csv_path, "a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=rows[0].keys())
            if write_header:
                writer.writeheader()
            writer.writerows(rows)

    print(f"Saved {len(results)} results → JSON: {json_path} | CSV: {csv_path}")
if __name__ == "__main__":
import asyncio
import argparse
from cuga.config import settings
settings.update({"ADVANCED_FEATURES": {"TRACKER_ENABLED": True}}, merge=True)
parser = argparse.ArgumentParser(description="Run tests and save results.")
parser.add_argument("-t", "--test-file-path", required=True, help="Path to the test file")
parser.add_argument("-r", "--result-file-path", required=True, help="Path to the result file")
args = parser.parse_args()
tasks, results = asyncio.run(run_cuga(args.test_file_path, args.result_file_path))
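
# Example invocation (illustrative; the module filename and paths are assumptions):
#   python run_evaluation.py --test-file-path tests/test_cases.json --result-file-path results/run_results.json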