openenv-benchmark / client.py
burtenshaw's picture
burtenshaw HF Staff
Upload folder using huggingface_hub
4729fab verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Benchmark Environment HTTP Client.
This module provides the client for connecting to a Benchmark Environment server
over HTTP. Useful for testing concurrency and infrastructure.
"""
from typing import Dict
from openenv.core.client_types import StepResult
from openenv.core.env_server.types import State
from openenv.core.http_env_client import HTTPEnvClient
from .models import BenchmarkAction, BenchmarkObservation
class BenchmarkEnv(HTTPEnvClient[BenchmarkAction, BenchmarkObservation]):
"""
HTTP client for the Benchmark Environment.
This client connects to a BenchmarkEnvironment HTTP server and provides
methods to interact with it: reset(), step(), and state access.
Example:
>>> # Connect to a running server
>>> client = BenchmarkEnv(base_url="http://localhost:8000")
>>> result = client.reset()
>>> print(result.observation.host_url)
>>> print(result.observation.pid)
>>> print(result.observation.session_hash)
>>>
>>> # Test concurrency by waiting
>>> result = client.step(BenchmarkAction(wait_seconds=2.0))
>>> print(result.observation.waited_seconds)
Example with Docker:
>>> # Automatically start container and connect
>>> client = BenchmarkEnv.from_docker_image("benchmark-env:latest")
>>> result = client.reset()
>>> result = client.step(BenchmarkAction(wait_seconds=1.0))
"""
def _step_payload(self, action: BenchmarkAction) -> Dict:
"""
Convert BenchmarkAction to JSON payload for step request.
Args:
action: BenchmarkAction instance
Returns:
Dictionary representation suitable for JSON encoding
"""
return {
"wait_seconds": action.wait_seconds,
}
def _parse_result(self, payload: Dict) -> StepResult[BenchmarkObservation]:
"""
Parse server response into StepResult[BenchmarkObservation].
Args:
payload: JSON response from server
Returns:
StepResult with BenchmarkObservation
"""
obs_data = payload.get("observation", {})
observation = BenchmarkObservation(
host_url=obs_data.get("host_url", ""),
pid=obs_data.get("pid", 0),
session_hash=obs_data.get("session_hash", ""),
waited_seconds=obs_data.get("waited_seconds", 0.0),
timestamp=obs_data.get("timestamp", 0.0),
done=payload.get("done", False),
reward=payload.get("reward"),
metadata=obs_data.get("metadata", {}),
)
return StepResult(
observation=observation,
reward=payload.get("reward"),
done=payload.get("done", False),
)
def _parse_state(self, payload: Dict) -> State:
"""
Parse server response into State object.
Args:
payload: JSON response from /state endpoint
Returns:
State object with episode_id and step_count
"""
return State(
episode_id=payload.get("episode_id"),
step_count=payload.get("step_count", 0),
)