Spaces:
Running
Running
| """ | |
| Simple tests ensuring CLI interface can be called, with underlying functionality mocked. | |
| """ | |
| import datetime | |
| import re | |
| from cuga.cli import app | |
| from typer.testing import CliRunner | |
| runner = CliRunner() | |
| def test_health_check(monkeypatch): | |
| from cuga.backend.memory import Memory | |
| monkeypatch.setattr(Memory, "health_check", lambda self: False) | |
| result = runner.invoke(app, ["memory", "namespace", "create", "foobar", "--user-id", "baz"]) | |
| assert result.exit_code == 1 | |
| assert result.output == "Memory service is not running.\n" | |
| def test_create_namespace(monkeypatch): | |
| from cuga.backend.memory import Memory, Namespace | |
| def create_namespace(self, namespace_id, user_id, agent_id, app_id): | |
| return Namespace(id="foobar", created_at=datetime.datetime.now(datetime.UTC), user_id="baz") | |
| monkeypatch.setattr(Memory, "health_check", lambda self: True) | |
| monkeypatch.setattr(Memory, "create_namespace", create_namespace) | |
| result = runner.invoke(app, ["memory", "namespace", "create", "foobar", "--user-id", "baz"]) | |
| assert result.exit_code == 0 | |
| assert result.output == "Created namespace `foobar`\n" | |
| def test_create_namespace_already_exists(monkeypatch): | |
| from cuga.backend.memory import Memory, APIRequestException | |
| def create_namespace(self, namespace_id, user_id, agent_id, app_id): | |
| raise APIRequestException('409') | |
| monkeypatch.setattr(Memory, "health_check", lambda self: True) | |
| monkeypatch.setattr(Memory, "create_namespace", create_namespace) | |
| result = runner.invoke(app, ["memory", "namespace", "create", "foobar", "--user-id", "baz"]) | |
| assert result.exit_code == 1 | |
| assert result.output == "Namespace `foobar` already exists.\n" | |
| def test_get_namespace_details(monkeypatch): | |
| from cuga.backend.memory import Memory, Namespace | |
| created_at = datetime.datetime.now(datetime.UTC) | |
| def get_namespace_details(self, namespace_id): | |
| return Namespace(id="foobar", created_at=created_at, user_id="baz") | |
| monkeypatch.setattr(Memory, "health_check", lambda self: True) | |
| monkeypatch.setattr(Memory, "get_namespace_details", get_namespace_details) | |
| result = runner.invoke(app, ["memory", "namespace", "details", "foobar"]) | |
| assert result.exit_code == 0 | |
| header = re.compile(r"β ID\s+β Created At\s+β User ID\s+β Agent ID\s+β Application ID\s+β Entities\s+β") | |
| assert header.search(result.output) is not None | |
| assert "foobar" in result.output | |
| assert "baz" in result.output | |
| assert created_at.isoformat()[:10] in result.output | |
| def test_get_namespace_details_not_found(monkeypatch): | |
| from cuga.backend.memory import Memory, NamespaceNotFoundException | |
| def get_namespace_details(self, namespace_id): | |
| raise NamespaceNotFoundException() | |
| monkeypatch.setattr(Memory, "health_check", lambda self: True) | |
| monkeypatch.setattr(Memory, "get_namespace_details", get_namespace_details) | |
| result = runner.invoke(app, ["memory", "namespace", "details", "foobar"]) | |
| assert result.exit_code == 1 | |
| assert result.output == "Namespace `foobar` not found.\n" | |
| def test_search_namespaces(monkeypatch): | |
| from cuga.backend.memory import Memory, Namespace | |
| created_at = datetime.datetime.now(datetime.UTC) | |
| def search_namespaces(self, user_id, agent_id, app_id, limit): | |
| return [Namespace(id="foobar", created_at=created_at, user_id="baz")] | |
| monkeypatch.setattr(Memory, "health_check", lambda self: True) | |
| monkeypatch.setattr(Memory, "search_namespaces", search_namespaces) | |
| result = runner.invoke(app, ["memory", "namespace", "search"]) | |
| assert result.exit_code == 0 | |
| header = re.compile(r"β ID\s+β Created At\s+β User ID\s+β Agent ID\s+β Application ID\s+β") | |
| assert header.search(result.output) is not None | |
| assert "foobar" in result.output | |
| assert "baz" in result.output | |
| assert created_at.isoformat()[:10] in result.output | |
| def test_delete_namespace(monkeypatch): | |
| from cuga.backend.memory import Memory | |
| def delete_namespace(self, namespace_id): | |
| pass | |
| monkeypatch.setattr(Memory, "health_check", lambda self: True) | |
| monkeypatch.setattr(Memory, "delete_namespace", delete_namespace) | |
| result = runner.invoke(app, ["memory", "namespace", "delete", "foobar"]) | |
| assert result.exit_code == 0 | |
| assert result.output == "Deleted namespace `foobar`\n" | |