File size: 11,129 Bytes
0646b18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
import uvicorn

from typing import Annotated
from dotenv import load_dotenv
from cuga.backend.memory.agentic_memory.backend.mem0_backend import Mem0MemoryBackend
from cuga.backend.memory.agentic_memory.config import get_config
from cuga.backend.memory.agentic_memory.utils.logging import Logging
from cuga.backend.memory.agentic_memory.schema import Fact, Message, RecordedFact, Run, Namespace
from fastapi import APIRouter, FastAPI, HTTPException, Path, Body, Query, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from mem0 import Memory
from mem0.configs.base import MemoryConfig
from pydantic.json_schema import SkipJsonSchema

# Load variables from a .env file; override=True lets them replace values that are
# already present in the process environment.
load_dotenv(override=True)
# Mem0 client built from the project configuration at import time.
# NOTE(review): not referenced by any route visible in this file — confirm whether it
# is still needed (e.g. by importers of this module) before removing.
memory = Memory(config=MemoryConfig.model_validate(get_config()))

logger = Logging.get_logger()
# NOTE(review): debug=True is hard-coded; main() derives the uvicorn log level from it.
app = FastAPI(debug=True)
# Report OpenAPI 3.0.3 in the generated schema instead of FastAPI's default version.
app.openapi_version = '3.0.3'

# Add CORS middleware to handle preflight OPTIONS requests
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — restrict the origins before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, replace with specific origins
    allow_credentials=True,
    allow_methods=["*"],  # Allow all HTTP methods including OPTIONS
    allow_headers=["*"],  # Allow all headers
)

# Every route in this file is registered on this router and served under /v1.
router_v1 = APIRouter(prefix="/v1")

# Backend instance that the route handlers delegate to.
memory_backend = Mem0MemoryBackend()


@router_v1.get("/health/live")
@router_v1.get("/health/ready")
async def ready():
    """Attempt to query if the memory service is available"""
    # Serves both the liveness and the readiness route: answer 503 as soon as the
    # backend reports that it is not ready, otherwise fall through to the OK payload.
    if not memory_backend.ready():
        raise HTTPException(status_code=503, detail="Memory service is unavailable")
    return {"status": "ok"}


@router_v1.post("/namespaces", status_code=201, response_description='The ID of the created namespace.')
def create_namespace(
    namespace_id: Annotated[
        str | SkipJsonSchema[None], Body(description='The namespace ID to create', pattern=r'^[a-zA-Z0-9_]+$')
    ] = None,
    user_id: Annotated[
        str | SkipJsonSchema[None],
        Body(description='The user that created the namespace.', pattern=r'^[a-zA-Z0-9_]+$'),
    ] = None,
    agent_id: Annotated[
        str | SkipJsonSchema[None],
        Body(description='The agent associated with the namespace.', pattern=r'^[a-zA-Z0-9_]+$'),
    ] = None,
    app_id: Annotated[
        str | SkipJsonSchema[None],
        Body(description='The application associated with the namespace.', pattern=r'^[a-zA-Z0-9_]+$'),
    ] = None,
) -> Namespace:
    """Create a new namespace for facts to exist in."""
    # All four body fields are optional and, when given, must match ^[a-zA-Z0-9_]+$.
    try:
        return memory_backend.create_namespace(namespace_id, user_id, agent_id, app_id)
    except RuntimeError as err:
        # A RuntimeError from the backend is reported to the client as 409 Conflict.
        # Chaining with `from err` keeps the original cause attached to the traceback.
        raise HTTPException(
            status_code=409, detail=f'Namespace `{namespace_id}` already exists.'
        ) from err


@router_v1.get("/namespaces/{namespace_id}", response_description='The details of the requested namespace.')
def get_namespace_details(
    namespace_id: Annotated[
        str, Path(description='The namespace which contains facts relevant to the user.')
    ],
) -> Namespace:
    """Get the details of a specific namespace."""
    try:
        return memory_backend.get_namespace_details(namespace_id)
    except LookupError as err:
        # A LookupError from the backend is reported to the client as 404 Not Found.
        # Chaining with `from err` keeps the original cause attached to the traceback.
        raise HTTPException(status_code=404, detail=f"Namespace {namespace_id} not found.") from err


