cuga-agent / src /cuga /backend /browser_env /tools /extension_commands.py
Sami Marreed
feat: docker-v1 with optimized frontend
0646b18
"""
Extension-based implementations of browser interaction commands.
These helpers are invoked when the Chrome extension is enabled (settings.advanced_features.use_extension == True).
They wrap lower-level helpers such as `_send_browser_command` and element-lookup utilities so the public
`@tool` functions in `tools.py` can simply delegate to them.
"""
from typing import Any, Dict, List, Literal, Optional
from langchain_core.runnables import RunnableConfig
from loguru import logger
from cuga.backend.browser_env.page_understanding.types.dom_tree_types import (
DomTreeResult,
NodeData,
TextNodeData,
)
from cuga.backend.cuga_graph.nodes.browser.action_agent.tools.alert import Alert
IDENTIFIER_ELEMENT = "dom-tree-id"
def _get_communicator(config: RunnableConfig | None) -> Any | None:
"""Retrieve the ChromeExtensionCommunicator instance.
Preference order:
1. Provided via the tool's RunnableConfig under ``configurable.communicator``.
2. From page_data in configurable if available.
3. Global FastAPI app instance created in ``server.main`` (``app.state.chrome_extension_communicator``).
4. Return ``None`` if no communicator can be found.
"""
# 1) Try config
if config and (comm := config.get("configurable", {}).get("communicator")):
return comm
# 2) Try page_data in configurable
if config and (page_data := config.get("configurable", {}).get("page_data")):
if isinstance(page_data, dict) and "chrome_extension_communicator" in page_data:
return page_data["chrome_extension_communicator"]
# 3) Try to import the running FastAPI app created in server.main
try:
from server.main import app # type: ignore
comm = getattr(app.state, "chrome_extension_communicator", None)
if comm:
return comm
except Exception:
pass
return None
async def _send_browser_command(command: str, args: Dict[str, Any], config: RunnableConfig | None):
"""Send a browser command to the Chrome extension via WebSocket or HTTP stream.
This is a *best-effort* operation: if no communicator is available we simply log and exit so
the agent can continue operating without throwing errors.
"""
communicator = _get_communicator(config)
if communicator is None:
logger.warning(
f"[tools.py] No ChromeExtensionCommunicator available – cannot send command '{command}'."
)
return None
try:
msg = {"type": "browser_command", "command": command, "args": args}
# Handle different communicator types
if hasattr(communicator, 'server'):
# WebSocket communicator
response = await communicator.server.send_request(msg, timeout=10.0)
else:
# HTTP stream communicator
response = await communicator.send_request(msg, timeout=10.0)
logger.debug(f"[tools.py] Sent browser_command: {msg}")
logger.debug(f"[tools.py] Received response: {response}")
if response and response.get("type") == "error":
logger.error(f"[tools.py] Browser command '{command}' failed: {response.get('message')}")
return None
return response
except Exception as e:
logger.error(f"[tools.py] Failed to send browser command '{command}': {e}")
return None
async def _add_animation(
bid: str,
icon_type: str,
banner_text: str,
config: RunnableConfig | None = None,
):
"""
Add a visual animation to the element with the given BID.
Args:
bid: The browsergym ID of the element
icon_type: Type of icon to display (e.g., "typing", "loading", "success")
banner_text: Text to display in the banner
"""
response = await _send_browser_command(
"add_animation",
{"bid": bid, "icon_type": icon_type, "banner_text": banner_text},
config,
)
return response
def _get_page_data(config: RunnableConfig | None) -> Optional[Dict[str, Any]]:
"""Retrieve page data from the config.
Returns:
Dict containing page data (dom_object, accessibility_tree, extra_properties, etc.) or None
"""
if not config:
return None
# Try to get from configurable.page_data
page_data = config.get("configurable", {}).get("page_data")
if page_data:
return page_data
return None
def _get_dom_tree(config: RunnableConfig | None):
"""Retrieve the DOM tree from page data.
Returns:
DomTreeResult object or None if not found
"""
page_data = _get_page_data(config)
if not page_data:
return None
return page_data.get("dom_tree")
def get_node_by_dom_tree_id(dom_tree_id: int, dom_tree: DomTreeResult) -> NodeData | TextNodeData | None:
# Traverse all nodes to find matching DOM Tree ID
target_node = None
for node in dom_tree.map.values():
if isinstance(node, NodeData) and node.dom_tree_id == dom_tree_id:
target_node = node
break
if not target_node:
logger.warning(f"No element found with dom_tree_id #{dom_tree_id} in DOM tree")
return target_node
def _find_browsergym_id_in_children(
element: NodeData, dom_tree: DomTreeResult, max_depth: int = 2
) -> str | None:
"""
Search for IDENTIFIER_ELEMENT attribute in element's children up to max_depth levels.
Args:
element: The DOM element to search in
dom_tree: The DomTreeResult to get child nodes from
max_depth: Maximum depth to search (default 2)
Returns:
The browsergym ID if found, None otherwise
"""
def search_recursive(node: NodeData, current_depth: int) -> str | None:
if current_depth > max_depth:
return None
# Check if this node has the IDENTIFIER_ELEMENT
if hasattr(node, 'attributes') and node.attributes:
browsergym_id = node.attributes.get(IDENTIFIER_ELEMENT)
if browsergym_id:
return browsergym_id
# Search children if we haven't reached max depth
if current_depth < max_depth and hasattr(node, 'children') and node.children:
for child_id in node.children:
child_node = dom_tree.get_node(child_id)
if child_node and isinstance(child_node, NodeData): # Skip text nodes
result = search_recursive(child_node, current_depth + 1)
if result:
return result
return None
return search_recursive(element, 0)
def get_element_name_by_bid(bid: str, page_data: dict) -> str | None:
"""Get element name/description by BID from accessibility tree.
Args:
bid: The browsergym ID of the element
page_data: Page data containing accessibility_tree
Returns:
Element name/description or None if not found
"""
if not page_data or not bid:
return None
accessibility_tree = page_data.get("axtree_object", {})
nodes = accessibility_tree.get("nodes", [])
for node in nodes:
if node.get("browsergym_id") == bid:
# Try to get name from various accessibility properties
name = (
node.get("name", {}).get("value")
or node.get("role", {}).get("value")
or node.get("description", {}).get("value")
)
return name
return None
async def _get_element_by_bid_with_validation(
bid: str, config: RunnableConfig | None
) -> tuple[str | None, Alert | None]:
"""
Common helper function to get and validate an element by BID.
Args:
bid: The dom_tree_id of the target element as string
config: RunnableConfig containing page data
Returns:
Tuple of (actual_browsergym_id, error_alert_or_none)
If successful, returns (browsergym_id, None)
If failed, returns (None, Alert_with_error_message)
"""
# Get page data to access element information
dom_tree = _get_dom_tree(config)
page_data = _get_page_data(config)
if not dom_tree or not page_data:
return None, Alert(message="Could not get page data or dom tree")
try:
dom_tree_id_int = int(bid)
except (TypeError, ValueError):
return None, Alert(message=f"Invalid dom_tree_id provided: {bid}")
desired_element = get_node_by_dom_tree_id(dom_tree_id_int, dom_tree)
logger.info(f"Found element {desired_element} on page")
if not desired_element or isinstance(desired_element, TextNodeData):
logger.warning(f"Element with dom_tree_id {bid} not found")
return None, Alert(message=f"Element with dom_tree_id {bid} not found")
# First try to get the IDENTIFIER_ELEMENT from the element itself
desired_bid = desired_element.attributes.get(IDENTIFIER_ELEMENT)
# If not found, search up to 2 levels down in children
if not desired_bid:
logger.info(f"IDENTIFIER_ELEMENT not found on element {bid}, searching children...")
desired_bid = _find_browsergym_id_in_children(desired_element, dom_tree, max_depth=2)
if not desired_bid:
logger.warning(
f"Attribute {IDENTIFIER_ELEMENT} not found in element {bid} or its children (up to 2 levels)"
)
return None, Alert(
message=f"Attribute {IDENTIFIER_ELEMENT} not found in element {bid} or its children"
)
return desired_bid, None
"""Get the tag name for an element by BID from DOM snapshot.
Args:
bid: The browsergym ID of the element
page_data: Page data containing dom_object
Returns:
Tag name (e.g., 'div', 'button', 'input') or None if not found
"""
if not page_data or not bid:
return None
dom_object = page_data.get("dom_object", {})
if not dom_object:
return None
def to_string(idx):
if idx == -1:
return None
else:
return dom_object["strings"][idx]
# Pre-locate the bid string ID
try:
bid_string_id = dom_object["strings"].index("data-browsergym-id")
except ValueError:
return None
# Find the node with this BID
for document in dom_object.get("documents", []):
backend_node_ids = document.get("nodes", {}).get("backendNodeId", [])
node_attributes = document.get("nodes", {}).get("attributes", [])
node_names = document.get("nodes", {}).get("nodeName", [])
for node_idx, node_attrs in enumerate(node_attributes):
if node_idx >= len(backend_node_ids) or node_idx >= len(node_names):
continue
# Check if this node has the target BID
found_bid = None
for i in range(0, len(node_attrs), 2):
if i + 1 >= len(node_attrs):
break
name_string_id = node_attrs[i]
value_string_id = node_attrs[i + 1]
if name_string_id == bid_string_id:
found_bid = to_string(value_string_id)
break
if found_bid == bid:
# Found the node, get its tag name
node_name_id = node_names[node_idx]
return to_string(node_name_id)
return None
# ---------------------------------------------------------------------------
# Low-level helpers (extension only)
# ---------------------------------------------------------------------------
async def click_impl(
*,
bid: str,
button: Literal["left", "middle", "right"] = "left",
modifiers: Optional[List[Literal["Alt", "Control", "Meta", "Shift"]]] = None,
config: RunnableConfig | None = None,
) -> Optional[Alert]:
"""Implementation of the *click* command when the extension is enabled."""
modifiers = modifiers or []
# Validate / map the provided DOM-tree id to the browsergym id that the
# extension understands.
desired_bid, error_alert = await _get_element_by_bid_with_validation(bid, config)
if error_alert:
return error_alert # early exit
# Visual feedback in the browser (purple glow & banner)
try:
await _add_animation(desired_bid, "success", "Clicked!", config) # type: ignore
except Exception as e: # pragma: no cover – animation failures are non-fatal
logger.warning(f"[extension_commands] Failed to trigger click animation: {e}")
# Finally send command to the browser via the communicator
response = await _send_browser_command(
"click",
{"bid": desired_bid, "button": button, "modifiers": modifiers},
config,
)
if response and response.get("result", {}).get("success"):
logger.info(f"Click successful on element {bid}")
return None
error_msg = response.get("message", "Unknown error") if response else "No response from browser"
logger.error(f"Click failed on element {bid}: {error_msg}")
return Alert(message=f"Click failed: {error_msg}")
async def type_impl(
*,
bid: str,
value: str,
press_enter: bool,
config: RunnableConfig | None = None,
) -> Optional[Alert]:
"""Implementation of the *type* command when the extension is enabled."""
desired_bid, error_alert = await _get_element_by_bid_with_validation(bid, config)
if error_alert:
return error_alert
try:
await _add_animation(desired_bid, "typing", "Typing...", config) # type: ignore
except Exception as e:
logger.warning(f"[extension_commands] Failed to trigger typing animation: {e}")
response = await _send_browser_command(
"type",
{"bid": desired_bid, "value": value, "press_enter": press_enter},
config,
)
if response and response.get("result", {}).get("success"):
logger.info(f"Type successful on element {bid}")
return None
error_msg = response.get("message", "Unknown error") if response else "No response from browser"
logger.error(f"Type failed on element {bid}: {error_msg}")
return Alert(message=f"Type failed: {error_msg}")
async def select_option_impl(
*,
bid: str,
options: str | List[str],
config: RunnableConfig | None = None,
) -> Optional[Alert]:
"""Implementation of *select_option* when the extension is enabled."""
desired_bid, error_alert = await _get_element_by_bid_with_validation(bid, config)
if error_alert:
return error_alert
try:
await _add_animation(desired_bid, "success", "Selected!", config) # type: ignore
except Exception as e:
logger.warning(f"[extension_commands] Failed to trigger selection animation: {e}")
response = await _send_browser_command("select_option", {"bid": desired_bid, "options": options}, config)
if response and response.get("result", {}).get("success"):
logger.info(f"Select successful on element {bid}")
return None
error_msg = response.get("message", "Unknown error") if response else "No response from browser"
logger.error(f"Select failed on element {bid}: {error_msg}")
return Alert(message=f"Select failed: {error_msg}")
async def open_app_impl(*, app_name: str, config: RunnableConfig | None = None):
"""Implementation of *open_app* when the extension is enabled."""
# Delegate actual work to the background extension via communicator.
await _send_browser_command("open_app", {"app_name": app_name}, config)
# Nothing to return – any error will be logged by `_send_browser_command`.
return None
async def open_dropdown_impl(
*,
bid: str,
config: RunnableConfig | None = None,
) -> Optional[Alert]:
"""Open a dropdown element using the extension’s click handler."""
# This re-uses the click implementation but forces `button="left"` and no modifiers.
return await click_impl(bid=bid, button="left", modifiers=[], config=config)
async def go_back_impl(config: RunnableConfig | None = None):
"""
Go back to previous page.
Examples:
"""
await _send_browser_command("go_back", {}, config)