File size: 5,573 Bytes
0646b18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from cuga.backend.memory.agentic_memory import V1MemoryClient
from cuga.backend.memory.agentic_memory.schema import Run, RecordedFact, Namespace
from typing import List, Dict, Optional, TYPE_CHECKING
import os
import json

from cuga.config import settings

if TYPE_CHECKING:
    from cuga.backend.cuga_graph.state.agent_state import AgentState


class Memory:
    _instance = None
    _initialized = False

    def __new__(cls, memory_config=None):
        if cls._instance is None:
            cls._instance = super(Memory, cls).__new__(cls)
        return cls._instance

    def __init__(self, memory_config=None):
        if not self._initialized:
            port = settings.server_ports.memory
            self.memory_client = V1MemoryClient(
                base_url=os.environ.get("MEMORY_BASE_URL", f"http://localhost:{port}"), timeout=600
            )
            self.user_id = None
            Memory._initialized = True

    def health_check(self) -> bool:
        return self.memory_client.health_check()

    def create_namespace(
        self,
        namespace_id: str | None = None,
        user_id: str | None = None,
        agent_id: str | None = None,
        app_id: str | None = None,
    ) -> Namespace:
        """Create a new namespace for facts to exist in."""
        return self.memory_client.create_namespace(
            namespace_id=namespace_id, user_id=user_id, agent_id=agent_id, app_id=app_id
        )

    def get_namespace_details(self, namespace_id: str) -> Namespace:
        """Get details about a specific namespace."""
        return self.memory_client.get_namespace_details(namespace_id=namespace_id)

    def search_namespaces(
        self,
        user_id: str | None = None,
        agent_id: str | None = None,
        app_id: str | None = None,
        limit: int = 10,
    ) -> list[Namespace]:
        """Search namespace with filters."""
        return self.memory_client.search_namespaces(
            user_id=user_id, agent_id=agent_id, app_id=app_id, limit=limit
        )

    def delete_namespace(self, namespace_id: str):
        """Delete a namespace."""
        self.memory_client.delete_namespace(namespace_id=namespace_id)

    def create_and_store_fact(self, namespace_id: str, content: str, metadata: Optional[Dict] = None) -> str:
        """Add a single fact to a namespace."""
        return self.memory_client.create_and_store_fact(
            namespace_id=namespace_id, content=content, metadata=metadata
        )

    def search_for_facts(
        self, namespace_id: str, query: Optional[str] = None, filters: dict | None = None, limit: int = 10
    ) -> List[RecordedFact]:
        """Search for facts in a namespace."""
        return self.memory_client.search_for_facts(
            namespace_id=namespace_id, query=query, filters=filters, limit=limit
        )

    def get_all_facts(self, namespace_id: str, limit: int = 100) -> List[RecordedFact]:
        return self.memory_client.get_all_facts(namespace_id=namespace_id, limit=limit)

    def get_matching_tips(
        self,
        namespace_id: str,
        agent_id: str,
        query: str,
        limit: int = 3,
    ) -> list[str]:
        """Get matching facts and return them as JSON string.

        This provides backward compatibility with the old get_matching_facts function
        while using the new V1MemoryClient internally.
        """
        recorded_facts = self.search_for_facts(
            namespace_id=namespace_id, query=query, limit=limit, filters={"agent": agent_id, "user_id": "100"}
        )

        # Extract facts from the response (assuming similar structure to old implementation)
        facts = [fact.content for fact in recorded_facts]

        # Print debug info (maintaining original behavior)
        print(query)
        print("------ICLs--------")
        for f in facts:
            print(f)

        return facts

    def create_run(self, namespace_id: str, run_id: str | None = None) -> Run:
        """Create a new run to track Agent steps."""
        return self.memory_client.create_run(namespace_id, run_id)

    def get_run(self, namespace_id: str, run_id: str) -> Run:
        """Get an existing run."""
        return self.memory_client.get_run(namespace_id, run_id)

    def delete_run(self, namespace_id: str, run_id: str):
        """Delete an existing run."""
        return self.memory_client.delete_run(namespace_id, run_id)

    def search_runs(
        self, namespace_id: str, query: str | None = None, filters: dict[str, str] | None = None
    ) -> Run | None:
        """Search a namespace for a run based on it's step which best matches a query."""
        return self.memory_client.search_runs(namespace_id, query, filters)

    def end_run(self, namespace_id: str, run_id: str):
        """End an existing run."""
        return self.memory_client.end_run(namespace_id, run_id)

    def add_step(self, namespace_id: str, run_id: str, step: dict, prompt: str) -> str:
        """Add a new step into a run."""
        return self.memory_client.add_step(namespace_id, run_id, step, prompt)

    def _get_user_id(self, state: "AgentState") -> str:
        """Extract or generate user ID for memory scoping"""
        # Use the pi field from AgentState
        if hasattr(state, 'pi') and state.pi:
            pi_dict = json.loads(state.pi)
            state.user_id = str(f"{pi_dict["first_name"]}_{pi_dict["last_name"]}_{pi_dict["phone_number"]}")
        else:
            state.user_id = "default_user"
        self.user_id = state.user_id
        return state.user_id