Build Gym-style Interface

Note

Please first complete Basics before continuing this tutorial.

OpenAI Gym is widely used for research on reinforcement learning. It provides a base class gym.Env as the interface for many RL tasks. We are going to showcase how to write a gym-style environment with SAPIEN.

In this tutorial, you will learn the following:

  • Implement a simplified Ant environment based on SAPIEN

  • Save and restore the simulation states

../../_images/ant.gif

gym and transforms3d are required for this example, which can be installed by pip install gym transforms3d. The full code of the Ant environment can be downloaded here ant.py

SapienEnv: base class

Let’s start with a base class SapienEnv, which inherits gym.Env. Similar to MujocoEnv, it is a virtual class with several unimplemented member functions. The full code of the base class can be downloaded here sapien_env.py

import sapien.core as sapien

import gym
from gym.utils import seeding


class SapienEnv(gym.Env):
    """Superclass for Sapien environments."""

    def __init__(self, control_freq, timestep):
        self.control_freq = control_freq  # alias: frame_skip in mujoco_py
        self.timestep = timestep

        self._engine = sapien.Engine()
        self._renderer = sapien.SapienRenderer()
        self._engine.set_renderer(self._renderer)
        self._scene = self._engine.create_scene()
        self._scene.set_timestep(timestep)

        self._build_world()
        self.viewer = None
        self.seed()

    def _build_world(self):
        raise NotImplementedError()

    def _setup_viewer(self):
        raise NotImplementedError()

    # ---------------------------------------------------------------------------- #
    # Override gym functions
    # ---------------------------------------------------------------------------- #
    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def close(self):
        if self.viewer is not None:
            pass  # release viewer

    def render(self, mode='human'):
        if mode == 'human':
            if self.viewer is None:
                self._setup_viewer()
            self._scene.update_render()
            self.viewer.render()
        else:
            raise NotImplementedError('Unsupported render mode {}.'.format(mode))

    # ---------------------------------------------------------------------------- #
    # Utilities
    # ---------------------------------------------------------------------------- #
    def get_actor(self, name) -> sapien.ArticulationBase:
        all_actors = self._scene.get_all_actors()
        actor = [x for x in all_actors if x.name == name]
        if len(actor) > 1:
            raise RuntimeError(f'Not a unique name for actor: {name}')
        elif len(actor) == 0:
            raise RuntimeError(f'Actor not found: {name}')
        return actor[0]

    def get_articulation(self, name) -> sapien.ArticulationBase:
        all_articulations = self._scene.get_all_articulations()
        articulation = [x for x in all_articulations if x.name == name]
        if len(articulation) > 1:
            raise RuntimeError(f'Not a unique name for articulation: {name}')
        elif len(articulation) == 0:
            raise RuntimeError(f'Articulation not found: {name}')
        return articulation[0]

    @property
    def dt(self):
        return self.timestep * self.control_freq

In the constructor, we first set up the engine, scene and renderer. Then, we call self._build_world() to build the simulation world. _build_world is a virtual function to implement. Besides, _setup_viewer is another virtual function used for on-screen visualization.

Note

SAPIEN does not support creating a simulation world from a XML direclty, like Mujoco MJCF. But users can write their own parsers with their preferred formats.

AntEnv: environment

Based on SapienEnv, we can create a gym-style environment AntEnv. First, we need to update the constructor and implement _build_world to build the simulation world. It creates the ground and an ant articulation. The implementation of create_ant is not shown here. The initial state of the actuator (ant) is stored, which will be restored every time the environment is reset.

class AntEnv(SapienEnv):
    def __init__(self):
        super().__init__(control_freq=5, timestep=0.01)

        self.actuator = self.get_articulation('ant')
        self._scene.step()  # simulate one step for steady state
        self._init_state = self.actuator.pack()

        dof = self.actuator.dof
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=[5 + dof + 6 + dof], dtype=np.float32)
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=[dof], dtype=np.float32)
        # Following the original implementation, we scale the action (qf)
        self._action_scale_factor = 50.0

    # ---------------------------------------------------------------------------- #
    # Simulation world
    # ---------------------------------------------------------------------------- #
    def _build_world(self):
        physical_material = self._scene.create_physical_material(1.0, 1.0, 0.0)
        self._scene.default_physical_material = physical_material
        render_material = self._renderer.create_material()
        render_material.set_base_color([0.8, 0.9, 0.8, 1])
        self._scene.add_ground(0.0, render_material=render_material)
        ant = self.create_ant(self._scene)
        ant.set_pose(Pose([0., 0., 0.55]))

    @staticmethod
    def create_ant(scene: sapien.Scene, color=(0.8, 0.6, 0.4), 
                   friction=0.0, damping=1.0, density=20.0):

Furthermore, we need to implement two important virtual functions of gym.Env, step and reset.

    def step(self, action):
        ant = self.actuator

        x_before = ant.pose.p[0]
        ant.set_qf(action * self._action_scale_factor)
        for i in range(self.control_freq):
            self._scene.step()
        x_after = ant.pose.p[0]

        forward_reward = (x_after - x_before) / self.dt
        ctrl_cost = 0.5 * np.square(action).sum()
        survive_reward = 1.0
        # Note that we do not include contact cost as the original version
        reward = forward_reward - ctrl_cost + survive_reward

        state = self.state_vector()
        is_healthy = (np.isfinite(state).all() and 0.2 <= state[2] <= 1.0)
        done = not is_healthy

        obs = self._get_obs()

        return obs, reward, done, dict(
            reward_forward=forward_reward,
            reward_ctrl=-ctrl_cost,
            reward_survive=survive_reward)

    def reset(self):
        self.actuator.unpack(self._init_state)
        # add some random noise
        init_qpos = self.actuator.get_qpos()
        init_qvel = self.actuator.get_qvel()
        qpos = init_qpos + self.np_random.uniform(size=self.actuator.dof, low=-0.1, high=0.1)
        qvel = init_qvel + self.np_random.normal(size=self.actuator.dof) * 0.1
        self.actuator.set_qpos(qpos)
        self.actuator.set_qvel(qvel)
        obs = self._get_obs()
        return obs

step runs one timestep of the environment’s dynamics, and reset resets the state of the environment. For our implementation, we restore the state of the actuator (ant) and add some noise to initial joint states when the environment is reset.

Random Agent

As a gym environment, we can run the environment with a random agent.

def main():
    env = AntEnv()
    env.reset()
    for step in range(1000):
        env.render()
        action = env.action_space.sample()
        obs, reward, done, info = env.step(action)
        if done:
            print(f'Done at step {step}')
            obs = env.reset()
    env.close()