Spaces:
Running
Running
File size: 5,573 Bytes
0646b18 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
from cuga.backend.memory.agentic_memory import V1MemoryClient
from cuga.backend.memory.agentic_memory.schema import Run, RecordedFact, Namespace
from typing import List, Dict, Optional, TYPE_CHECKING
import os
import json
from cuga.config import settings
if TYPE_CHECKING:
from cuga.backend.cuga_graph.state.agent_state import AgentState
class Memory:
_instance = None
_initialized = False
def __new__(cls, memory_config=None):
if cls._instance is None:
cls._instance = super(Memory, cls).__new__(cls)
return cls._instance
def __init__(self, memory_config=None):
if not self._initialized:
port = settings.server_ports.memory
self.memory_client = V1MemoryClient(
base_url=os.environ.get("MEMORY_BASE_URL", f"http://localhost:{port}"), timeout=600
)
self.user_id = None
Memory._initialized = True
def health_check(self) -> bool:
return self.memory_client.health_check()
def create_namespace(
self,
namespace_id: str | None = None,
user_id: str | None = None,
agent_id: str | None = None,
app_id: str | None = None,
) -> Namespace:
"""Create a new namespace for facts to exist in."""
return self.memory_client.create_namespace(
namespace_id=namespace_id, user_id=user_id, agent_id=agent_id, app_id=app_id
)
def get_namespace_details(self, namespace_id: str) -> Namespace:
"""Get details about a specific namespace."""
return self.memory_client.get_namespace_details(namespace_id=namespace_id)
def search_namespaces(
self,
user_id: str | None = None,
agent_id: str | None = None,
app_id: str | None = None,
limit: int = 10,
) -> list[Namespace]:
"""Search namespace with filters."""
return self.memory_client.search_namespaces(
user_id=user_id, agent_id=agent_id, app_id=app_id, limit=limit
)
def delete_namespace(self, namespace_id: str):
"""Delete a namespace."""
self.memory_client.delete_namespace(namespace_id=namespace_id)
def create_and_store_fact(self, namespace_id: str, content: str, metadata: Optional[Dict] = None) -> str:
"""Add a single fact to a namespace."""
return self.memory_client.create_and_store_fact(
namespace_id=namespace_id, content=content, metadata=metadata
)
def search_for_facts(
self, namespace_id: str, query: Optional[str] = None, filters: dict | None = None, limit: int = 10
) -> List[RecordedFact]:
"""Search for facts in a namespace."""
return self.memory_client.search_for_facts(
namespace_id=namespace_id, query=query, filters=filters, limit=limit
)
def get_all_facts(self, namespace_id: str, limit: int = 100) -> List[RecordedFact]:
return self.memory_client.get_all_facts(namespace_id=namespace_id, limit=limit)
def get_matching_tips(
self,
namespace_id: str,
agent_id: str,
query: str,
limit: int = 3,
) -> list[str]:
"""Get matching facts and return them as JSON string.
This provides backward compatibility with the old get_matching_facts function
while using the new V1MemoryClient internally.
"""
recorded_facts = self.search_for_facts(
namespace_id=namespace_id, query=query, limit=limit, filters={"agent": agent_id, "user_id": "100"}
)
# Extract facts from the response (assuming similar structure to old implementation)
facts = [fact.content for fact in recorded_facts]
# Print debug info (maintaining original behavior)
print(query)
print("------ICLs--------")
for f in facts:
print(f)
return facts
def create_run(self, namespace_id: str, run_id: str | None = None) -> Run:
"""Create a new run to track Agent steps."""
return self.memory_client.create_run(namespace_id, run_id)
def get_run(self, namespace_id: str, run_id: str) -> Run:
"""Get an existing run."""
return self.memory_client.get_run(namespace_id, run_id)
def delete_run(self, namespace_id: str, run_id: str):
"""Delete an existing run."""
return self.memory_client.delete_run(namespace_id, run_id)
def search_runs(
self, namespace_id: str, query: str | None = None, filters: dict[str, str] | None = None
) -> Run | None:
"""Search a namespace for a run based on it's step which best matches a query."""
return self.memory_client.search_runs(namespace_id, query, filters)
def end_run(self, namespace_id: str, run_id: str):
"""End an existing run."""
return self.memory_client.end_run(namespace_id, run_id)
def add_step(self, namespace_id: str, run_id: str, step: dict, prompt: str) -> str:
"""Add a new step into a run."""
return self.memory_client.add_step(namespace_id, run_id, step, prompt)
def _get_user_id(self, state: "AgentState") -> str:
"""Extract or generate user ID for memory scoping"""
# Use the pi field from AgentState
if hasattr(state, 'pi') and state.pi:
pi_dict = json.loads(state.pi)
state.user_id = str(f"{pi_dict["first_name"]}_{pi_dict["last_name"]}_{pi_dict["phone_number"]}")
else:
state.user_id = "default_user"
self.user_id = state.user_id
return state.user_id
|