"""
Extension-based implementations of browser interaction commands.

These helpers are invoked when the Chrome extension is enabled
(settings.advanced_features.use_extension == True). They wrap lower-level
helpers such as `_send_browser_command` and element-lookup utilities so the
public `@tool` functions in `tools.py` can simply delegate to them.
"""

from typing import Any, Dict, List, Literal, Optional

from langchain_core.runnables import RunnableConfig
from loguru import logger

from cuga.backend.browser_env.page_understanding.types.dom_tree_types import (
    DomTreeResult,
    NodeData,
    TextNodeData,
)
from cuga.backend.cuga_graph.nodes.browser.action_agent.tools.alert import Alert

# Attribute on DOM-tree nodes whose value is the id sent to the extension as `bid`.
IDENTIFIER_ELEMENT = "dom-tree-id"


def _get_communicator(config: RunnableConfig | None) -> Any | None:
    """Retrieve the ChromeExtensionCommunicator instance.

    Preference order:
    1. Provided via the tool's RunnableConfig under ``configurable.communicator``.
    2. From ``page_data`` in configurable (key ``chrome_extension_communicator``).
    3. Global FastAPI app instance created in ``server.main``
       (``app.state.chrome_extension_communicator``).
    4. ``None`` if no communicator can be found.

    A falsy value at any step is treated as "not found" and the next source is
    tried, so a ``None`` placeholder in ``page_data`` no longer short-circuits
    the lookup.
    """
    # `or {}` also covers a config whose "configurable" key is explicitly None.
    configurable = (config.get("configurable") or {}) if config else {}

    # 1) Explicit communicator on the config
    if comm := configurable.get("communicator"):
        return comm

    # 2) Communicator carried inside page_data
    page_data = configurable.get("page_data")
    if isinstance(page_data, dict) and (comm := page_data.get("chrome_extension_communicator")):
        return comm

    # 3) Fall back to the running FastAPI app created in server.main
    try:
        from server.main import app  # type: ignore

        comm = getattr(app.state, "chrome_extension_communicator", None)
        if comm:
            return comm
    except Exception as e:
        # Best-effort: the server module may be absent or fail to import.
        # Record why instead of swallowing silently, but never raise.
        logger.debug(f"[extension_commands] Could not obtain communicator from server.main: {e}")

    return None


async def _send_browser_command(
    command: str, args: Dict[str, Any], config: RunnableConfig | None
) -> Any | None:
    """Send a browser command to the Chrome extension via the communicator.

    This is a *best-effort* operation: if no communicator is available, the
    request raises, or the extension answers with ``type == "error"``, the
    problem is logged and ``None`` is returned so the agent can continue
    operating without throwing errors.

    Args:
        command: Name of the browser command (e.g. "click", "type").
        args: Command arguments forwarded verbatim to the extension.
        config: RunnableConfig used to locate the communicator.

    Returns:
        The response object returned by the communicator, or ``None`` on any
        failure.
    """
    communicator = _get_communicator(config)
    if communicator is None:
        logger.warning(
            f"[tools.py] No ChromeExtensionCommunicator available – cannot send command '{command}'."
        )
        return None

    try:
        msg = {"type": "browser_command", "command": command, "args": args}

        # Handle different communicator types
        if hasattr(communicator, 'server'):
            # WebSocket communicator
            response = await communicator.server.send_request(msg, timeout=10.0)
        else:
            # HTTP stream communicator
            response = await communicator.send_request(msg, timeout=10.0)

        logger.debug(f"[tools.py] Sent browser_command: {msg}")
        logger.debug(f"[tools.py] Received response: {response}")

        if response and response.get("type") == "error":
            logger.error(f"[tools.py] Browser command '{command}' failed: {response.get('message')}")
            return None

        return response
    except Exception as e:
        logger.error(f"[tools.py] Failed to send browser command '{command}': {e}")
        return None


async def _add_animation(
    bid: str,
    icon_type: str,
    banner_text: str,
    config: RunnableConfig | None = None,
):
    """
    Add a visual animation to the element with the given BID.

    Args:
        bid: The browsergym ID of the element
        icon_type: Type of icon to display (e.g., "typing", "loading", "success")
        banner_text: Text to display in the banner
        config: RunnableConfig used to locate the communicator

    Returns:
        Whatever `_send_browser_command` returns for the "add_animation"
        command (``None`` on failure).
    """
    return await _send_browser_command(
        "add_animation",
        {"bid": bid, "icon_type": icon_type, "banner_text": banner_text},
        config,
    )


