async writing for mimic datagen #3951
base: main
Conversation
Greptile Overview
Greptile Summary
This PR implements asynchronous HDF5 writing for mimic synthetic data generation to avoid blocking the environment loop during file I/O. The implementation adds a producer-consumer pattern with a background writer thread and early CPU offloading to reduce VRAM usage.
Key Changes:
- New `AsyncWriter` class with persistent HDF5 file handles and thread-safe locking
- New `AsyncWriterRecorder` term that integrates with the recorder manager
- Early CPU offload flag in `EpisodeData` to immediately move tensors from GPU to CPU
- Producer-consumer queue pattern in `RecorderManager` for async export
- New entry script `generate_dataset_async.py` for the async workflow
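The producer-consumer pattern listed above can be sketched as follows. This is an illustrative minimal version, assuming a bounded queue drained by a daemon thread; the class and method names here (`BackgroundWriter`, `submit`) are hypothetical and not the PR's actual API.

```python
import queue
import threading

class BackgroundWriter:
    """Consumes (env_id, episode) tuples from a bounded queue on a background thread."""

    def __init__(self, maxsize: int = 256):
        self._queue: queue.Queue = queue.Queue(maxsize=maxsize)
        self._written: list[tuple[int, dict]] = []
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            item = self._queue.get()
            if item is None:  # sentinel tells the consumer to stop
                break
            # in the real PR this would be an HDF5 write; here we just record it
            self._written.append(item)
            self._queue.task_done()

    def submit(self, env_id: int, episode: dict):
        # a blocking put applies backpressure instead of raising queue.Full
        self._queue.put((env_id, episode))

    def close(self):
        self._queue.put(None)
        self._thread.join()

writer = BackgroundWriter()
writer.submit(0, {"actions": [1, 2, 3]})
writer.submit(1, {"actions": [4, 5]})
writer.close()
```

The environment loop only pays the cost of the queue `put`; the file I/O happens on the consumer thread.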
Performance Claims:
- 30 envs/100 trials: ~8% time reduction (3700s → 3400s), 45% VRAM reduction (22.7GB → 12.5GB)
- 10 envs/50 trials: ~6% time reduction (2050s → 1930s), 34% VRAM reduction (17.4GB → 11.6GB)
Critical Issues Found:
- Multiple data race conditions where episode buffers are cleared while background threads/tasks are still reading them (recorder_manager.py:502, async_writer_recorder.py:126, 136)
- Async function called synchronously without await, causing race condition
- Queue overflow risk with fixed size of 256 episodes
- Missing early_cpu_offload flag propagation in one location
Confidence Score: 1/5
- This PR contains critical data race bugs that will cause data corruption and crashes in production
- Multiple race conditions where episode data is accessed after being cleared, which will cause corrupted datasets, crashes, or silent data loss. The async function is called without await, and live episode objects are enqueued without cloning. These are fundamental concurrency bugs that must be fixed before merge.
- Pay immediate attention to `source/isaaclab/isaaclab/managers/recorder_manager.py` and `source/isaaclab_mimic/isaaclab_mimic/datagen/async_writer_recorder.py`; both contain critical data races
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| source/isaaclab/isaaclab/managers/recorder_manager.py | 2/5 | Adds async writing infrastructure with critical data race bug at line 502 where live episode object is enqueued without cloning, plus potential queue overflow issue |
| source/isaaclab_mimic/isaaclab_mimic/datagen/async_writer_recorder.py | 1/5 | New recorder term with multiple data race issues: async function called synchronously at line 126, shallow snapshot at line 136, missing early_cpu_offload flag at line 129 |
| source/isaaclab_mimic/isaaclab_mimic/async_writer.py | 3/5 | New async writer implementation with persistent HDF5 file handles and threading locks; potential issue with file creation logic at line 558 |
Sequence Diagram
```mermaid
sequenceDiagram
    participant Env as Environment Loop
    participant RM as RecorderManager
    participant AW as AsyncWriterRecorder
    participant Queue as Writer Queue
    participant Thread as Writer Thread
    participant Writer as AsyncWriter
    participant HDF5 as HDF5 File
    Env->>RM: record_post_step()
    RM->>RM: add data to episode buffer (GPU)
    Note over RM: Early CPU offload enabled
    RM->>RM: tensors moved to CPU immediately
    Env->>RM: record_pre_reset(env_ids)
    RM->>AW: record_pre_reset(env_ids)
    Note over AW,RM: DATA RACE: Episode not cloned
    AW->>AW: schedule_async_write_for_episode(episode)
    AW->>AW: snapshot = episode.data (shallow)
    par Async Task Created
        AW->>AW: asyncio.create_task(_do_write)
    and Episode Cleared
        AW->>RM: clear episode buffer
        RM->>RM: episodes[env_id] = EpisodeData()
    end
    Note over AW: Race condition: task may read<br/>cleared episode data
    RM->>RM: export_episodes(env_ids)
    alt Async Writing Path
        Note over RM,Queue: Another data race
        RM->>Queue: put_nowait(env_id, episode)
        RM->>RM: episodes[env_id] = EpisodeData()
        Thread->>Queue: get() blocking
        Thread->>AW: schedule_sync_write_for_episode
        AW->>Writer: _write_trajectory_data_hdf5
        Writer->>Writer: acquire _file_lock
        Writer->>HDF5: write demo to persistent file handle
        Writer->>Writer: release _file_lock
    else Sync Writing Path
        RM->>HDF5: write_episode (blocking)
    end
    Note over Env: On shutdown
    Env->>AW: flush_async()
    AW->>AW: await all pending tasks
    Env->>RM: close()
    RM->>Thread: stop consumer thread
    RM->>Writer: close()
    Writer->>HDF5: close all file handles
```
8 files reviewed, 6 comments
```python
)
# snapshot and enqueue
# snapshot = self._snapshot_episode_for_async(episode, env_id)
self._writer_queue.put_nowait((env_id, episode))
```
logic: Directly enqueuing live episode object creates data race - episode buffer gets cleared at line 509 while background thread may still be reading it
```diff
- self._writer_queue.put_nowait((env_id, episode))
+ snapshot = self._snapshot_episode_for_async(episode, env_id)
+ self._writer_queue.put_nowait((env_id, snapshot))
```
```python
episode = self._env.recorder_manager.get_episode(env_id)
if episode is None or episode.is_empty():
    continue
self.schedule_async_write_for_episode(episode)
```
logic: schedule_async_write_for_episode is async but called synchronously (without await), so it returns immediately and the async task may access episode.data after the episode buffer is cleared at line 129
```diff
- self.schedule_async_write_for_episode(episode)
+ await self.schedule_async_write_for_episode(episode)
```
```python
async def schedule_async_write_for_episode(self, episode: EpisodeData) -> None:
    snapshot = episode.data
```
logic: Directly using episode.data without cloning creates data race - the snapshot still points to the same dict that gets cleared in the caller
```python
self._async_loop_thread = threading.Thread(target=self._async_loop.run_forever, daemon=True)
self._async_loop_thread.start()
# producer/consumer for non-blocking async writes
self._writer_queue: queue.Queue[tuple[int, EpisodeData] | None] = queue.Queue(maxsize=256)
```
logic: Queue size of 256 episodes may overflow with 30 envs generating data faster than the writer thread can process, causing put_nowait to raise queue.Full exception
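The overflow failure mode, and one possible mitigation, can be shown directly: `put_nowait` raises `queue.Full` on a saturated bounded queue, whereas a blocking `put` (optionally with a timeout) makes the producer wait for the consumer, trading throughput for backpressure instead of crashing.

```python
import queue

q: queue.Queue = queue.Queue(maxsize=2)
q.put_nowait("ep0")
q.put_nowait("ep1")

overflowed = False
try:
    q.put_nowait("ep2")  # queue is full -> raises queue.Full
except queue.Full:
    overflowed = True
assert overflowed

# Backpressure variant: drain one item, then wait up to 1s for space.
q.get_nowait()
q.put("ep2", timeout=1.0)
assert q.qsize() == 2
```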
```python
f = self._file_map.get(filepath)
if f is None:
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    f = h5py.File(filepath, "a")
```
syntax: Opening file in append mode without checking if it needs to be created first - relies on os.makedirs at line 557 but may fail if parent directory doesn't exist
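The handle-caching pattern under review can be sketched safely as below: create the parent directory before opening in append mode, and reuse the cached handle on subsequent calls. Shown with plain `open()` so it is self-contained; the PR uses `h5py.File(filepath, "a")` in the same position, and `_file_map`/`get_handle` are illustrative names.

```python
import os
import tempfile

_file_map: dict[str, object] = {}

def get_handle(filepath: str):
    """Return a cached append-mode handle, opening (and creating dirs) on first use."""
    f = _file_map.get(filepath)
    if f is None:
        # ensure the parent directory exists *before* opening;
        # open(..., "a") alone fails if the directory is missing
        parent = os.path.dirname(filepath)
        if parent:
            os.makedirs(parent, exist_ok=True)
        f = open(filepath, "a")
        _file_map[filepath] = f
    return f

root = tempfile.mkdtemp()
path = os.path.join(root, "nested", "dataset.hdf5")
h1 = get_handle(path)
h2 = get_handle(path)
assert h1 is h2  # second call reuses the cached handle
for f in _file_map.values():
    f.close()
```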
```python
self.schedule_async_write_for_episode(episode)
# Clear episode buffer synchronously
from isaaclab.utils.datasets import EpisodeData as _EpisodeData
self._env.recorder_manager._episodes[env_id] = _EpisodeData()
```
logic: Creating new EpisodeData() without passing early_cpu_offload flag from the recorder configuration (available via self._env.recorder_manager.early_offload)
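The fix the comment asks for amounts to threading the manager's setting through the replacement object. A hypothetical sketch: the class and attribute names below mirror the review comment (`EpisodeData`, `early_offload`) but are assumptions, not the PR's exact API.

```python
class EpisodeData:
    def __init__(self, early_cpu_offload: bool = False):
        self.early_cpu_offload = early_cpu_offload

class RecorderManager:
    def __init__(self, early_offload: bool):
        self.early_offload = early_offload
        self._episodes: dict[int, EpisodeData] = {}

manager = RecorderManager(early_offload=True)

# Buggy reset: the default constructor silently drops the flag.
manager._episodes[0] = EpisodeData()
assert manager._episodes[0].early_cpu_offload is False

# Fixed reset: propagate the manager's configuration to the new buffer.
manager._episodes[0] = EpisodeData(early_cpu_offload=manager.early_offload)
assert manager._episodes[0].early_cpu_offload is True
```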
Test Results Summary
2 665 tests, 2 198 ✅, 2h 24m 15s ⏱️
For more details on these failures and errors, see this check. Results for commit 5d60c72.
Description
This PR adds a Replicator Writer that asynchronously exports successful trajectories from Mimic synthetic data generation to an HDF5 file, so the data generation loop is not blocked on long file I/O operations. The async routines also offload EpisodeData tensors to the CPU early, reducing VRAM usage while keeping the environment and data generation loops non-blocking.
Benchmark results for synthetic data generation
30 envs / 100 trials:
10 envs / 50 trials:
Type of change
Checklist
- I have run the `pre-commit` checks with `./isaaclab.sh --format`
- I have updated the changelog and the corresponding version in the extension's `config/extension.toml` file
- I have added my name to the `CONTRIBUTORS.md` or my name already exists there