From adc0b641861b7f8887a6ba6f6ed652141950aa8c Mon Sep 17 00:00:00 2001 From: Benjamin Bolte Date: Fri, 17 Jan 2025 01:10:34 -0800 Subject: [PATCH] real-time --- kos_sim/simulator.py | 20 ++++++++++---------- kos_sim/test_simulator.py | 38 ++++++++++++++++++++++++++++---------- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/kos_sim/simulator.py b/kos_sim/simulator.py index 682bf7e..389a689 100644 --- a/kos_sim/simulator.py +++ b/kos_sim/simulator.py @@ -1,6 +1,7 @@ """Wrapper around MuJoCo simulation.""" import threading +from pathlib import Path import mujoco import mujoco_viewer @@ -13,7 +14,7 @@ class MujocoSimulator: def __init__( self, - model_path: str, + model_path: str | Path, config: SimulatorConfig | None = None, render: bool = False, gravity: bool = True, @@ -23,7 +24,7 @@ def __init__( self._config = config or SimulatorConfig.default() # Load MuJoCo model and initialize data - self._model = mujoco.MjModel.from_xml_path(model_path) + self._model = mujoco.MjModel.from_xml_path(str(model_path)) self._model.opt.timestep = self._config.dt # Use dt from config self._data = mujoco.MjData(self._model) @@ -35,14 +36,6 @@ def __init__( if not self._gravity: self._model.opt.gravity[2] = 0.0 - # Initialize default position from keyframe if available - try: - self._data.qpos = self._model.keyframe("default").qpos - logger.info("Loaded default position from keyframe") - except Exception: - logger.warning("No default keyframe found, using zero initialization") - self._data.qpos = np.zeros_like(self._data.qpos) - # If suspended, store initial position and orientation if self._suspended: # Find the free joint that controls base position and orientation @@ -114,6 +107,9 @@ def step(self) -> None: self._data.qvel[i : i + 6] = 0 break + return self._data + + def render(self) -> None: if self._render_enabled: self._viewer.render() @@ -197,3 +193,7 @@ def close(self) -> None: except Exception as e: logger.error("Error closing viewer: %s", e) self._viewer = None + + @property + def timestep(self) -> float: + return self._model.opt.timestep diff --git a/kos_sim/test_simulator.py b/kos_sim/test_simulator.py index c688b65..6f6a6ab 100644 --- a/kos_sim/test_simulator.py +++ b/kos_sim/test_simulator.py @@ -1,32 +1,50 @@ """Test script for the simulator.""" import argparse -import time +import asyncio + +from kscale import K from kos_sim.simulator import MujocoSimulator -def test_simulation(model_path: str, duration: float = 5.0, speed: float = 1.0, render: bool = True) -> None: - simulator = MujocoSimulator(model_path, render=render) +async def test_simulation(model_name: str, duration: float = 5.0, speed: float = 1.0, render: bool = True) -> None: + api = K() + bot_dir = await api.download_and_extract_urdf(model_name) + bot_mjcf = next(bot_dir.glob("*.mjcf")) + + simulator = MujocoSimulator(bot_mjcf, render=render) + + timestep = simulator.timestep + initial_update = last_update = asyncio.get_event_loop().time() + + while True: + current_time = asyncio.get_event_loop().time() + if current_time - initial_update > duration: + break + + sim_time = current_time - last_update + last_update = current_time + while sim_time > 0: + simulator.step() + sim_time -= timestep - for _ in range(int(duration / 0.001)): - simulator.step() - time.sleep(0.001) + simulator.render() simulator.close() -def main() -> None: +async def main() -> None: parser = argparse.ArgumentParser(description="Test the MuJoCo simulation.") - parser.add_argument("--model-path", type=str, required=True, help="Path to MuJoCo XML model file") + parser.add_argument("model_name", type=str, help="Name of the model to simulate") parser.add_argument("--duration", type=float, default=5.0, help="Duration to run simulation (seconds)") parser.add_argument("--speed", type=float, default=1.0, help="Simulation speed multiplier") parser.add_argument("--no-render", action="store_true", help="Disable rendering") args = parser.parse_args() - test_simulation(args.model_path, duration=args.duration, speed=args.speed, render=not args.no_render) + await test_simulation(args.model_name, duration=args.duration, speed=args.speed, render=not args.no_render) if __name__ == "__main__": # python -m kos_sim.test_simulator - main() + asyncio.run(main())