def _get_page_data(config: RunnableConfig | None) -> Optional[Dict[str, Any]]:
    """Retrieve page data from the config.

    Returns:
        The value stored under ``configurable.page_data`` (expected to hold
        entries such as dom_object, accessibility tree, dom_tree, ...), or
        ``None`` when the config or the entry is missing/empty.
    """
    if not config:
        return None

    # `or {}` tolerates a "configurable" key explicitly set to None.
    page_data = (config.get("configurable") or {}).get("page_data")
    return page_data or None


def _get_dom_tree(config: RunnableConfig | None):
    """Retrieve the DOM tree from page data.

    Returns:
        The object stored under ``page_data["dom_tree"]`` (a DomTreeResult per
        the callers in this module) or ``None`` if not found.
    """
    page_data = _get_page_data(config)
    if not page_data:
        return None
    return page_data.get("dom_tree")


def get_node_by_dom_tree_id(dom_tree_id: int, dom_tree: DomTreeResult) -> NodeData | TextNodeData | None:
    """Return the first ``NodeData`` in ``dom_tree.map`` with the given ``dom_tree_id``.

    Only ``NodeData`` instances are considered. Logs a warning and returns
    ``None`` when no node matches.
    """
    # Traverse all nodes to find matching DOM Tree ID
    target_node = next(
        (
            node
            for node in dom_tree.map.values()
            if isinstance(node, NodeData) and node.dom_tree_id == dom_tree_id
        ),
        None,
    )

    if not target_node:
        logger.warning(f"No element found with dom_tree_id #{dom_tree_id} in DOM tree")

    return target_node


def _find_browsergym_id_in_children(
    element: NodeData, dom_tree: DomTreeResult, max_depth: int = 2
) -> str | None:
    """
    Search for IDENTIFIER_ELEMENT attribute in element's children up to max_depth levels.

    The element itself is checked first (depth 0); the search is depth-first
    and returns the first truthy attribute value encountered.

    Args:
        element: The DOM element to search in
        dom_tree: The DomTreeResult to get child nodes from
        max_depth: Maximum depth to search (default 2)

    Returns:
        The browsergym ID if found, None otherwise
    """

    def search_recursive(node: NodeData, current_depth: int) -> str | None:
        if current_depth > max_depth:
            return None

        # Check if this node has the IDENTIFIER_ELEMENT
        if hasattr(node, 'attributes') and node.attributes:
            browsergym_id = node.attributes.get(IDENTIFIER_ELEMENT)
            if browsergym_id:
                return browsergym_id

        # Search children if we haven't reached max depth
        if current_depth < max_depth and hasattr(node, 'children') and node.children:
            for child_id in node.children:
                child_node = dom_tree.get_node(child_id)
                if child_node and isinstance(child_node, NodeData):  # Skip text nodes
                    result = search_recursive(child_node, current_depth + 1)
                    if result:
                        return result

        return None

    return search_recursive(element, 0)


def get_element_name_by_bid(bid: str, page_data: dict) -> str | None:
    """Get element name/description by BID from accessibility tree.

    Args:
        bid: The browsergym ID of the element
        page_data: Page data containing the accessibility tree under
            ``axtree_object``

    Returns:
        For the first node whose ``browsergym_id`` equals ``bid``: its name,
        else its role, else its description (each read from the property's
        ``value`` field). ``None`` if no node matches or none of those
        properties has a value.
    """
    if not page_data or not bid:
        return None

    # `or` fallbacks keep the lookup safe when a key is present but None.
    accessibility_tree = page_data.get("axtree_object") or {}
    nodes = accessibility_tree.get("nodes") or []

    for node in nodes:
        if node.get("browsergym_id") == bid:
            # Try to get name from various accessibility properties
            return (
                (node.get("name") or {}).get("value")
                or (node.get("role") or {}).get("value")
                or (node.get("description") or {}).get("value")
            )

    return None


