Sami Marreed
feat: docker-v1 with optimized frontend
3289c58
from cuga.backend.memory.agentic_memory import V1MemoryClient
from cuga.backend.memory.agentic_memory.schema import Run, RecordedFact, Namespace
from typing import List, Dict, Optional, TYPE_CHECKING
import os
import json
from cuga.config import settings
if TYPE_CHECKING:
from cuga.backend.cuga_graph.state.agent_state import AgentState
class Memory:
_instance = None
_initialized = False
def __new__(cls, memory_config=None):
if cls._instance is None:
cls._instance = super(Memory, cls).__new__(cls)
return cls._instance
def __init__(self, memory_config=None):
if not self._initialized:
port = settings.server_ports.memory
self.memory_client = V1MemoryClient(
base_url=os.environ.get("MEMORY_BASE_URL", f"http://localhost:{port}"), timeout=600
)
self.user_id = None
Memory._initialized = True
def health_check(self) -> bool:
return self.memory_client.health_check()
def create_namespace(
self,
namespace_id: str | None = None,
user_id: str | None = None,
agent_id: str | None = None,
app_id: str | None = None,
) -> Namespace:
"""Create a new namespace for facts to exist in."""
return self.memory_client.create_namespace(
namespace_id=namespace_id, user_id=user_id, agent_id=agent_id, app_id=app_id
)
def get_namespace_details(self, namespace_id: str) -> Namespace:
"""Get details about a specific namespace."""
return self.memory_client.get_namespace_details(namespace_id=namespace_id)
def search_namespaces(
self,
user_id: str | None = None,
agent_id: str | None = None,
app_id: str | None = None,
limit: int = 10,
) -> list[Namespace]:
"""Search namespace with filters."""
return self.memory_client.search_namespaces(
user_id=user_id, agent_id=agent_id, app_id=app_id, limit=limit
)
def delete_namespace(self, namespace_id: str):
"""Delete a namespace."""
self.memory_client.delete_namespace(namespace_id=namespace_id)
def create_and_store_fact(self, namespace_id: str, content: str, metadata: Optional[Dict] = None) -> str:
"""Add a single fact to a namespace."""
return self.memory_client.create_and_store_fact(
namespace_id=namespace_id, content=content, metadata=metadata
)
def search_for_facts(
self, namespace_id: str, query: Optional[str] = None, filters: dict | None = None, limit: int = 10
) -> List[RecordedFact]:
"""Search for facts in a namespace."""
return self.memory_client.search_for_facts(
namespace_id=namespace_id, query=query, filters=filters, limit=limit
)
def get_all_facts(self, namespace_id: str, limit: int = 100) -> List[RecordedFact]:
return self.memory_client.get_all_facts(namespace_id=namespace_id, limit=limit)
def get_matching_tips(
self,
namespace_id: str,
agent_id: str,
query: str,
limit: int = 3,
) -> list[str]:
"""Get matching facts and return them as JSON string.
This provides backward compatibility with the old get_matching_facts function
while using the new V1MemoryClient internally.
"""
recorded_facts = self.search_for_facts(
namespace_id=namespace_id, query=query, limit=limit, filters={"agent": agent_id, "user_id": "100"}
)
# Extract facts from the response (assuming similar structure to old implementation)
facts = [fact.content for fact in recorded_facts]
# Print debug info (maintaining original behavior)
print(query)
print("------ICLs--------")
for f in facts:
print(f)
return facts
def create_run(self, namespace_id: str, run_id: str | None = None) -> Run:
"""Create a new run to track Agent steps."""
return self.memory_client.create_run(namespace_id, run_id)
def get_run(self, namespace_id: str, run_id: str) -> Run:
"""Get an existing run."""
return self.memory_client.get_run(namespace_id, run_id)
def delete_run(self, namespace_id: str, run_id: str):
"""Delete an existing run."""
return self.memory_client.delete_run(namespace_id, run_id)
def search_runs(
self, namespace_id: str, query: str | None = None, filters: dict[str, str] | None = None
) -> Run | None:
"""Search a namespace for a run based on it's step which best matches a query."""
return self.memory_client.search_runs(namespace_id, query, filters)
def end_run(self, namespace_id: str, run_id: str):
"""End an existing run."""
return self.memory_client.end_run(namespace_id, run_id)
def add_step(self, namespace_id: str, run_id: str, step: dict, prompt: str) -> str:
"""Add a new step into a run."""
return self.memory_client.add_step(namespace_id, run_id, step, prompt)
def _get_user_id(self, state: "AgentState") -> str:
"""Extract or generate user ID for memory scoping"""
# Use the pi field from AgentState
if hasattr(state, 'pi') and state.pi:
pi_dict = json.loads(state.pi)
state.user_id = str(f"{pi_dict["first_name"]}_{pi_dict["last_name"]}_{pi_dict["phone_number"]}")
else:
state.user_id = "default_user"
self.user_id = state.user_id
return state.user_id