Spaces:
Runtime error
Runtime error
| # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= | |
| import io | |
| import json | |
| import logging | |
| import os | |
| import tarfile | |
| import time | |
| from functools import wraps | |
| from pathlib import Path | |
| from random import randint | |
| from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union | |
| import requests | |
| from pydantic import BaseModel | |
| from tqdm import tqdm | |
| from camel.runtime import BaseRuntime, TaskConfig | |
| from camel.toolkits import FunctionTool | |
| if TYPE_CHECKING: | |
| from docker.models.containers import Container | |
| logger = logging.getLogger(__name__) | |
| class DockerRuntime(BaseRuntime): | |
| r"""A class representing a runtime environment using Docker. | |
| This class automatically wraps functions to be executed | |
| in a Docker container. | |
| Args: | |
| image (str): The name of the Docker image to use for the runtime. | |
| port (int): The port number to use for the runtime API. (default: :obj: | |
| `8000`) | |
| remove (bool): Whether to remove the container after stopping it. ' | |
| (default: :obj: `True`) | |
| kwargs (dict): Additional keyword arguments to pass to the | |
| Docker client. | |
| """ | |
| def __init__( | |
| self, image: str, port: int = 8000, remove: bool = True, **kwargs | |
| ): | |
| super().__init__() | |
| import docker | |
| self.client = docker.from_env() | |
| self.container: Optional[Container] = None | |
| api_path = Path(__file__).parent / "api.py" | |
| self.mounts: Dict[Path, Path] = dict() | |
| self.cp: Dict[Path, Path] = {api_path: Path("/home")} | |
| self.entrypoint: Dict[str, str] = dict() | |
| self.tasks: List[TaskConfig] = [] | |
| self.docker_config = kwargs | |
| self.image = image | |
| self.port = port if port > 0 else randint(10000, 20000) | |
| self.remove = remove | |
| if not self.client.images.list(name=self.image): | |
| logger.warning( | |
| f"Image {self.image} not found. Pulling from Docker Hub." | |
| ) | |
| self.client.images.pull(self.image) | |
| def mount(self, path: str, mount_path: str) -> "DockerRuntime": | |
| r"""Mount a local directory to the container. | |
| Args: | |
| path (str): The local path to mount. | |
| mount_path (str): The path to mount the local directory to in the | |
| container. | |
| Returns: | |
| DockerRuntime: The DockerRuntime instance. | |
| """ | |
| _path, _mount_path = Path(path), Path(mount_path) | |
| if not _path.exists(): | |
| raise FileNotFoundError(f"Path {_path} does not exist.") | |
| if not _path.is_dir(): | |
| raise NotADirectoryError(f"Path {_path} is not a directory.") | |
| if not _path.is_absolute(): | |
| raise ValueError(f"Path {_path} is not absolute.") | |
| if not _mount_path.is_absolute(): | |
| raise ValueError(f"Mount path {_mount_path} is not absolute.") | |
| self.mounts[_path] = _mount_path | |
| return self | |
| def copy(self, source: str, dest: str) -> "DockerRuntime": | |
| r"""Copy a file or directory to the container. | |
| Args: | |
| source (str): The local path to the file. | |
| dest (str): The path to copy the file to in the container. | |
| Returns: | |
| DockerRuntime: The DockerRuntime instance. | |
| """ | |
| _source, _dest = Path(source), Path(dest) | |
| if not _source.exists(): | |
| raise FileNotFoundError(f"Source {_source} does not exist.") | |
| self.cp[_source] = _dest | |
| return self | |
| def add_task( | |
| self, | |
| task: TaskConfig, | |
| ) -> "DockerRuntime": | |
| r"""Add a task to run a command inside the container when building. | |
| Similar to `docker exec`. | |
| Args: | |
| task (TaskConfig): The configuration for the task. | |
| Returns: | |
| DockerRuntime: The DockerRuntime instance. | |
| """ | |
| self.tasks.append(task) | |
| return self | |
| def exec_run( | |
| self, | |
| task: TaskConfig, | |
| ) -> Any: | |
| r"""Run a command inside this container. Similar to `docker exec`. | |
| Args: | |
| task (TaskConfig): The configuration for the task. | |
| Returns: | |
| (ExecResult): A tuple of (exit_code, output) | |
| exit_code: (int): | |
| Exit code for the executed command or `None` if | |
| either `stream` or `socket` is `True`. | |
| output: (generator, bytes, or tuple): | |
| If `stream=True`, a generator yielding response chunks. | |
| If `socket=True`, a socket object for the connection. | |
| If `demux=True`, a tuple of two bytes: stdout and stderr. | |
| A bytestring containing response data otherwise. | |
| Raises: | |
| RuntimeError: If the container does not exist. | |
| """ | |
| if not self.container: | |
| raise RuntimeError( | |
| "Container does not exist. Please build the container first." | |
| ) | |
| return self.container.exec_run(**task.model_dump()) | |
| def build(self, time_out: int = 15) -> "DockerRuntime": | |
| r"""Build the Docker container and start it. | |
| Args: | |
| time_out (int): The number of seconds to wait for the container to | |
| start. (default: :obj: `15`) | |
| Returns: | |
| DockerRuntime: The DockerRuntime instance. | |
| """ | |
| if self.container: | |
| logger.warning("Container already exists. Nothing to build.") | |
| return self | |
| import docker | |
| from docker.types import Mount | |
| mounts = [] | |
| for local_path, mount_path in self.mounts.items(): | |
| mounts.append( | |
| Mount( | |
| target=str(mount_path), source=str(local_path), type="bind" | |
| ) | |
| ) | |
| container_params = { | |
| "image": self.image, | |
| "detach": True, | |
| "mounts": mounts, | |
| "command": "sleep infinity", | |
| **self.docker_config, | |
| } | |
| container_params["ports"] = {"8000/tcp": self.port} | |
| try: | |
| self.container = self.client.containers.create(**container_params) | |
| except docker.errors.APIError as e: | |
| raise RuntimeError(f"Failed to create container: {e!s}") | |
| try: | |
| self.container.start() | |
| # Wait for the container to start | |
| for _ in range(time_out): | |
| self.container.reload() | |
| logger.debug(f"Container status: {self.container.status}") | |
| if self.container.status == "running": | |
| break | |
| time.sleep(1) | |
| except docker.errors.APIError as e: | |
| raise RuntimeError(f"Failed to start container: {e!s}") | |
| # Copy files to the container if specified | |
| for local_path, container_path in self.cp.items(): | |
| logger.info(f"Copying {local_path} to {container_path}") | |
| try: | |
| with io.BytesIO() as tar_stream: | |
| with tarfile.open(fileobj=tar_stream, mode="w") as tar: | |
| tar.add( | |
| local_path, arcname=os.path.basename(local_path) | |
| ) | |
| tar_stream.seek(0) | |
| self.container.put_archive( | |
| str(container_path), tar_stream.getvalue() | |
| ) | |
| except docker.errors.APIError as e: | |
| raise RuntimeError( | |
| f"Failed to copy file {local_path} to container: {e!s}" | |
| ) | |
| if self.tasks: | |
| for task in tqdm(self.tasks, desc="Running tasks"): | |
| self.exec_run(task) | |
| exec = ["python3", "api.py", *list(self.entrypoint.values())] | |
| self.container.exec_run(exec, workdir="/home", detach=True) | |
| logger.info(f"Container started on port {self.port}") | |
| return self | |
| def add( # type: ignore[override] | |
| self, | |
| funcs: Union[FunctionTool, List[FunctionTool]], | |
| entrypoint: str, | |
| redirect_stdout: bool = False, | |
| arguments: Optional[Dict[str, Any]] = None, | |
| ) -> "DockerRuntime": | |
| r"""Add a function or list of functions to the runtime. | |
| Args: | |
| funcs (Union[FunctionTool, List[FunctionTool]]): The function or | |
| list of functions to add. | |
| entrypoint (str): The entrypoint for the function. | |
| redirect_stdout (bool): Whether to return the stdout of | |
| the function. (default: :obj: `False`) | |
| arguments (Optional[Dict[str, Any]]): The arguments for the | |
| function. (default: :obj: `None`) | |
| Returns: | |
| DockerRuntime: The DockerRuntime instance. | |
| """ | |
| if not isinstance(funcs, list): | |
| funcs = [funcs] | |
| if arguments is not None: | |
| entrypoint += json.dumps(arguments) | |
| for func in funcs: | |
| inner_func = func.func | |
| # Create a wrapper that explicitly binds `func` | |
| def wrapper( | |
| *args, func=func, redirect_stdout=redirect_stdout, **kwargs | |
| ): | |
| for key, value in kwargs.items(): | |
| if isinstance(value, BaseModel): | |
| kwargs[key] = value.model_dump() | |
| resp = requests.post( | |
| f"http://localhost:{self.port}/{func.get_function_name()}", | |
| json=dict( | |
| args=args, | |
| kwargs=kwargs, | |
| redirect_stdout=redirect_stdout, | |
| ), | |
| ) | |
| if resp.status_code != 200: | |
| logger.error( | |
| f"""ailed to execute function: | |
| {func.get_function_name()}, | |
| status code: {resp.status_code}, | |
| response: {resp.text}""" | |
| ) | |
| return { | |
| "error": f"""Failed to execute function: | |
| {func.get_function_name()}, | |
| response: {resp.text}""" | |
| } | |
| data = resp.json() | |
| if redirect_stdout: | |
| print(data["stdout"]) | |
| return json.loads(data["output"]) | |
| func.func = wrapper | |
| self.tools_map[func.get_function_name()] = func | |
| self.entrypoint[func.get_function_name()] = entrypoint | |
| return self | |
| def reset(self) -> "DockerRuntime": | |
| r"""Reset the DockerRuntime instance. | |
| Returns: | |
| DockerRuntime: The DockerRuntime instance. | |
| """ | |
| return self.stop().build() | |
| def stop(self, remove: Optional[bool] = None) -> "DockerRuntime": | |
| r"""stop the Docker container. | |
| Args: | |
| remove (Optional[bool]): Whether to remove the container | |
| after stopping it. (default: :obj: `None`) | |
| Returns: | |
| DockerRuntime: The DockerRuntime instance. | |
| """ | |
| if self.container: | |
| self.container.stop() | |
| if remove is None: | |
| remove = self.remove | |
| if remove: | |
| logger.info("Removing container.") | |
| self.container.remove() | |
| self.container = None | |
| else: | |
| logger.warning("No container to stop.") | |
| return self | |
| def ok(self) -> bool: | |
| r"""Check if the API Server is running. | |
| Returns: | |
| bool: Whether the API Server is running. | |
| """ | |
| if not self.container: | |
| return False | |
| try: | |
| _ = requests.get(f"http://localhost:{self.port}") | |
| return True | |
| except requests.exceptions.ConnectionError: | |
| return False | |
| def wait(self, timeout: int = 10) -> bool: | |
| r"""Wait for the API Server to be ready. | |
| Args: | |
| timeout (int): The number of seconds to wait. (default: :obj: `10`) | |
| Returns: | |
| bool: Whether the API Server is ready. | |
| """ | |
| for _ in range(timeout): | |
| if self.ok: | |
| return True | |
| time.sleep(1) | |
| return False | |
| def __enter__(self) -> "DockerRuntime": | |
| r"""Enter the context manager. | |
| Returns: | |
| DockerRuntime: The DockerRuntime instance. | |
| """ | |
| if not self.container: | |
| return self.build() | |
| logger.warning( | |
| "Container already exists. Returning existing container." | |
| ) | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| r"""Exit the context manager.""" | |
| self.stop() | |
| def docs(self) -> str: | |
| r"""Get the URL for the API documentation. | |
| Returns: | |
| str: The URL for the API documentation. | |
| """ | |
| return f"http://localhost:{self.port}/docs" | |