diff --git a/kos_sim/simulator.py b/kos_sim/simulator.py index 389a689..57409ae 100644 --- a/kos_sim/simulator.py +++ b/kos_sim/simulator.py @@ -1,11 +1,14 @@ """Wrapper around MuJoCo simulation.""" +import argparse +import asyncio import threading from pathlib import Path import mujoco import mujoco_viewer import numpy as np +from kscale import K from kos_sim import logger from kos_sim.config import SimulatorConfig @@ -197,3 +200,52 @@ def close(self) -> None: @property def timestep(self) -> float: return self._model.opt.timestep + + +async def test_simulation_adhoc( + model_name: str, duration: float = 5.0, speed: float = 1.0, render: bool = True +) -> None: + api = K() + bot_dir = await api.download_and_extract_urdf(model_name) + bot_mjcf = next(bot_dir.glob("*.mjcf")) + + simulator = MujocoSimulator(bot_mjcf, render=render) + + timestep = simulator.timestep + initial_update = last_update = asyncio.get_event_loop().time() + + while True: + current_time = asyncio.get_event_loop().time() + if current_time - initial_update > duration: + break + + sim_time = current_time - last_update + last_update = current_time + while sim_time > 0: + simulator.step() + sim_time -= timestep + + simulator.render() + + simulator.close() + + +async def main() -> None: + parser = argparse.ArgumentParser(description="Test the MuJoCo simulation.") + parser.add_argument("model_name", type=str, help="Name of the model to simulate") + parser.add_argument("--duration", type=float, default=5.0, help="Duration to run simulation (seconds)") + parser.add_argument("--speed", type=float, default=1.0, help="Simulation speed multiplier") + parser.add_argument("--no-render", action="store_true", help="Disable rendering") + + args = parser.parse_args() + await test_simulation_adhoc( + args.model_name, + duration=args.duration, + speed=args.speed, + render=not args.no_render, + ) + + +if __name__ == "__main__": + # python -m kos_sim.simulator + asyncio.run(main()) diff --git a/kos_sim/test_simulator.py b/kos_sim/test_simulator.py deleted file mode 100644 index 6f6a6ab..0000000 --- a/kos_sim/test_simulator.py +++ /dev/null @@ -1,50 +0,0 @@ -"""Test script for the simulator.""" - -import argparse -import asyncio - -from kscale import K - -from kos_sim.simulator import MujocoSimulator - - -async def test_simulation(model_name: str, duration: float = 5.0, speed: float = 1.0, render: bool = True) -> None: - api = K() - bot_dir = await api.download_and_extract_urdf(model_name) - bot_mjcf = next(bot_dir.glob("*.mjcf")) - - simulator = MujocoSimulator(bot_mjcf, render=render) - - timestep = simulator.timestep - initial_update = last_update = asyncio.get_event_loop().time() - - while True: - current_time = asyncio.get_event_loop().time() - if current_time - initial_update > duration: - break - - sim_time = current_time - last_update - last_update = current_time - while sim_time > 0: - simulator.step() - sim_time -= timestep - - simulator.render() - - simulator.close() - - -async def main() -> None: - parser = argparse.ArgumentParser(description="Test the MuJoCo simulation.") - parser.add_argument("model_name", type=str, help="Name of the model to simulate") - parser.add_argument("--duration", type=float, default=5.0, help="Duration to run simulation (seconds)") - parser.add_argument("--speed", type=float, default=1.0, help="Simulation speed multiplier") - parser.add_argument("--no-render", action="store_true", help="Disable rendering") - - args = parser.parse_args() - await test_simulation(args.model_name, duration=args.duration, speed=args.speed, render=not args.no_render) - - -if __name__ == "__main__": - # python -m kos_sim.test_simulator - asyncio.run(main()) diff --git a/kos_sim/test_viewer.py b/kos_sim/test_viewer.py deleted file mode 100644 index 9af82f4..0000000 --- a/kos_sim/test_viewer.py +++ /dev/null @@ -1,71 +0,0 @@ -"""Test script for the viewer. - -This script is used to test the viewer. - -Usage: - -python -m kos_sim.test_viewer -""" - -import asyncio -import math - -import mujoco -import mujoco_viewer -from kscale import K - - -async def main() -> None: - api = K() - - # Gets the base path. - bot_dir = await api.download_and_extract_urdf("kbot-v1") - # bot_dir = await api.download_and_extract_urdf("zbot-v2") - bot_mjcf = next(bot_dir.glob("*.mjcf")) - - model = mujoco.MjModel.from_xml_path(str(bot_mjcf)) - data = mujoco.MjData(model) - - # create the viewer object - viewer = mujoco_viewer.MujocoViewer(model, data) - - # Get simulation timestep - timestep = model.opt.timestep - last_update = asyncio.get_event_loop().time() - start_time = last_update - - mujoco.mj_step(model, data) - - # simulate and render - for _ in range(100000): - if viewer.is_alive: - current_time = asyncio.get_event_loop().time() - sim_time = current_time - last_update - - # Calculate elapsed time since start for sinusoidal motion - elapsed_time = current_time - start_time - - # Apply sinusoidal motion to each joint - # Adjust amplitude and frequency as needed - amplitude = 3.14159 # radians - frequency = 0.5 # Hz - for i in range(model.nu): - data.ctrl[i] = amplitude * math.sin(2 * math.pi * frequency * elapsed_time) - - # Step the simulation to match real time - while sim_time > 0: - mujoco.mj_step(model, data) - sim_time -= timestep - - last_update = current_time - viewer.render() - else: - break - - # close - viewer.close() - - -if __name__ == "__main__": - # python -m kos_sim.test_viewer - asyncio.run(main())