async def _get_element_by_bid_with_validation(
    bid: str, config: RunnableConfig | None
) -> tuple[str | None, Alert | None]:
    """
    Common helper function to get and validate an element by BID.

    Args:
        bid: The dom_tree_id of the target element as string
        config: RunnableConfig containing page data

    Returns:
        Tuple of (actual_browsergym_id, error_alert_or_none)
        If successful, returns (browsergym_id, None)
        If failed, returns (None, Alert_with_error_message)
    """
    # Get page data to access element information
    dom_tree = _get_dom_tree(config)
    page_data = _get_page_data(config)

    if not dom_tree or not page_data:
        return None, Alert(message="Could not get page data or dom tree")

    try:
        dom_tree_id_int = int(bid)
    except (TypeError, ValueError):
        return None, Alert(message=f"Invalid dom_tree_id provided: {bid}")

    desired_element = get_node_by_dom_tree_id(dom_tree_id_int, dom_tree)

    if not desired_element or isinstance(desired_element, TextNodeData):
        logger.warning(f"Element with dom_tree_id {bid} not found")
        return None, Alert(message=f"Element with dom_tree_id {bid} not found")

    # Only report the element as found once the lookup has actually succeeded.
    logger.info(f"Found element {desired_element} on page")

    # First try to get the IDENTIFIER_ELEMENT from the element itself.
    # Guarded the same way as `_find_browsergym_id_in_children`, so a node with
    # missing/empty attributes falls through to the child search instead of raising.
    attributes = getattr(desired_element, "attributes", None) or {}
    desired_bid = attributes.get(IDENTIFIER_ELEMENT)

    # If not found, search up to 2 levels down in children
    if not desired_bid:
        logger.info(f"IDENTIFIER_ELEMENT not found on element {bid}, searching children...")
        desired_bid = _find_browsergym_id_in_children(desired_element, dom_tree, max_depth=2)

    if not desired_bid:
        logger.warning(
            f"Attribute {IDENTIFIER_ELEMENT} not found in element {bid} or its children (up to 2 levels)"
        )
        return None, Alert(
            message=f"Attribute {IDENTIFIER_ELEMENT} not found in element {bid} or its children"
        )

    return desired_bid, None


def _get_element_tag_by_bid(bid: str, page_data: dict) -> str | None:
    """Get the tag name for an element by BID from DOM snapshot.

    NOTE(review): this logic was recovered from statements that previously sat
    unreachable after the final ``return`` of
    `_get_element_by_bid_with_validation`; nothing in this module calls it.
    The helper name was chosen during the cleanup - confirm or remove.

    The snapshot is read as a string table (``dom_object["strings"]``) plus
    per-document parallel arrays under ``documents[*].nodes``; attributes are
    flat ``[name_idx, value_idx, ...]`` lists of indices into the string table.

    Args:
        bid: The browsergym ID of the element
        page_data: Page data containing dom_object

    Returns:
        Tag name (e.g., 'div', 'button', 'input') or None if not found
    """
    if not page_data or not bid:
        return None

    dom_object = page_data.get("dom_object", {})
    if not dom_object:
        return None

    strings = dom_object["strings"]

    def to_string(idx):
        # -1 is treated as "no string" rather than indexing from the end.
        return None if idx == -1 else strings[idx]

    # Pre-locate the bid string ID
    try:
        bid_string_id = strings.index("data-browsergym-id")
    except ValueError:
        return None

    # Find the node with this BID
    for document in dom_object.get("documents", []):
        doc_nodes = document.get("nodes", {})
        backend_node_ids = doc_nodes.get("backendNodeId", [])
        node_attributes = doc_nodes.get("attributes", [])
        node_names = doc_nodes.get("nodeName", [])

        for node_idx, node_attrs in enumerate(node_attributes):
            if node_idx >= len(backend_node_ids) or node_idx >= len(node_names):
                continue

            # Check if this node has the target BID. zip() drops a trailing
            # unpaired entry, matching the original bounds check.
            found_bid = None
            for name_string_id, value_string_id in zip(node_attrs[::2], node_attrs[1::2]):
                if name_string_id == bid_string_id:
                    found_bid = to_string(value_string_id)
                    break

            if found_bid == bid:
                # Found the node, get its tag name
                return to_string(node_names[node_idx])

    return None


# ---------------------------------------------------------------------------
# Low-level helpers (extension only)
# ---------------------------------------------------------------------------


def _alert_from_response(action: str, bid: str, response: Any) -> Optional[Alert]:
    """Translate an extension response into ``None`` (success) or an ``Alert``.

    Args:
        action: Capitalised verb used in log/alert text ("Click", "Type", "Select").
        bid: The id the caller passed in (used for logging only).
        response: Value returned by `_send_browser_command` (may be ``None``).

    Returns:
        ``None`` when ``response["result"]["success"]`` is truthy, otherwise an
        ``Alert`` carrying the response's ``message`` (or a default text).
    """
    # `or {}` tolerates a response whose "result" key is present but None.
    result = (response.get("result") or {}) if response else {}
    if result.get("success"):
        logger.info(f"{action} successful on element {bid}")
        return None

    error_msg = response.get("message", "Unknown error") if response else "No response from browser"
    logger.error(f"{action} failed on element {bid}: {error_msg}")
    return Alert(message=f"{action} failed: {error_msg}")