@router_v1.get("/namespaces", response_description='A list of namespaces matching the filters.')
def search_namespaces(
    user_id: Annotated[
        str | SkipJsonSchema[None], Query(description='The user that created the namespace.')
    ] = None,
    agent_id: Annotated[
        str | SkipJsonSchema[None], Query(description='The agent associated with the namespace.')
    ] = None,
    app_id: Annotated[
        str | SkipJsonSchema[None], Query(description='The application associated with the namespace.')
    ] = None,
    limit: Annotated[int, Query(description='The number of results to return.')] = 10,
) -> list[Namespace]:
    """Find namespaces matching the filters."""
    no_filters_given = user_id is None and agent_id is None and app_id is None
    if no_filters_given:
        # Without any filter every namespace is returned; `limit` is not applied here.
        return memory_backend.all_namespaces()

    # At least one filter was supplied: delegate to the filtered search and
    # materialise whatever iterable the backend yields into a list.
    matches = memory_backend.search_namespaces(user_id, agent_id, app_id, limit)
    return list(matches)

@router_v1.delete("/namespaces/{namespace_id}", status_code=204)
def delete_namespace(
    namespace_id: Annotated[
        str, Path(description='The namespace which contains facts relevant to the user.')
    ],
):
    """Delete a namespace that facts exist in."""
    # The backend's result is not returned; the route is declared with status 204
    # (No Content). Any exception raised by the backend propagates unchanged.
    memory_backend.delete_namespace(namespace_id=namespace_id)

@router_v1.put(
    "/namespaces/{namespace_id}/facts", status_code=201, response_description='The ID of the created fact.'
)
def create_and_store_fact(
    namespace_id: Annotated[
        str, Path(description='The namespace which contains facts relevant to the user.')
    ],
    fact: Annotated[
        Fact,
        Body(
            description='A fact about the user, their personal preferences, upcoming plans, '
            'professional details, and other miscellaneous information.'
        ),
    ],
) -> str:
    """Based on something the user previously said, create and store in memory a new fact about the user,
    their personal preferences, upcoming plans, professional details, and other miscellaneous information.
    """
    # Storage is delegated to the backend; its string result is passed back as the
    # response body of this 201 route.
    stored = memory_backend.create_and_store_fact(namespace_id=namespace_id, fact=fact)
    return stored


@router_v1.post(
    "/namespaces/{namespace_id}/facts",
    response_description='A list of facts that may be relevant to the user.',
)
def search_for_facts(
    namespace_id: Annotated[
        str, Path(description='The namespace which contains facts relevant to the user.')
    ],
    query: Annotated[
        str | SkipJsonSchema[None],
        Body(description='A question written in natural language about the user that needs an answer.'),
    ] = None,
    filters: Annotated[
        dict | SkipJsonSchema[None],
        Body(description='A set of filters to apply when searching for facts.'),
    ] = None,
    limit: Annotated[int, Query(description='The maximum number of facts to return.')] = 10,
) -> list[RecordedFact]:
    """Based on a query, find in memory a fact about the user, their personal preferences, upcoming plans,
    professional details, and other miscellaneous information.
    """
    # `query` and `filters` arrive in the JSON body and are both optional; `limit` is a
    # query-string parameter. All of them are forwarded to the backend unchanged.
    return memory_backend.search_for_facts(
        namespace_id=namespace_id, query=query, filters=filters, limit=limit
    )


@router_v1.delete("/namespaces/{namespace_id}/facts/{fact_id}", status_code=204)
def delete_fact_by_id(
    namespace_id: Annotated[
        str, Path(description='The namespace which contains facts relevant to the user.')
    ],
    fact_id: Annotated[str, Path(description='The ID of the fact to delete.')],
):
    """Remove a fact from memory by its ID."""
    # The backend's result is not returned; the route is declared with status 204
    # (No Content). Any exception raised by the backend propagates unchanged.
    memory_backend.delete_fact_by_id(namespace_id=namespace_id, fact_id=fact_id)


@router_v1.post(
    "/namespaces/{namespace_id}/messages",
    response_description='The number of messages received for extraction.',
)
def extract_facts_from_messages(
    namespace_id: Annotated[
        str, Path(description='The namespace which contains facts relevant to the user.')
    ],
    messages: Annotated[
        list[Message], Body(description='A list of messages between a user and a chatbot.', embed=True)
    ],
) -> str:
    """Takes a list of messages between a user and a chatbot, extracting and storing facts about the user,
    their personal preferences, upcoming plans, professional details, and other miscellaneous information.
    """
    # Hand the messages to the backend first; the acknowledgement is only built once
    # that call has returned, and it counts the messages as they are at that point.
    memory_backend.extract_facts_from_messages(namespace_id=namespace_id, messages=messages)
    received = len(messages)
    return f'{received} messages received for namespace {namespace_id}'


