Add mujoco playground

This commit is contained in:
Nam-dada
2025-04-10 17:28:09 +08:00
parent d2baa4bbff
commit d8ad708e5b

View File

@@ -0,0 +1,119 @@
import jax.numpy as jnp
from jax import Array
from mujoco_playground import registry
from .rl_jit import RLEnv, norm_obs
class MujocoEnv(RLEnv):
def __init__(
self, env_name: str = "SwimmerSwimmer6", *args, **kwargs
):
super().__init__(*args, **kwargs)
self.env_name = env_name
self.env = registry.load(env_name=env_name)
def env_step(self, randkey, env_state, action):
state = self.env.step(env_state, action)
obs = state.obs
if not isinstance(obs, Array):
if "state" in obs:
obs = obs["state"]
else:
raise ImportError(
f"This Pytree observation space is not supported yet: {obs}"
)
return obs, state, state.reward, state.done.astype(jnp.bool_), state.info
def env_reset(self, randkey):
init_state = self.env.reset(randkey)
obs = init_state.obs
if not isinstance(obs, Array):
if "state" in obs:
obs = obs["state"]
else:
raise ImportError(
f"This Pytree observation space is not supported yet: {obs}"
)
return obs, init_state
@property
def input_shape(self):
return (self.env.observation_size,)
@property
def output_shape(self):
return (self.env.action_size,)
def show(
self,
state,
randkey,
act_func,
params,
save_path=None,
height=480,
width=480,
output_type="rgb_array",
*args,
**kwargs,
):
assert output_type in ["gif", "mp4"]
import jax
import imageio
from brax.io import image
import numpy as np
obs, env_state = self.reset(randkey)
reward, done = 0.0, False
state_histories = [env_state.pipeline_state]
def step(key, env_state, obs):
key, _ = jax.random.split(key)
if self.obs_normalization:
obs = norm_obs(state, obs)
if self.action_policy is not None:
forward_func = lambda obs: act_func(state, params, obs)
action = self.action_policy(key, forward_func, obs)
else:
action = act_func(state, params, obs)
obs, env_state, r, done, info = self.step(randkey, env_state, action)
return key, env_state, obs, r, done
jit_step = jax.jit(step)
for _ in range(self.max_step):
key, env_state, obs, r, done = jit_step(randkey, env_state, obs)
state_histories.append(env_state.pipeline_state)
reward += r
if done:
break
print("Total reward: ", reward)
try:
imgs = image.render_array(
sys=self.env.sys, trajectory=state_histories, height=height, width=width, camera="track"
)
except ValueError:
imgs = image.render_array(
sys=self.env.sys, trajectory=state_histories, height=height, width=width
)
if save_path is None:
save_path = f"{self.env_name}.{output_type}"
imageio.mimsave(save_path, imgs, *args, **kwargs)
if output_type == "gif":
imageio.mimsave(save_path, imgs, *args, **kwargs)
elif output_type == "mp4":
fps = kwargs.get("fps", 30)
imageio.mimsave(save_path, imgs, fps=fps, codec="libx264", format="mp4")
print(f"{output_type} saved to: ", save_path)