import json
import uuid
from typing import Literal, Optional, Dict, Callable

from langchain_core.messages import HumanMessage, ToolCall, BaseMessage
from loguru import logger

from cuga.backend.activity_tracker.tracker import ActivityTracker, Step
from cuga.backend.cuga_graph.nodes.shared.base_agent import create_partial
from cuga.backend.cuga_graph.nodes.chat.chat_agent.chat_agent import ChatAgent
from cuga.backend.cuga_graph.nodes.shared.base_node import BaseNode
from cuga.backend.cuga_graph.nodes.human_in_the_loop.followup_model import (
    create_flow_approve,
    create_new_flow_approve,
)
from cuga.backend.cuga_graph.state.agent_state import AgentState
from cuga.backend.cuga_graph.utils.nodes_names import NodeNames, ActionIds
from langgraph.types import Command
from cuga.config import settings

tracker = ActivityTracker()
ENABLE_SAVE_REUSE = settings.features.save_reuse
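

# Dispatch-table design: ChatHumanInTheLoopHandler maps human-in-the-loop action ids
# to handler callables, so new chat-specific actions can be supported by registering
# a handler rather than editing the dispatch logic itself.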
class ChatHumanInTheLoopHandler:
    """Handler for chat-specific human-in-the-loop interactions"""

    def __init__(self):
        self._action_handlers: Dict[str, Callable] = {
            ActionIds.FLOW_APPROVE: self._handle_tool_execute,
            # Add chat-specific action handlers here
            # Example: ActionIds.TOOL_EXECUTE: self._handle_tool_execute,
            #          ActionIds.CHAT_CONTINUE: self._handle_chat_continue,
        }

    def handle_human_response(self, state: AgentState, node_name: str) -> Command:
        """Handle any human response based on action_id"""
        action_id = state.hitl_response.action_id
        if action_id in self._action_handlers:
            return self._action_handlers[action_id](state, node_name)
        # Default fallback for chat - continue to final answer
        return Command(update=state.model_dump(), goto=NodeNames.FINAL_ANSWER_AGENT)

    def add_action_handler(self, action_id: str, handler: Callable):
        """Add a custom action handler"""
        self._action_handlers[action_id] = handler
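
    # Example registration (illustrative only; "MY_ACTION" and my_handler are hypothetical):
    #   handler = ChatHumanInTheLoopHandler()
    #   handler.add_action_handler("MY_ACTION", my_handler)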

    def _handle_tool_execute(self, state: AgentState, node_name: str) -> Command:
        """Handle tool execution approval"""
        state.sender = node_name
        return Command(update=state.model_dump(), goto=NodeNames.WAIT_FOR_RESPONSE)

    def _handle_chat_continue(self, state: AgentState, node_name: str) -> Command:
        """Handle continuing chat conversation"""
        state.sender = node_name
        return Command(update=state.model_dump(), goto=NodeNames.FINAL_ANSWER_AGENT)
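

# ChatNode is the graph node that wraps ChatAgent: it routes human-in-the-loop
# responses, forwards approved tasks to the task analyzer, and emits regular chat
# replies to the final answer agent.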
class ChatNode(BaseNode):
    def __init__(self):
        super().__init__()
        self.chat_agent: Optional[ChatAgent] = None
        self.hitl_handler = ChatHumanInTheLoopHandler()
        self._initialized = False

    @classmethod
    async def create(cls):
        """Factory method to create and initialize the node instance"""
        instance = cls()
        instance.chat_agent = ChatAgent()
        if settings.features.chat:
            await instance.chat_agent.setup()
        instance.node = create_partial(
            ChatNode.node_handler,
            agent=instance.chat_agent,
            hitl_handler=instance.hitl_handler,
            name=instance.chat_agent.name,
        )
        instance._initialized = True
        return instance
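
    # Typical usage (sketch, assuming an async context during graph assembly):
    #   chat_node = await ChatNode.create()
    #   # chat_node.node is then the callable the graph wires in for this node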

    @staticmethod
    def format_function_call(func_dict):
        name = func_dict["name"]
        args = func_dict["args"]

        def format_value(v):
            if isinstance(v, str):
                return f"'{v}'"
            elif isinstance(v, (list, dict)):
                return repr(v)
            else:
                return str(v)

        arg_strings = [f"{k}={format_value(v)}" for k, v in args.items()]
        return f"{name}({', '.join(arg_strings)})"

    @staticmethod
    async def node_handler(
        state: AgentState, agent: ChatAgent, hitl_handler: ChatHumanInTheLoopHandler, name: str
    ) -> Command[Literal["FinalAnswerAgent", "TaskAnalyzerAgent", "SuggestHumanActions"]]:
        # Handle human-in-the-loop responses
        if (
            state.sender == NodeNames.WAIT_FOR_RESPONSE
            and state.hitl_response.action_id == ActionIds.FLOW_APPROVE
        ):
            # The user approved the proposed tool call: execute it now
            tool = ToolCall(**state.hitl_response.additional_data.tool)
            res = await agent.execute_tool(tool)
            parsed_result = res
            if isinstance(res, str):
                try:
                    parsed_result = json.loads(res)
                except (json.JSONDecodeError, TypeError):
                    # If parsing fails, keep the original string
                    parsed_result = res

            # Get tool details
            tool_name = tool.get("name")
            tool_args = tool.get("args")

            # Store the result in the variables manager under a short unique name
            var_name = f"tool_result_{str(uuid.uuid4())[:5]}"
            state.variables_manager.add_variable(
                parsed_result, var_name, f"Result of tool {tool_name} with args {tool_args}"
            )
            state.sender = "ChatAgentTool"
            state.last_planner_answer = state.variables_manager.present_variable(var_name)
            return Command(update=state.model_dump(), goto=NodeNames.FINAL_ANSWER_AGENT)
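
        # NEW_FLOW_APPROVE: the user approved starting a new flow, so the approved
        # task becomes the new input and control moves to the task analyzer.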
        if (
            state.sender == NodeNames.WAIT_FOR_RESPONSE
            and state.hitl_response.action_id == ActionIds.NEW_FLOW_APPROVE
        ):
            logger.debug("tool call in chat node")
            tool = ToolCall(**state.hitl_response.additional_data.tool)
            state.input = tool.get("args").get("user_task")
            state.sender = "ChatAgent"
            return Command(update=state.model_dump(), goto=NodeNames.TASK_ANALYZER_AGENT)

        # If chat feature is disabled, go directly to task analyzer
        if not settings.features.chat:
            state.sender = name
            return Command(update=state.model_dump(), goto=NodeNames.TASK_ANALYZER_AGENT)

        # Process chat input
        state.sender = name
        state.chat_agent_messages.append(HumanMessage(content=state.input))
        res: BaseMessage = await agent.invoke(state.chat_agent_messages, state)
        state.chat_agent_messages.append(res)

        # Handle tool calls - require human approval
        if ENABLE_SAVE_REUSE and res.tool_calls and res.tool_calls[0].get("name") == "run_new_flow":
            state.final_answer = state.chat_agent_messages[-1].content
            state.sender = name
            state.hitl_action = create_new_flow_approve(tool=res.tool_calls[0])
            return Command(update=state.model_dump(), goto=NodeNames.SUGGEST_HUMAN_ACTIONS)

        if ENABLE_SAVE_REUSE and res.tool_calls:
            state.final_answer = state.chat_agent_messages[-1].content
            state.sender = name
            state.hitl_action = create_flow_approve(tool=res.tool_calls[0])
            return Command(update=state.model_dump(), goto=NodeNames.SUGGEST_HUMAN_ACTIONS)
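
        # Without save/reuse, an execute_task tool call hands the task (plus any relevant
        # variables from the conversation history) straight to the task analyzer.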
        if (
            not ENABLE_SAVE_REUSE
            and res.tool_calls
            and len(res.tool_calls) > 0
            and res.tool_calls[0].get("name") == "execute_task"
        ):
            logger.debug(f"tool call in chat node {res.tool_calls[0]}")
            variables_rel = res.tool_calls[0].get("args").get("relevant_variables")
            if variables_rel and len(variables_rel) > 0:
                state.input = (
                    f"task: {res.tool_calls[0].get('args').get('task')}"
                    + f"\n relevant variables from history: {res.tool_calls[0].get('args').get('relevant_variables')}"
                )
            else:
                state.input = res.tool_calls[0].get("args").get("task")
            return Command(update=state.model_dump(), goto="TaskAnalyzerAgent")

        # Regular chat response - add to messages and continue
        res.content = state.variables_manager.replace_variables_placeholders(res.content)
        state.messages.append(res)
        tracker.collect_step(
            step=Step(
                name=name,
                data=res.content,
                current_url=state.url,
            )
        )
        state.final_answer = state.chat_agent_messages[-1].content
        return Command(update=state.model_dump(), goto=NodeNames.FINAL_ANSWER_AGENT)
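

# Minimal wiring sketch (illustrative only; the real CUGA graph builder may register
# nodes and edges differently, and the node name "ChatAgent" is an assumption):
#   from langgraph.graph import StateGraph
#   graph = StateGraph(AgentState)
#   chat_node = await ChatNode.create()
#   graph.add_node("ChatAgent", chat_node.node)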