@router_v1.post(
    "/namespaces/{namespace_id}/runs",
)
def create_run(
    namespace_id: Annotated[
        str, Path(description='The namespace which contains facts relevant to the user.')
    ],
    # Defaulting to None makes the body field truly optional, as its description says;
    # without a default, clients had to send `run_id` (even as null) on every request.
    run_id: Annotated[
        str | SkipJsonSchema[None], Body(description='Optional ID to create the run with.', embed=True)
    ] = None,
) -> Run:
    """Add a new run into memory. Runs are a series of steps executed in an agentic workflow."""
    # `run_id` may be None here; it is forwarded to the backend as-is.
    return memory_backend.create_run(namespace_id=namespace_id, run_id=run_id)


@router_v1.get(
    "/namespaces/{namespace_id}/runs/{run_id}",
)
def get_run(
    namespace_id: Annotated[
        str, Path(description='The namespace which contains facts relevant to the user.')
    ],
    run_id: Annotated[str, Path(description='The run which contains the steps for an agentic workflow.')],
) -> Run:
    """Get a run."""
    # Pure delegation: no error translation happens here, so any exception raised by
    # the backend propagates unchanged.
    return memory_backend.get_run(namespace_id=namespace_id, run_id=run_id)


@router_v1.delete("/namespaces/{namespace_id}/runs/{run_id}", status_code=204)
def delete_run(
    namespace_id: Annotated[
        str, Path(description='The namespace which contains facts relevant to the user.')
    ],
    run_id: Annotated[str, Path(description='The run which contains the steps for an agentic workflow.')],
):
    """Delete a run from memory."""
    # The route is declared with status 204 (No Content), so the backend's result is
    # deliberately not returned — matching the other delete routes in this file.
    memory_backend.delete_run(namespace_id=namespace_id, run_id=run_id)


@router_v1.post("/namespaces/{namespace_id}/runs/{run_id}/steps", response_description='The number of runs.')
def add_step(
    namespace_id: Annotated[
        str, Path(description='The namespace which contains facts relevant to the user.')
    ],
    run_id: Annotated[str, Path(description='The run which contains the steps for an agentic workflow.')],
    step: Annotated[dict, Body(description='The step, an arbitrary JSON object.')],
    prompt: Annotated[str, Body(description='The prompt used by an LLM to parse a step.')],
) -> str:
    """Add a new step into a run."""
    # NOTE(review): the response_description says 'The number of runs.' although this
    # route adds a step and returns the backend's string result — confirm the wording.
    # Arguments are passed positionally, in the order the backend call expects them.
    outcome = memory_backend.add_step(namespace_id, run_id, step, prompt)
    return outcome


@router_v1.post("/namespaces/{namespace_id}/runs/search")
def search_runs(
    namespace_id: Annotated[
        str, Path(description='The namespace which contains facts relevant to the user.')
    ],
    query: Annotated[
        str | SkipJsonSchema[None],
        Body(description='A question written in natural language about the user that needs an answer.'),
    ] = None,
    filters: Annotated[
        dict[str, str] | SkipJsonSchema[None],
        Body(description='A set of filters to apply against the metadata of each step.'),
    ] = None,
) -> Run:
    """Search the runs in a namespace with an optional natural-language query and metadata filters."""
    # Both body fields are optional and are forwarded to the backend unchanged.
    return memory_backend.search_runs(namespace_id=namespace_id, query=query, filters=filters)


@router_v1.post("/namespaces/{namespace_id}/runs/{run_id}/end")
def end_run(
    namespace_id: Annotated[
        str, Path(description='The namespace which contains facts relevant to the user.')
    ],
    run_id: Annotated[str, Path(description='The run which contains the steps for an agentic workflow.')],
    background_tasks: BackgroundTasks,
) -> None:
    """End a run and execute any background tasks."""
    # Step 1: mark the run as ended synchronously, inside the request.
    memory_backend.end_run(namespace_id, run_id)

    # Step 2: queue the run analysis as a background task, so the handler returns
    # without waiting for it.
    analysis_kwargs = {"namespace_id": namespace_id, "run_id": run_id}
    background_tasks.add_task(memory_backend.analyze_run, **analysis_kwargs)


# Mount the /v1 router on the application; routes added to the router after this call
# would not be served.
app.include_router(router_v1)


def main(host: str = '0.0.0.0', port: int = 8888) -> None:
    """Main entry point for the agentic-memory server.

    Args:
        host: Address uvicorn binds to. The default, '0.0.0.0', listens on all interfaces.
        port: TCP port uvicorn listens on.
    """
    # Set debug logging if FastAPI debug mode is enabled
    log_level = "debug" if app.debug else "info"
    # Blocks until the server is shut down.
    uvicorn.run(app, host=host, port=port, log_level=log_level)


# Start the server only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()