OpenAI Gym Environments — Deep Dive
The Env protocol in detail
Every Gymnasium environment implements the gymnasium.Env abstract class. The contract is:
import gymnasium as gym
from gymnasium import spaces
import numpy as np


class GridWorld(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    def __init__(self, size: int = 5, render_mode: str | None = None):
        super().__init__()
        self.size = size
        self.render_mode = render_mode
        # Where the agent is and where it wants to go
        self._agent_location = np.array([0, 0])
        self._target_location = np.array([size - 1, size - 1])
        # Spaces
        self.observation_space = spaces.Dict({
            "agent": spaces.Box(0, size - 1, shape=(2,), dtype=int),
            "target": spaces.Box(0, size - 1, shape=(2,), dtype=int),
        })
        self.action_space = spaces.Discrete(4)  # up, right, down, left

    def _get_obs(self):
        return {"agent": self._agent_location, "target": self._target_location}

    def _get_info(self):
        return {"distance": np.linalg.norm(
            self._agent_location - self._target_location, ord=1
        )}

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self._agent_location = np.array([0, 0])
        self._target_location = self.np_random.integers(0, self.size, size=2)
        return self._get_obs(), self._get_info()

    def step(self, action):
        direction = {0: [0, 1], 1: [1, 0], 2: [0, -1], 3: [-1, 0]}[action]
        self._agent_location = np.clip(
            self._agent_location + direction, 0, self.size - 1
        )
        terminated = np.array_equal(self._agent_location, self._target_location)
        reward = 1.0 if terminated else 0.0
        return self._get_obs(), reward, terminated, False, self._get_info()
Key points:
- reset returns (obs, info). The extra info dict has been mandatory since Gym v0.26, the API that Gymnasium inherited.
- step returns (obs, reward, terminated, truncated, info). The split between terminated (task ended naturally) and truncated (hit a time limit) replaced the old single done flag.
- seed is passed to reset, not to the constructor, enabling reproducible episodes.
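One practical reason the terminated/truncated split matters: a value-based learner should still bootstrap from the next state after a truncation, but not after a true termination. The following is a minimal sketch assuming a one-step TD target; `td_target` and its parameters are hypothetical names chosen for illustration, not part of Gymnasium.

```python
def td_target(reward: float, next_value: float, terminated: bool, gamma: float = 0.99) -> float:
    """One-step TD target that respects the terminated/truncated split.

    Hypothetical helper for illustration; not part of Gymnasium.
    """
    # After a true termination there is no future return, so drop the bootstrap term.
    # After a truncation (for example a time limit) the episode could have
    # continued, so the bootstrap term is kept.
    bootstrap = 0.0 if terminated else gamma * next_value
    return reward + bootstrap


# Goal reached: the target is just the final reward.
goal_target = td_target(reward=1.0, next_value=10.0, terminated=True, gamma=0.5)
# Time limit hit (terminated=False, truncated=True): still bootstrap from next_value.
timeout_target = td_target(reward=1.0, next_value=10.0, terminated=False, gamma=0.5)
print(goal_target, timeout_target)
```

With the old single done flag, both cases would have been treated as the first one, which biases value estimates for time-limited tasks.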
Observation and action spaces
Spaces are more than labels; they carry shape, dtype, and bounds that libraries like Stable-Baselines3 read automatically.
# Continuous actions between -2.0 and 2.0
action_space = spaces.Box(low=-2.0, high=2.0, shape=(3,), dtype=np.float32)
# Multi-binary: 10 independent on/off switches
action_space = spaces.MultiBinary(10)
# Tuple of heterogeneous spaces
observation_space = spaces.Tuple((
    spaces.Discrete(5),
    spaces.Box(0, 1, shape=(3, 84, 84), dtype=np.float32),
))
The sample() method on any space generates a valid random element, which is useful for smoke-testing your environment:
assert env.observation_space.contains(env.observation_space.sample())
Writing robust custom environments
Registration
Register your environment so it can be instantiated with gymnasium.make:
import gymnasium

gymnasium.register(
    id="GridWorld-v0",
    entry_point="my_package.envs:GridWorld",
    max_episode_steps=200,
)
When the environment is created through gymnasium.make, max_episode_steps automatically wraps it with TimeLimit, which sets truncated to True once the step budget is used up.
The check_env utility
Gymnasium ships a linter that validates your implementation:
from gymnasium.utils.env_checker import check_env
check_env(GridWorld()) # raises on contract violations
Run this in CI. It catches subtle bugs such as observations that fall outside the declared observation_space or a reset that ignores its seed.
Deterministic seeding
Always call super().reset(seed=seed) and use self.np_random (a seeded NumPy Generator) for all randomness inside your environment. This guarantees reproducible rollouts.
Wrappers in depth
Wrappers subclass gymnasium.Wrapper (or specialised bases like ObservationWrapper, RewardWrapper, ActionWrapper).
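import gymnasium

class NormalizeReward(gymnasium.RewardWrapper):
    # Constant scaling only; the built-in gymnasium.wrappers.NormalizeReward uses running statistics.
    def __init__(self, env, scale: float = 0.01):
        super().__init__(env)
        self.scale = scale

    def reward(self, reward):
        return reward * self.scale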
Common built-in wrappers:
| Wrapper | Purpose |
|---|---|
| TimeLimit | Truncate episodes after N steps |
| RecordVideo | Save mp4 of rendered frames |
| FlattenObservation | Flatten Dict/Tuple obs to a 1-D array |
| FrameStack (FrameStackObservation in Gymnasium 1.0 and later) | Stack last N frames for Atari-style input |
| NormalizeObservation | Running mean/std normalisation |
| ClipAction | Clamp actions to the action space bounds |
| RescaleAction | Linearly rescale actions to a new range |
Order matters: wrappers applied last are called first during step.
Vectorised environments
Training RL at scale means running many environments in parallel. Gymnasium provides gymnasium.vector.SyncVectorEnv and AsyncVectorEnv:
import gymnasium

envs = gymnasium.make_vec("CartPole-v1", num_envs=8, vectorization_mode="sync")
obs, info = envs.reset()
# obs shape: (8, 4) — batch of 8 observations
actions = envs.action_space.sample() # shape: (8,)
obs, rewards, terminated, truncated, info = envs.step(actions)
AsyncVectorEnv runs each environment in a separate process, which helps when step is expensive (physics simulations, rendering). The trade-off is inter-process communication overhead.
Auto-reset
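Vectorised environments auto-reset individual sub-environments when an episode terminates or is truncated. In Gymnasium releases before 1.0, the final observation of the finished episode is stored in info["final_observation"] so the agent can compute the last value estimate; version 1.0 changed the default auto-reset behaviour, so check the vector API documentation for the release you have installed. Missing this detail causes subtle value function bugs.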
Performance considerations
- Avoid Python-heavy step logic. If your environment does per-step physics, consider writing the hot path in C/Cython or calling an existing engine (MuJoCo, PyBullet).
- Use rgb_array only when recording. Rendering every frame to an array is slow; pass render_mode=None during training.
- Profile with cProfile on the step method. A slow environment bottlenecks training more than a slow policy network.
- Batch computation with NumPy. In vectorised envs, compute rewards and transitions as array operations rather than Python loops.
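As a sketch of the profiling advice above, the snippet below runs cProfile over a few hundred step calls. `SlowEnv` is a hypothetical stand-in with a deliberately Python-heavy step; it follows the reset/step signatures but does not depend on Gymnasium, so it runs anywhere.

```python
import cProfile
import io
import pstats


class SlowEnv:
    """Hypothetical stand-in with a deliberately Python-heavy step."""

    def reset(self, seed=None, options=None):
        self.state = 0.0
        return self.state, {}

    def step(self, action):
        # Pure-Python loop standing in for expensive per-step logic.
        self.state = sum(i * 0.5 for i in range(2_000))
        return self.state, 0.0, False, False, {}


env = SlowEnv()
env.reset()

profiler = cProfile.Profile()
profiler.enable()
for _ in range(200):
    env.step(0)
profiler.disable()

# Collect the five most expensive entries by cumulative time.
report = io.StringIO()
pstats.Stats(profiler, stream=report).sort_stats("cumulative").print_stats(5)
print(report.getvalue())
```

Sorting by cumulative time puts step and whatever it calls at the top of the report, which shows where a rewrite in NumPy or a compiled extension would pay off.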
Testing strategies
- Deterministic rollout test: reset with a fixed seed, run a known action sequence, and assert exact observations and rewards.
- Space conformance: verify every returned observation is inside observation_space using env.observation_space.contains(obs).
- Wrapper parity: compare wrapped and unwrapped environments on the same seed to confirm wrappers only change what they claim.
- Stress test: run thousands of random episodes and assert no exceptions, memory leaks, or NaN rewards.
Real-world integration patterns
Connecting Gym to real hardware (a robot arm, a drone) means the step function sends a command and reads sensors instead of updating a simulation matrix. The API stays the same, but you add:
- Safety limits in step that clamp actions before sending to actuators.
- Async observation handling because sensors may lag behind commands.
- Graceful shutdown in close() to park actuators in a safe position.
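The safety-limit and shutdown points can be sketched without any hardware. Everything here is hypothetical: `FakeArm` stands in for a real driver, `ArmEnv` is a plain class that mirrors the reset/step/close method names (a real version would subclass gymnasium.Env and declare its spaces), and the velocity limit is an assumed value. Async sensor handling is left out to keep the sketch short.

```python
class FakeArm:
    """Hypothetical driver standing in for a real actuator interface."""

    def __init__(self):
        self.position = 0.0
        self.parked = True

    def send(self, velocity: float) -> None:
        self.position += velocity
        self.parked = False

    def read(self) -> float:
        return self.position

    def park(self) -> None:
        self.position = 0.0
        self.parked = True


class ArmEnv:
    """Mirrors the reset/step/close method names of the Env contract."""

    MAX_VELOCITY = 0.1  # assumed safety limit, in arbitrary units

    def __init__(self, arm: FakeArm, goal: float = 1.0):
        self.arm = arm
        self.goal = goal

    def reset(self, seed=None, options=None):
        self.arm.park()
        return self.arm.read(), {}

    def step(self, action: float):
        # Clamp before anything reaches the actuator.
        safe = max(-self.MAX_VELOCITY, min(self.MAX_VELOCITY, action))
        self.arm.send(safe)
        obs = self.arm.read()
        terminated = abs(obs - self.goal) < 1e-6
        reward = 1.0 if terminated else 0.0
        return obs, reward, terminated, False, {"clamped": safe != action}

    def close(self):
        # Leave the hardware in a known safe state.
        self.arm.park()


arm = FakeArm()
env = ArmEnv(arm)
env.reset()
obs, reward, terminated, truncated, info = env.step(5.0)  # far beyond the limit
env.close()
print(obs, info["clamped"], arm.parked)
```

Reporting the clamp in info lets the training code notice when the policy keeps requesting unsafe actions.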
The beauty of the Gym interface is that a policy trained in simulation can be dropped onto real hardware with zero changes to the agent code — only the environment object changes.
The one thing to remember: Gymnasium’s Env contract — reset plus step returning (obs, reward, terminated, truncated, info) — is the universal socket that connects any RL algorithm to any world, simulated or physical.