Build Gym-style Interface¶
Note
Please first complete Basic before continuing this tutorial.
OpenAI Gym is widely used for research on reinforcement learning.
It provides a base class gym.Env
as the interface for many RL tasks.
We are going to showcase how to write a gym-style environment with SAPIEN.
In this tutorial, you will learn the following:
Implement a simplified Ant environment based on SAPIEN
Save and restore the simulation states
gym
and transforms3d
are required for this example, which can be installed by pip install gym transforms3d
.
The full code of the Ant environment can be downloaded here ant.py
SapienEnv: base class¶
Let’s start with a base class SapienEnv
, which inherits gym.Env
.
Similar to MujocoEnv, it is a virtual class with several unimplemented member functions.
The full code of the base class can be downloaded here sapien_env.py
import sapien.core as sapien
import gym
from gym.utils import seeding
class SapienEnv(gym.Env):
"""Superclass for Sapien environments."""
def __init__(self, control_freq, timestep):
self.control_freq = control_freq # alias: frame_skip in mujoco_py
self.timestep = timestep
self._engine = sapien.Engine()
self._renderer = sapien.VulkanRenderer()
self._engine.set_renderer(self._renderer)
self._scene = self._engine.create_scene()
self._scene.set_timestep(timestep)
self._build_world()
self.viewer = None
self.seed()
def _build_world(self):
raise NotImplementedError()
def _setup_viewer(self):
raise NotImplementedError()
# ---------------------------------------------------------------------------- #
# Override gym functions
# ---------------------------------------------------------------------------- #
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def close(self):
if self.viewer is not None:
pass # release viewer
def render(self, mode='human'):
if mode == 'human':
if self.viewer is None:
self._setup_viewer()
self._scene.update_render()
self.viewer.render()
else:
raise NotImplementedError('Unsupported render mode {}.'.format(mode))
# ---------------------------------------------------------------------------- #
# Utilities
# ---------------------------------------------------------------------------- #
def get_actor(self, name) -> sapien.ArticulationBase:
all_actors = self._scene.get_all_actors()
actor = [x for x in all_actors if x.name == name]
if len(actor) > 1:
raise RuntimeError(f'Not a unique name for actor: {name}')
elif len(actor) == 0:
raise RuntimeError(f'Actor not found: {name}')
return actor[0]
def get_articulation(self, name) -> sapien.ArticulationBase:
all_articulations = self._scene.get_all_articulations()
articulation = [x for x in all_articulations if x.name == name]
if len(articulation) > 1:
raise RuntimeError(f'Not a unique name for articulation: {name}')
elif len(articulation) == 0:
raise RuntimeError(f'Articulation not found: {name}')
return articulation[0]
@property
def dt(self):
return self.timestep * self.control_freq
In the constructor, we first set up the engine, scene and renderer.
Then, we call self._build_world()
to build the simulation world.
_build_world
is a virtual function to implement.
Besides, _setup_viewer
is another virtual function used for on-screen visualization.
Note
SAPIEN does not support creating a simulation world from a XML direclty, like Mujoco MJCF. But users can write their own parsers with their preferred formats.
AntEnv: environment¶
Based on SapienEnv
, we can create a gym-style environment AntEnv
.
First, we need to update the constructor and implement _build_world
to build the simulation world.
It creates the ground and an ant articulation.
The implementation of create_ant
is not shown here.
The initial state of the actuator (ant) is stored, which will be restored every time the environment is reset.
class AntEnv(SapienEnv):
def __init__(self):
super().__init__(control_freq=5, timestep=0.01)
self.actuator = self.get_articulation('ant')
self._scene.step() # simulate one step for steady state
self._init_state = self.actuator.pack()
dof = self.actuator.dof
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf, shape=[5 + dof + 6 + dof], dtype=np.float32)
self.action_space = spaces.Box(low=-1.0, high=1.0, shape=[dof], dtype=np.float32)
# Following the original implementation, we scale the action (qf)
self._action_scale_factor = 50.0
# ---------------------------------------------------------------------------- #
# Simulation world
# ---------------------------------------------------------------------------- #
def _build_world(self):
physical_material = self._scene.create_physical_material(1.0, 1.0, 0.0)
self._scene.default_physical_material = physical_material
render_material = self._renderer.create_material()
render_material.set_base_color([0.8, 0.9, 0.8, 1])
self._scene.add_ground(0.0, render_material=render_material)
ant = self.create_ant(self._scene)
ant.set_pose(Pose([0., 0., 0.55]))
@staticmethod
def create_ant(scene: sapien.Scene, color=(0.8, 0.6, 0.4),
friction=0.0, damping=1.0, density=20.0):
Furthermore, we need to implement two important virtual functions of gym.Env
, step
and reset
.
def step(self, action):
ant = self.actuator
x_before = ant.pose.p[0]
ant.set_qf(action * self._action_scale_factor)
for i in range(self.control_freq):
self._scene.step()
x_after = ant.pose.p[0]
forward_reward = (x_after - x_before) / self.dt
ctrl_cost = 0.5 * np.square(action).sum()
survive_reward = 1.0
# Note that we do not include contact cost as the original version
reward = forward_reward - ctrl_cost + survive_reward
state = self.state_vector()
is_healthy = (np.isfinite(state).all() and 0.2 <= state[2] <= 1.0)
done = not is_healthy
obs = self._get_obs()
return obs, reward, done, dict(
reward_forward=forward_reward,
reward_ctrl=-ctrl_cost,
reward_survive=survive_reward)
def reset(self):
self.actuator.unpack(self._init_state)
# add some random noise
init_qpos = self.actuator.get_qpos()
init_qvel = self.actuator.get_qvel()
qpos = init_qpos + self.np_random.uniform(size=self.actuator.dof, low=-0.1, high=0.1)
qvel = init_qvel + self.np_random.normal(size=self.actuator.dof) * 0.1
self.actuator.set_qpos(qpos)
self.actuator.set_qvel(qvel)
obs = self._get_obs()
return obs
step
runs one timestep of the environment’s dynamics, and reset
resets the state of the environment.
For our implementation, we restore the state of the actuator (ant) and add some noise to initial joint states when the environment is reset.
Random Agent¶
As a gym environment, we can run the environment with a random agent.
def main():
env = AntEnv()
env.reset()
for step in range(1000):
env.render()
action = env.action_space.sample()
obs, reward, done, info = env.step(action)
if done:
print(f'Done at step {step}')
obs = env.reset()
env.close()