Refactor datapackerdataloader to be modular#19
Merged
Conversation
Four-role decomposition (DataDistributor / RawItemProcessor / SampleBatcher / BatchCollator) replacing the fused DataPacker + PackingIterableDataset. Covers VLM + VFM behavior-preserving migration and arbitrary user dataflows. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ment Validate the four-role abstraction against ALL dataloaders (not just VLM/VFM): WebDataset base is vestigial, joint_dataloader fusions are composition artifacts, videophy2 + DROID covered, legacy Iterative/Random expressible. Record per-recipe processor placement and collator nuances. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…h debt IdentityProcessor + still-processing dataset is a conscious break from the four-role separation, chosen for safety/minimal diff; tracked as a follow-up to extract real SFTVideoProcessor / DROIDProcessor later. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Keep old dataloaders as a living baseline; validate each live recipe via a mirror *_datapacker experiment with golden-batch + deterministic loss-curve equivalence (leveraging train.py --deterministic). Delete old code only in a final cleanup PR after all mirrors are validated. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Task-oriented bring-your-own-dataset tutorial (mental model, quickstart, four roles, use-case recipes, wiring, resume, sharding, FAQ, worked example). Wired into the implementation phases and docs index. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Align built-in-roles note with the per-recipe IdentityProcessor decision (SFTVideoProcessor/DROIDProcessor are deferred follow-ups). Bump status. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Bite-sized TDD plan for the four role ABCs + IdentityProcessor / DefaultBatchCollator / SimpleBatcher / IterableDistributor / MapDistributor + the DataPackerDataLoader orchestrator (batch_size sugar, DP-coord resolution). New dataflow/ package, no existing recipe touched. Resume and packing batchers deferred to follow-up plans. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Resume + checkpoint saving must not break (blocking gate, dedicated resume test). Regression = baseline launch scripts vs mirror *_datapacker experiments, logging_iter=1/max_iter=500, wandb project cosmos_oss_alignment, fresh run names, env via cosmos3-run-env. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ression PoolPackingBatcher (port of the packing engine), VLMProcessor/VLMCollator extraction, golden-batch equality test, mirror experiment + TOML + launch wrapper, and the loss-curve regression run (baseline vs mirror, logging_iter=1, max_iter=500, wandb cosmos_oss_alignment). Resume invariant honored (iterable source, callbacks untouched). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…smosDataLoader) DataPacker is gone and packing is just one batcher, so the orchestrator gets a fitting name. New loader lives in dataflow/loader.py; legacy DataPackerDataLoader / JointDataPackerDataLoader stay as the regression baseline until cleanup. Legacy references in golden tests / coverage audit kept intact. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Plan 3: MapDistributor env-var resume + CosmosDataLoader meta-threading,
callback unchanged, resume integration test (HARD INVARIANT).
Plan 4: videophy2 migration (VideoPhy2Processor, reuse VLMCollator), golden +
regression.
Plan 5: VFM migration (VFMListCollator, SequentialPackingBatcher,
RankPartitionedDistributor) + MixtureDistributor + JointCosmosDataLoader.
Plan 6: docs/dataflow.md tutorial + cleanup (promote mirrors, delete legacy);
execution gated LAST on all regressions passing.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…SampleBatcher/BatchCollator) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Introduces loader.py with CosmosDataLoader (thin DataLoader subclass) and _DataflowIterableDataset, wiring DataDistributor -> RawItemProcessor -> SampleBatcher -> BatchCollator per worker. Adds batch_size= convenience sugar (builds SimpleBatcher + DefaultBatchCollator) and rejects ambiguous combos. Exports CosmosDataLoader from the dataflow package __init__. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Appends test_multiworker_disjoint_and_complete_one_epoch to loader_test.py: spins up CosmosDataLoader with num_workers=2 over a 12-item MapDataset, consumes exactly one epoch, and asserts all 12 indices are seen exactly once with no duplicates or gaps. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…istent_workers coercion MapDistributor.stream now returns immediately when n==0 or stream_id>=n, preventing an infinite spin that would hang training when the dataset is empty or smaller than dp_world_size*num_workers. Three regression tests added (empty dataset, empty shard, non-empty shard stays infinite). CosmosDataLoader now logs an info message and explicitly sets persistent_workers=False when num_workers==0, instead of silently coercing True to False inside the dict literal. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…t engine) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Copies _decode_image / _sharegpt_to_openai / sft_process_sample / sft_collate_fn logic verbatim from VLMDataPacker into standalone RawItemProcessor + BatchCollator roles, ready for use with CosmosDataLoader. Legacy files are untouched. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Proves that CosmosDataLoader+VLMProcessor+VLMCollator+PoolPackingBatcher produces identical batches to the legacy DataPackerDataLoader+VLMDataPacker on the same fixed iterable source (40 items, drain 10 batches). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… dataflow loader Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…sion Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add _DataflowIterableDataset._cached_gen so single-process (num_workers=0) MapDistributor iter() calls reuse the same generator, preserving resume state across iter() calls (analogous to persistent_workers for multi-worker mode). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…le v2 dataloader_train - loader.py: revert the _cached_gen generator caching (commit 1c4d017). __iter__ returns a fresh generator per call, matching legacy/torch semantics. The resume test was the real bug, not the loader. - resume_test.py: use a single iterator (it2 = iter(loader2)) so the env-var fast-forward is consumed once and iteration continues 5,6,7. - llava_ov_datapacker_v2.toml: drop [dataloader_train].max_samples_per_batch — it remaps to dataloader_train.max_batch_size which does not exist on the new CosmosDataLoader node (PoolPackingBatcher owns max_batch_size), causing ConfigAttributeError at config resolution. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…guity guard - PoolPackingBatcher.sample_size default uses len(sample["input_ids"]) (matches the documented contract; identical to .shape[0] for 1-D tensors, also handles list input_ids). - CosmosDataLoader: keep the legacy bit-for-bit max(epoch)/max(pos) resume stamp (correct for all live recipes — VLM/videophy2 use max_batch_size=1; VFM uses a non-resumable iterable source). NOT switching to min-1 (Codex's suggestion), which would make max_batch_size=1 replay the last sample on every resume. Instead, add a contiguity/epoch guard: a multi-sample batch whose _dp_stream_pos are non-contiguous or span epochs (reordering batcher + batch_size>1) now raises loudly instead of silently skipping buffered lower positions on resume. - Add regression test for the guard. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…epcopy vision_sft_nano_v2 differed from vision_sft_nano only in job.name and the training dataloader (legacy PackingDataLoader+RankPartitionedDataLoader vs the four-role CosmosDataLoader stack). Derive v2 by deep-copying vision_sft_nano and overriding just those two fields; delete the standalone vision_sft_nano_v2.py module + its config.py import (v2 now registers from vision_sft_nano.py). Experiment name "vision_sft_nano_v2" is unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ment.py via deepcopy pre_exp012_llava_ov_mapstyle_dataloader differed from pre_exp012_llava_ov only in job.name, the dataloader_state callback (distributor_type="cosmos_dataloader"), checkpoint.save_iter, and the dataloader (streaming IterableDistributor vs map-style MapDistributor + num_workers=0). Derive it by deep-copying pre_exp012_llava_ov and overriding just those fields; move get_llava_ov_map alongside get_llava_ov_streaming. Delete the standalone module (VLM experiments auto-register via import_all_modules_from_package). Experiment name and TOML unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Drop the per-module unit tests that sat next to the dataflow source (base/batchers/collators/distributors/loader/processors/joint_loader tests, the VLM/VideoPhy2 dataflow_roles tests, and the vision_sft_nano_v2 registration test). Keep the two behavior guarantees: golden_vfm_test.py (byte-identical to the legacy loader) and resume_test.py (checkpoint resume = no dup/skip). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Commit 7bd72ee added the CosmosDataLoader imports + removed the standalone module/config.py import, but the vision_sft_nano_v2 deepcopy block and the registration-loop update were dropped in a messy git-reset sequence — leaving config.py expecting an experiment that vision_sft_nano.py no longer defined ("Could not find experiment/vision_sft_nano_v2"). Re-add the deepcopy derivation and register both experiments. Verified: all 6 example recipes dry-run clean (vision_sft_nano_v2 resolves to CosmosDataLoader -> RankPartitionedDistributor -> SequentialPackingBatcher -> VFMListCollator). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
lfengad
reviewed
Jun 8, 2026
yy-code-nv
previously approved these changes
Jun 8, 2026
lfengad
previously approved these changes
Jun 9, 2026
…dataloader_state.py dataloader_state.py was modified in commit 4855c86 to add "cosmos_dataloader" as a distributor_type and change the env prefix to COSMOS_DL_STATE_. This entangled CosmosDataLoader-specific resume logic with the legacy DataLoaderStateCallback that serves WebDataset "no_replace" / "data_packer" shardlist strategies. Revert dataloader_state.py fully to main (data_packer, DP_STATE_ prefix) and introduce cosmos_dataloader_state.py with two dedicated classes: - CosmosDataLoaderStateCallback: purpose-built for CosmosDataLoader(MapDistributor). No distributor_type discriminator — always active. Writes COSMOS_DL_STATE_* env vars on load_state_dict so MapDistributor.stream fast-forwards on resume. Accepts distributor_type kwarg (ignored) for Hydra struct-merge compat with the base DataLoaderStateCallback entry in the basic_log callbacks group. - JointCosmosDataLoaderStateCallback: for JointCosmosDataLoader. Persists outer global_id + inner per-dataset per-worker (epoch, index) via one CosmosDataLoaderStateCallback per inner loader. Update llava_ov_vlm.py and resume_test.py to use the new callback. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…llback DataPackerDataLoader and JointDataPackerDataLoader were removed earlier in this branch. Clean up the remnants in dataloader_state.py: - Drop "data_packer" from _ACTIVE_DISTRIBUTOR_TYPES (only "no_replace" remains) - Remove the DP_STATE_ env-var branch in load_state_dict - Delete JointDataLoaderStateCallback (existed solely for the deleted JointDataPackerDataLoader; JointCosmosDataLoaderStateCallback covers the CosmosDataLoader joint-loader case) - Fix stale JointDataLoaderStateCallback docstring reference in loader.py Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
lfengad
previously approved these changes
Jun 9, 2026
…cipe Mirrors launch_sft_llava_ov.sh for the MapDistributor-backed VLM SFT recipe. Supports optional RESUME_FROM_CKPT env var for checkpoint resume via CosmosDataLoaderStateCallback. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
186116a to
fe95d63
Compare
lfengad
approved these changes
Jun 11, 2026
yy-code-nv
approved these changes
Jun 12, 2026
rahul-steiger-nv
pushed a commit
to rahul-steiger-nv/cosmos-framework
that referenced
this pull request
Jun 15, 2026
## Summary
Refactors the training data layer from the monolithic
`DataPackerDataLoader` /
`DataPacker` / `PackingIterableDataset` into a modular, four-role
abstraction
wired by a single loader. Behavior is preserved (golden-batch
byte-identical to
the legacy loader; resume validated live), and all existing recipes are
migrated.
DataDistributor → RawItemProcessor → SampleBatcher → BatchCollator
(shard/shuffle/ (raw item → one (samples → (group → one
resume) sample dict) batch groups) batch dict)
Each role is a small ABC with one required method; pick a built-in per
slot or
write your own. `CosmosDataLoader` is a `torch.utils.data.DataLoader`
subclass, so
it drops into the existing training loop.
## What changed
### New dataflow package — `cosmos_framework/data/vfm/dataflow/`
- **Loaders:** `CosmosDataLoader` (+ `batch_size=` sugar →
`SimpleBatcher` +
`DefaultBatchCollator`), `JointCosmosDataLoader` (ratio-weighted
heterogeneous join).
- **Distributors:** `IterableDistributor`, `MapDistributor` (resumable),
`RankPartitionedDistributor`, `MixtureDistributor`.
- **Processors:** `IdentityProcessor` (+ recipe-specific `VLMProcessor`,
`VideoPhy2Processor`).
- **Batchers:** `SimpleBatcher`, `PoolPackingBatcher`,
`SequentialPackingBatcher`.
- **Collators:** `DefaultBatchCollator`, `VFMListCollator` (+ recipe
`VLMCollator`).
### Legacy removal
- Deleted `data_packer.py`, `data_packer_dataloader.py`,
`packing_iterable_dataset.py`, `test_dp_state_distributed.py` (+ old
tests).
### Experiment migrations
- VLM `llava_ov` (renamed from `llava_ov_datapacker`, streaming
`IterableDistributor`).
- VLM `videophy2_sft_nano`.
- VFM: existing path unchanged; added `vision_sft_nano_v2` (new-loader
variant).
- Added `llava_ov_mapresume` — map-style
(`load_dataset(streaming=False)` +
`MapDistributor`) resumable example.
### Config / TOML
- `PATH_REMAPS["vlm"]`: route `dataloader_train.{max_samples_per_batch,
max_sequence_length}` → nested `batcher.{max_batch_size, max_tokens}`.
### Checkpoint / resume
- Renamed the resume-state selector value `"data_packer"` →
`"cosmos_dataloader"`
and env prefix `DP_STATE_` → `COSMOS_DL_STATE_`
(`DataLoaderStateCallback`,
`JointDataLoaderStateCallback`, `MapDistributor`). On-disk format
unchanged.
### CI / tests / docs
- Updated `tests/launch_regression_test.py` + launch scripts for the
`llava_ov`
rename (golden loss keyed by `llava_ov`; workflow `-k llava_ov`).
- Added golden-batch, resume, and per-role unit tests.
- Replaced `docs/custom_dataset.md` with the `CosmosDataLoader`
tutorial; removed
`docs/dataflow.md`.
## Validation
- **Golden-batch equality:** VLM / videophy2 / VFM batches
byte-identical to the
legacy loader.
- **Live save→stop→resume** on `pre_exp012_llava_ov_mapresume` (8 dp
ranks,
`save_iter=100`): per-rank `input_ids` shapes identical across the
resume
boundary — **792 `(iter, rank)` keys, 0 mismatches** — and loss curves
match.
No duplicated/skipped samples on any rank.
- **No CI risk:** the `llava_ov` golden recipe and its streaming data
path are
unchanged; the remap only affects the 3 VLM TOMLs, all of which compose
cleanly
onto a real `PoolPackingBatcher`.
## Hard invariant
Dataloader resume + checkpoint saving must not regress. Held: resume is
preserved
through the existing `DataLoaderStateCallback`, with map-style
fast-forward and the
multi-sample contiguity guard, and validated end-to-end above.
---------
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Refactors the training data layer from the monolithic
DataPackerDataLoader/DataPacker/PackingIterableDatasetinto a modular, four-role abstractionwired by a single loader. Behavior is preserved (golden-batch byte-identical to
the legacy loader; resume validated live), and all existing recipes are migrated.
DataDistributor → RawItemProcessor → SampleBatcher → BatchCollator
(shard/shuffle/ (raw item → one (samples → (group → one
resume) sample dict) batch groups) batch dict)
Each role is a small ABC with one required method; pick a built-in per slot or
write your own.
CosmosDataLoaderis atorch.utils.data.DataLoadersubclass, soit drops into the existing training loop.
What changed
New dataflow package —
cosmos_framework/data/vfm/dataflow/CosmosDataLoader(+batch_size=sugar →SimpleBatcher+DefaultBatchCollator),JointCosmosDataLoader(ratio-weighted heterogeneous join).IterableDistributor,MapDistributor(resumable),RankPartitionedDistributor,MixtureDistributor.IdentityProcessor(+ recipe-specificVLMProcessor,VideoPhy2Processor).SimpleBatcher,PoolPackingBatcher,SequentialPackingBatcher.DefaultBatchCollator,VFMListCollator(+ recipeVLMCollator).Legacy removal
data_packer.py,data_packer_dataloader.py,packing_iterable_dataset.py,test_dp_state_distributed.py(+ old tests).Experiment migrations
llava_ov(renamed fromllava_ov_datapacker, streamingIterableDistributor).videophy2_sft_nano.vision_sft_nano_v2(new-loader variant).llava_ov_mapresume— map-style (load_dataset(streaming=False)+MapDistributor) resumable example.Config / TOML
PATH_REMAPS["vlm"]: routedataloader_train.{max_samples_per_batch, max_sequence_length}→ nestedbatcher.{max_batch_size, max_tokens}.Checkpoint / resume
"data_packer"→"cosmos_dataloader"and env prefix
DP_STATE_→COSMOS_DL_STATE_(DataLoaderStateCallback,JointDataLoaderStateCallback,MapDistributor). On-disk format unchanged.CI / tests / docs
tests/launch_regression_test.py+ launch scripts for thellava_ovrename (golden loss keyed by
llava_ov; workflow-k llava_ov).docs/custom_dataset.mdwith theCosmosDataLoadertutorial; removeddocs/dataflow.md.Validation
legacy loader.
pre_exp012_llava_ov_mapresume(8 dp ranks,save_iter=100): per-rankinput_idsshapes identical across the resumeboundary — 792
(iter, rank)keys, 0 mismatches — and loss curves match.No duplicated/skipped samples on any rank.
llava_ovgolden recipe and its streaming data path areunchanged; the remap only affects the 3 VLM TOMLs, all of which compose cleanly
onto a real
PoolPackingBatcher.Hard invariant
Dataloader resume + checkpoint saving must not regress. Held: resume is preserved
through the existing
DataLoaderStateCallback, with map-style fast-forward and themulti-sample contiguity guard, and validated end-to-end above.