Spaces:
Sleeping
Sleeping
File size: 3,407 Bytes
4729fab |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Benchmark Environment HTTP Client.
This module provides the client for connecting to a Benchmark Environment server
over HTTP. Useful for testing concurrency and infrastructure.
"""
from typing import Dict
from openenv.core.client_types import StepResult
from openenv.core.env_server.types import State
from openenv.core.http_env_client import HTTPEnvClient
from .models import BenchmarkAction, BenchmarkObservation
class BenchmarkEnv(HTTPEnvClient[BenchmarkAction, BenchmarkObservation]):
    """
    HTTP client for the Benchmark Environment.

    This client connects to a BenchmarkEnvironment HTTP server and provides
    methods to interact with it: reset(), step(), and state access.

    This subclass defines only the payload conversion hooks below; everything
    else is inherited from ``HTTPEnvClient``.

    Example:
        >>> # Connect to a running server
        >>> client = BenchmarkEnv(base_url="http://localhost:8000")
        >>> result = client.reset()
        >>> print(result.observation.host_url)
        >>> print(result.observation.pid)
        >>> print(result.observation.session_hash)
        >>>
        >>> # Test concurrency by waiting
        >>> result = client.step(BenchmarkAction(wait_seconds=2.0))
        >>> print(result.observation.waited_seconds)

    Example with Docker:
        >>> # Automatically start container and connect
        >>> client = BenchmarkEnv.from_docker_image("benchmark-env:latest")
        >>> result = client.reset()
        >>> result = client.step(BenchmarkAction(wait_seconds=1.0))
    """

    def _step_payload(self, action: BenchmarkAction) -> Dict:
        """
        Convert BenchmarkAction to JSON payload for step request.

        Args:
            action: BenchmarkAction instance; only ``wait_seconds`` is read.

        Returns:
            Dictionary with the single key ``"wait_seconds"``, suitable for
            JSON encoding.
        """
        return {
            "wait_seconds": action.wait_seconds,
        }

    def _parse_result(self, payload: Dict) -> StepResult[BenchmarkObservation]:
        """
        Parse server response into StepResult[BenchmarkObservation].

        Observation fields are read from ``payload["observation"]``; ``reward``
        and ``done`` are read from the top level of ``payload`` and copied into
        both the observation and the returned StepResult. Fields missing from
        the response fall back to neutral defaults (empty string, ``0``,
        ``0.0``, ``False``, empty dict); ``reward`` defaults to ``None``.

        Args:
            payload: JSON response from server

        Returns:
            StepResult with BenchmarkObservation
        """
        # ``dict.get(key, default)`` only uses the default when the key is
        # absent, so an explicit JSON null would come back as None and break the
        # ``.get`` calls below. ``or {}`` treats null the same as missing.
        obs_data = payload.get("observation") or {}

        # Read once so the observation and the StepResult always agree.
        reward = payload.get("reward")
        done = payload.get("done", False)

        observation = BenchmarkObservation(
            host_url=obs_data.get("host_url", ""),
            pid=obs_data.get("pid", 0),
            session_hash=obs_data.get("session_hash", ""),
            waited_seconds=obs_data.get("waited_seconds", 0.0),
            timestamp=obs_data.get("timestamp", 0.0),
            done=done,
            reward=reward,
            # A null metadata field is normalised to an empty dict as well.
            metadata=obs_data.get("metadata") or {},
        )
        return StepResult(
            observation=observation,
            reward=reward,
            done=done,
        )

    def _parse_state(self, payload: Dict) -> State:
        """
        Parse server response into State object.

        Args:
            payload: JSON response from /state endpoint

        Returns:
            State object with ``episode_id`` (``None`` when absent) and
            ``step_count`` (``0`` when absent)
        """
        return State(
            episode_id=payload.get("episode_id"),
            step_count=payload.get("step_count", 0),
        )
|