OpenAI Gym Environments — Deep Dive

The Env protocol in detail

Every Gymnasium environment implements the gymnasium.Env abstract class. The contract is:

import gymnasium as gym
from gymnasium import spaces
import numpy as np

class GridWorld(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    def __init__(self, size: int = 5, render_mode: str | None = None):
        super().__init__()
        self.size = size
        self.render_mode = render_mode

        # Where the agent is and where it wants to go
        self._agent_location = np.array([0, 0])
        self._target_location = np.array([size - 1, size - 1])

        # Spaces
        self.observation_space = spaces.Dict({
            "agent": spaces.Box(0, size - 1, shape=(2,), dtype=int),
            "target": spaces.Box(0, size - 1, shape=(2,), dtype=int),
        })
        self.action_space = spaces.Discrete(4)  # up, right, down, left

    def _get_obs(self):
        return {"agent": self._agent_location, "target": self._target_location}

    def _get_info(self):
        return {"distance": np.linalg.norm(
            self._agent_location - self._target_location, ord=1
        )}

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self._agent_location = np.array([0, 0])
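        # Resample until the target differs from the agent's start cell,
        # otherwise the episode would begin on the goal.
        self._target_location = self._agent_location
        while np.array_equal(self._target_location, self._agent_location):
            self._target_location = self.np_random.integers(0, self.size, size=2)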
        return self._get_obs(), self._get_info()

    def step(self, action):
        direction = {0: [0, 1], 1: [1, 0], 2: [0, -1], 3: [-1, 0]}[action]
        self._agent_location = np.clip(
            self._agent_location + direction, 0, self.size - 1
        )
        terminated = np.array_equal(self._agent_location, self._target_location)
        reward = 1.0 if terminated else 0.0
        return self._get_obs(), reward, terminated, False, self._get_info()

Key points:

  • reset returns (obs, info). The info dict has been a mandatory part of the return value since Gym v0.26, the release Gymnasium was forked from.
  • step returns (obs, reward, terminated, truncated, info). The split between terminated (the task reached a terminal state) and truncated (the episode was cut short, for example by a time limit) replaced the old single done flag.
  • seed is passed to reset, not to the constructor, enabling reproducible episodes.
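The terminated/truncated split matters to learning code because only a terminated episode has zero future value, while a truncated one should still bootstrap from the next state. A minimal sketch of that rule, assuming a one-step TD target; td_target, next_value, and gamma are illustrative names and not part of Gymnasium:

```python
def td_target(
    reward: float,
    next_value: float,
    terminated: bool,
    gamma: float = 0.99,
) -> float:
    """One-step TD target that bootstraps unless the episode truly ended."""
    # `truncated` is deliberately not an argument: a time-limit cut-off
    # says nothing about the value of the next state, so we still bootstrap.
    if terminated:
        return reward
    return reward + gamma * next_value
```

Collapsing both flags back into a single done flag and zeroing the bootstrap term on either one would undervalue states that are merely close to the time limit.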

Observation and action spaces

Spaces are more than labels; they carry shape, dtype, and bounds that libraries like Stable-Baselines3 read automatically.

# Continuous actions between -2.0 and 2.0
action_space = spaces.Box(low=-2.0, high=2.0, shape=(3,), dtype=np.float32)

# Multi-binary: 10 independent on/off switches
action_space = spaces.MultiBinary(10)

# Tuple of heterogeneous spaces
observation_space = spaces.Tuple((
    spaces.Discrete(5),
    spaces.Box(0, 1, shape=(3, 84, 84), dtype=np.float32),
))

The sample() method on any space generates a valid random element, which is useful for smoke-testing your environment with random actions:

obs, *_ = env.step(env.action_space.sample())  # assumes env.reset() was already called
assert env.observation_space.contains(obs)

Writing robust custom environments

Registration

Register your environment so it can be instantiated with gymnasium.make:

import gymnasium as gym

gym.register(
    id="GridWorld-v0",
    entry_point="my_package.envs:GridWorld",
    max_episode_steps=200,
)

max_episode_steps automatically wraps the environment with TimeLimit, adding truncation.

The check_env utility

Gymnasium ships a linter that validates your implementation:

from gymnasium.utils.env_checker import check_env
check_env(GridWorld())  # raises on contract violations

Run this in CI. It catches subtle bugs such as observations that fall outside observation_space or a reset that does not reproduce the same state for the same seed.

Deterministic seeding

Always call super().reset(seed=seed) and use self.np_random (a seeded NumPy Generator) for all randomness inside your environment. With a fixed seed and a fixed action sequence, rollouts are then reproducible.

Wrappers in depth

Wrappers subclass gymnasium.Wrapper (or specialised bases like ObservationWrapper, RewardWrapper, ActionWrapper).

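import gymnasium as gym

class NormalizeReward(gym.RewardWrapper):
    # Scales rewards by a constant; unlike the built-in wrapper of the same
    # name, it does not track running statistics.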
    def __init__(self, env, scale: float = 0.01):
        super().__init__(env)
        self.scale = scale

    def reward(self, reward):
        return reward * self.scale

Common built-in wrappers:

Wrapper               Purpose
TimeLimit             Truncate episodes after N steps
RecordVideo           Save mp4 of rendered frames
FlattenObservation    Flatten Dict/Tuple obs to a 1-D array
FrameStack            Stack last N frames for Atari-style input (renamed FrameStackObservation in Gymnasium 1.0)
NormalizeObservation  Running mean/std normalisation
ClipAction            Clamp actions to the action space bounds
RescaleAction         Linearly rescale actions to a new range

Order matters: wrappers applied last are called first during step.

Vectorised environments

Training RL at scale means running many environments in parallel. Gymnasium provides gymnasium.vector.SyncVectorEnv and AsyncVectorEnv:

import gymnasium as gym

envs = gym.make_vec("CartPole-v1", num_envs=8, vectorization_mode="sync")
obs, info = envs.reset()
# obs shape: (8, 4) — batch of 8 observations
actions = envs.action_space.sample()  # shape: (8,)
obs, rewards, terminated, truncated, info = envs.step(actions)

AsyncVectorEnv runs each environment in a separate process, which helps when step is expensive (physics simulations, rendering). The trade-off is inter-process communication overhead.

Auto-reset

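Vectorised environments auto-reset individual sub-environments when they terminate or are truncated. In Gymnasium releases before 1.0, the reset happens inside the same step call and the last observation of the finished episode is stored in info["final_observation"], so the agent can still bootstrap a value estimate from it after a truncation. Gymnasium 1.0 changed the default auto-reset behaviour, so check the documentation for your installed version. Missing this detail causes subtle value function bugs.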

Performance considerations

  • Avoid Python-heavy step logic. If your environment does per-step physics, consider writing the hot path in C/Cython or calling an existing engine (MuJoCo, PyBullet).
  • Use rgb_array only when recording. Rendering every frame to an array is slow; pass render_mode=None during training.
  • Profile with cProfile on the step method. A slow environment bottlenecks training more than a slow policy network.
  • Batch computation with NumPy. In vectorised envs, compute rewards and transitions as array operations rather than Python loops.
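The profiling advice above can be sketched with the standard library alone. SlowEnv is a hypothetical stand-in with a deliberately Python-heavy step method, and profile_steps is an illustrative helper; in practice you would pass your own environment and a sensible action.

```python
import cProfile
import io
import pstats


class SlowEnv:
    """Hypothetical stand-in with a deliberately Python-heavy step method."""

    def step(self, action):
        total = 0
        for i in range(10_000):  # simulated per-step work in pure Python
            total += i * action
        return total


def profile_steps(env, n_steps: int = 100) -> str:
    """Run n_steps under cProfile and return the stats report as text."""
    profiler = cProfile.Profile()
    profiler.enable()
    for _ in range(n_steps):
        env.step(1)
    profiler.disable()

    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    # Sort by cumulative time and print only the five most expensive entries.
    stats.sort_stats("cumulative").print_stats(5)
    return buffer.getvalue()


report = profile_steps(SlowEnv())
```

Printing report shows how much cumulative time is spent in step and its callees, which is usually enough to decide whether the hot path is worth moving to NumPy or a compiled extension.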

Testing strategies

  1. Deterministic rollout test — reset with a fixed seed, run a known action sequence, and assert exact observations and rewards.
  2. Space conformance — verify every returned observation is inside observation_space using env.observation_space.contains(obs).
  3. Wrapper parity — compare wrapped and unwrapped environments on the same seed to confirm wrappers only change what they claim.
  4. Stress test — run thousands of random episodes and assert no exceptions, memory leaks, or NaN rewards.

Real-world integration patterns

Connecting Gym to real hardware (a robot arm, a drone) means the step function sends a command and reads sensors instead of updating a simulation matrix. The API stays the same, but you add:

  • Safety limits in step that clamp actions before sending to actuators.
  • Async observation handling because sensors may lag behind commands.
  • Graceful shutdown in close() to park actuators in a safe position.

The beauty of the Gym interface is that a policy trained in simulation can be dropped onto real hardware with zero changes to the agent code — only the environment object changes.

The one thing to remember: Gymnasium’s Env contract — reset plus step returning (obs, reward, terminated, truncated, info) — is the universal socket that connects any RL algorithm to any world, simulated or physical.