async def click_impl(
    *,
    bid: str,
    button: Literal["left", "middle", "right"] = "left",
    modifiers: Optional[List[Literal["Alt", "Control", "Meta", "Shift"]]] = None,
    config: RunnableConfig | None = None,
) -> Optional[Alert]:
    """Implementation of the *click* command when the extension is enabled.

    Args:
        bid: The dom_tree_id of the target element, as a string.
        button: Mouse button to click with.
        modifiers: Keyboard modifiers held during the click (defaults to none).
        config: RunnableConfig carrying page data and the communicator.

    Returns:
        ``None`` on success, otherwise an ``Alert`` describing the failure.
    """
    modifiers = modifiers or []

    # Validate / map the provided DOM-tree id to the browsergym id that the
    # extension understands.
    desired_bid, error_alert = await _get_element_by_bid_with_validation(bid, config)
    if error_alert:
        return error_alert  # early exit

    # Visual feedback in the browser
    try:
        await _add_animation(desired_bid, "success", "Clicked!", config)  # type: ignore
    except Exception as e:  # pragma: no cover – animation failures are non-fatal
        logger.warning(f"[extension_commands] Failed to trigger click animation: {e}")

    # Finally send command to the browser via the communicator
    response = await _send_browser_command(
        "click",
        {"bid": desired_bid, "button": button, "modifiers": modifiers},
        config,
    )
    return _alert_from_response("Click", bid, response)


async def type_impl(
    *,
    bid: str,
    value: str,
    press_enter: bool,
    config: RunnableConfig | None = None,
) -> Optional[Alert]:
    """Implementation of the *type* command when the extension is enabled.

    Args:
        bid: The dom_tree_id of the target element, as a string.
        value: Text to type into the element.
        press_enter: Forwarded to the extension as ``press_enter``.
        config: RunnableConfig carrying page data and the communicator.

    Returns:
        ``None`` on success, otherwise an ``Alert`` describing the failure.
    """
    desired_bid, error_alert = await _get_element_by_bid_with_validation(bid, config)
    if error_alert:
        return error_alert

    # Animation failures are non-fatal.
    try:
        await _add_animation(desired_bid, "typing", "Typing...", config)  # type: ignore
    except Exception as e:
        logger.warning(f"[extension_commands] Failed to trigger typing animation: {e}")

    response = await _send_browser_command(
        "type",
        {"bid": desired_bid, "value": value, "press_enter": press_enter},
        config,
    )
    return _alert_from_response("Type", bid, response)


async def select_option_impl(
    *,
    bid: str,
    options: str | List[str],
    config: RunnableConfig | None = None,
) -> Optional[Alert]:
    """Implementation of *select_option* when the extension is enabled.

    Args:
        bid: The dom_tree_id of the target element, as a string.
        options: Option or list of options, forwarded verbatim to the extension.
        config: RunnableConfig carrying page data and the communicator.

    Returns:
        ``None`` on success, otherwise an ``Alert`` describing the failure.
    """
    desired_bid, error_alert = await _get_element_by_bid_with_validation(bid, config)
    if error_alert:
        return error_alert

    # Animation failures are non-fatal.
    try:
        await _add_animation(desired_bid, "success", "Selected!", config)  # type: ignore
    except Exception as e:
        logger.warning(f"[extension_commands] Failed to trigger selection animation: {e}")

    response = await _send_browser_command("select_option", {"bid": desired_bid, "options": options}, config)
    return _alert_from_response("Select", bid, response)


async def open_app_impl(*, app_name: str, config: RunnableConfig | None = None):
    """Implementation of *open_app* when the extension is enabled.

    Always returns ``None``; failures are only logged.
    """
    # Delegate actual work to the background extension via communicator.
    await _send_browser_command("open_app", {"app_name": app_name}, config)
    # Nothing to return – any error will be logged by `_send_browser_command`.
    return None


async def open_dropdown_impl(
    *,
    bid: str,
    config: RunnableConfig | None = None,
) -> Optional[Alert]:
    """Open a dropdown element using the extension's click handler.

    Returns:
        The result of `click_impl`: ``None`` on success, otherwise an ``Alert``.
    """
    # This re-uses the click implementation but forces `button="left"` and no modifiers.
    return await click_impl(bid=bid, button="left", modifiers=[], config=config)


async def go_back_impl(config: RunnableConfig | None = None):
    """Go back to the previous page.

    Sends the "go_back" command with no arguments. Returns ``None``; any
    failure is only logged by `_send_browser_command`.
    """
    await _send_browser_command("go_back", {}, config)