feat: Wire override_num_blocks through full call chain for Ray Data read operations #984

Open
fengrui-z wants to merge 7 commits into datajuicer:main from fengrui-z:debug

@fengrui-z
Collaborator

Summary

Enable users to control Ray Data's block parallelism via a new --override_num_blocks CLI argument. This parameter was already implemented at the lowest layer (read_json_stream()) but was never wired through the upstream call chain, making it inaccessible without monkey-patching.

Motivation

When processing very large datasets (billions of records / PB-scale), Ray Data's default block size (128MB) creates an excessive number of blocks (~40M blocks for 5PB), leading to:

  • Driver OOM: Block metadata (~1KB/block) overwhelms driver memory
  • Poor parallelism control: Users cannot tune the read parallelism to match their cluster topology (e.g., 96 CPUs need different block counts than 8 CPUs)
  • Startup latency: Default partitioning scans all blocks before processing begins

Previously, the only workaround was to monkey-patch RayLocalJsonDataLoadStrategy.load_data() (as seen in benchmark scripts). This PR provides a clean, user-facing configuration path.
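
The block-count and metadata figures above can be sanity-checked with quick arithmetic. The sketch below assumes binary units (1 PB = 2^50 bytes, 1 MB = 2^20 bytes) and takes the ~1KB/block metadata estimate from this description at face value; actual per-block overhead is likely to vary with Ray version and schema.

```python
# Back-of-the-envelope check of the Motivation figures.
# Assumptions: binary units, and ~1 KiB of driver-side metadata per block.
DATASET_BYTES = 5 * 2**50          # 5 PB
BLOCK_BYTES = 128 * 2**20          # 128 MB default block size cited above
METADATA_BYTES_PER_BLOCK = 1024    # ~1 KB, as estimated in this PR


def estimate_blocks(dataset_bytes: int, block_bytes: int) -> int:
    """Number of blocks needed to cover the dataset, rounding up."""
    return -(-dataset_bytes // block_bytes)


num_blocks = estimate_blocks(DATASET_BYTES, BLOCK_BYTES)
metadata_gib = num_blocks * METADATA_BYTES_PER_BLOCK / 2**30

print(f"blocks: {num_blocks:,}")
print(f"driver metadata: {metadata_gib:.0f} GiB")
```

Under these assumptions the count comes out at roughly 42 million blocks and about 40 GiB of metadata, in line with the ~40M figure quoted above.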

Changes

| File | Change |
| --- | --- |
| data_juicer/config/config.py | Add --override_num_blocks CLI argument (Optional[int], default None) |
| data_juicer/core/executor/ray_executor_partitioned.py | Extract cfg.override_num_blocks and pass to load_dataset() |
| data_juicer/core/data/load_strategy.py | RayLocalJsonDataLoadStrategy.load_data() extracts and forwards override_num_blocks |
| data_juicer/core/data/ray_dataset.py | read() accepts **kwargs; read_json() accepts override_num_blocks and passes to read_json_stream() |

Usage

# Control block parallelism for large datasets
python process.py --override_num_blocks 1000 --dataset_path /data/huge_dataset.jsonl

# Or in YAML config
override_num_blocks: 1000

Backward Compatibility

  • Default is None (no override) — existing behavior is unchanged
  • All changes use **kwargs pass-through or Optional parameters with None defaults
  • No breaking changes to existing APIs

Testing

  • Smoke test: python -c "import data_juicer" passes
  • The parameter correctly flows through: cfg → PartitionedRayExecutor._run_impl() → DatasetBuilder.load_dataset() → RayLocalJsonDataLoadStrategy.load_data() → RayDataset.read() → RayDataset.read_json() → read_json_stream() → ray.data.read_datasource(override_num_blocks=N)
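
The pass-through described in the flow above can be sketched without Ray at all. Every function below is a hypothetical stand-in named after the corresponding layer; none of them is the project's real implementation, and only the argument plumbing is modeled.

```python
from typing import Any, Dict, Optional

# Hypothetical stand-ins for the call chain; no Ray or file I/O is involved.


def read_json_stream(path: str, override_num_blocks: Optional[int] = None) -> Dict[str, Any]:
    # Lowest layer: in the real code this is where Ray Data receives the value.
    return {"path": path, "override_num_blocks": override_num_blocks}


def read_json(path: str, override_num_blocks: Optional[int] = None, **kwargs: Any) -> Dict[str, Any]:
    # Extra kwargs are accepted so upstream callers can pass unrelated options.
    return read_json_stream(path, override_num_blocks=override_num_blocks)


def read(data_format: str, path: str, **kwargs: Any) -> Dict[str, Any]:
    if data_format != "json":
        raise ValueError(f"unsupported format in this sketch: {data_format}")
    return read_json(path, **kwargs)


def load_data(path: str, **kwargs: Any) -> Dict[str, Any]:
    # Strategy layer: extract the one key it knows about and forward it.
    return read("json", path, override_num_blocks=kwargs.get("override_num_blocks"))


def run(cfg: Dict[str, Any]) -> Dict[str, Any]:
    # Executor layer: a missing key means "no override" (None).
    return load_data(cfg["dataset_path"], override_num_blocks=cfg.get("override_num_blocks"))


with_override = run({"dataset_path": "/data/huge_dataset.jsonl", "override_num_blocks": 1000})
without_override = run({"dataset_path": "/data/huge_dataset.jsonl"})
```

A check of this shape (assert that the value set at the top arrives unchanged at the bottom, and that omitting it yields None) could complement the smoke test listed above.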

Related

  • Prerequisite for PB-scale data processing support

fengrui-z and others added 5 commits March 18, 2026 15:18
- config.py: add --override_num_blocks CLI argument
- ray_executor_partitioned.py: pass override_num_blocks to load_dataset()
- load_strategy.py: extract and forward override_num_blocks in RayLocalJsonDataLoadStrategy
- ray_dataset.py: add kwargs to read(), add override_num_blocks to read_json()
- Enables users to control Ray Data block parallelism for large datasets (5PB+)
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a new configuration option, --override_num_blocks, to control the number of output blocks during Ray Data read operations, enhancing parallelism control. The changes involve propagating this parameter through the RayDataset loading methods and the partitioned executor. Feedback highlights several potential TypeError risks because standard Ray read functions do not natively support the override_num_blocks parameter; reviewers suggest using the .repartition() method as a workaround. Additionally, the review identifies missing **kwargs in method signatures and incorrect return type hints that need to be addressed for consistency and correctness.

Comment thread data_juicer/core/data/ray_dataset.py Outdated

  @classmethod
- def read_json(cls, paths: Union[str, List[str]]) -> RayDataset:
+ def read_json(cls, paths: Union[str, List[str]], override_num_blocks: Optional[int] = None) -> RayDataset:
Contributor

high

The read_json method is missing **kwargs in its signature. Since RayDataset.read (line 350) now passes **kwargs to read_json, any additional arguments (such as read_options from the config) will cause a TypeError. Also, the return type hint should be updated to ray.data.Dataset.

Suggested change
- def read_json(cls, paths: Union[str, List[str]], override_num_blocks: Optional[int] = None) -> RayDataset:
+ def read_json(cls, paths: Union[str, List[str]], override_num_blocks: Optional[int] = None, **kwargs) -> ray.data.Dataset:

Comment thread data_juicer/core/data/ray_dataset.py Outdated
Comment on lines +387 to +388
+ def read_webdataset(cls, paths: Union[str, List[str]], **kwargs) -> RayDataset:
+     return ray.data.read_webdataset(paths, decoder=partial(_custom_default_decoder, format="PIL"), **kwargs)
Contributor

high

ray.data.read_webdataset does not support override_num_blocks. If this argument is present in kwargs, the call will fail. Additionally, the return type hint should be corrected.

Suggested change
- def read_webdataset(cls, paths: Union[str, List[str]], **kwargs) -> RayDataset:
-     return ray.data.read_webdataset(paths, decoder=partial(_custom_default_decoder, format="PIL"), **kwargs)
+ @classmethod
+ def read_webdataset(cls, paths: Union[str, List[str]], **kwargs) -> ray.data.Dataset:
+     override_num_blocks = kwargs.pop("override_num_blocks", None)
+     dataset = ray.data.read_webdataset(paths, decoder=partial(_custom_default_decoder, format="PIL"), **kwargs)
+     if override_num_blocks:
+         dataset = dataset.repartition(override_num_blocks)
+     return dataset


  @classmethod
- def read(cls, data_format: str, paths: Union[str, List[str]]) -> RayDataset:
+ def read(cls, data_format: str, paths: Union[str, List[str]], **kwargs) -> RayDataset:
Contributor

medium

The return type hint RayDataset is incorrect. This method returns a ray.data.Dataset object, which is then wrapped by the caller (e.g., in load_strategy.py). This inconsistency exists in several methods in this class.

Suggested change
- def read(cls, data_format: str, paths: Union[str, List[str]], **kwargs) -> RayDataset:
+ def read(cls, data_format: str, paths: Union[str, List[str]], **kwargs) -> ray.data.Dataset:

…ay read calls

- read(): pop override_num_blocks from kwargs, use repartition() for non-JSON formats
- read_json(): add **kwargs, use repartition() in fallback path instead of passing to ray.data.read_json
- read_webdataset(): pop override_num_blocks, apply repartition() after read
- Fix return type hints to ray.data.Dataset
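
The pop-then-repartition pattern described in this commit can be illustrated in isolation. The sketch below is not the committed code: FakeDataset, fake_reader, and read_with_override are hypothetical names, and FakeDataset only imitates the repartition() method of a Ray dataset so the example runs without Ray installed.

```python
from typing import Any, Callable, Optional


class FakeDataset:
    """Hypothetical stand-in for ray.data.Dataset; it only tracks a block count."""

    def __init__(self, num_blocks: int) -> None:
        self.num_blocks = num_blocks

    def repartition(self, num_blocks: int) -> "FakeDataset":
        return FakeDataset(num_blocks)


def fake_reader(path: str, **kwargs: Any) -> FakeDataset:
    # Imitates a reader that rejects override_num_blocks, as the review assumed.
    if "override_num_blocks" in kwargs:
        raise TypeError("unexpected keyword argument 'override_num_blocks'")
    return FakeDataset(num_blocks=8)


def read_with_override(reader: Callable[..., FakeDataset], path: str, **kwargs: Any) -> FakeDataset:
    # Remove the key before calling the reader so it never sees it.
    override_num_blocks: Optional[int] = kwargs.pop("override_num_blocks", None)
    dataset = reader(path, **kwargs)
    # Compare against None explicitly so the intent ("no override") is clear.
    if override_num_blocks is not None:
        dataset = dataset.repartition(override_num_blocks)
    return dataset


repartitioned = read_with_override(fake_reader, "/data/shards", override_num_blocks=4)
untouched = read_with_override(fake_reader, "/data/shards")
```

One design caveat, stated as an assumption rather than a measured result: repartitioning happens after the read, so it may not reduce the number of blocks created at read time, and therefore may not address the driver-memory concern in the same way as passing the value to the reader.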
@fengrui-z fengrui-z marked this pull request as ready for review May 25, 2026 07:42
@cmgzn
Collaborator

cmgzn commented May 25, 2026

From the current config surface, this looks like the first top-level config that maps specifically to a ray.data.read_* argument. Since Ray read APIs expose quite a few related options (override_num_blocks, concurrency, ray_remote_args, shuffle, etc.), I wonder if it would be better to introduce a generic Ray read config shape instead of adding a one-off top-level field.

For example:

ray_read_options:
  override_num_blocks: 1000
  concurrency: 64
  ray_remote_args:
    num_cpus: 0.25

@fengrui-z
Collaborator Author

> From the current config surface, this looks like the first top-level config that maps specifically to a ray.data.read_* argument. Since Ray read APIs expose quite a few related options (override_num_blocks, concurrency, ray_remote_args, shuffle, etc.), I wonder if it would be better to introduce a generic Ray read config shape instead of adding a one-off top-level field.
>
> For example:
>
> ray_read_options:
>   override_num_blocks: 1000
>   concurrency: 64
>   ray_remote_args:
>     num_cpus: 0.25

Good suggestion — I considered this but decided to keep a top-level argument for the following reasons:

  1. Type safety & validation. A dedicated --override_num_blocks (type Optional[int]) catches typos and type errors at parse time. A generic dict silently ignores misspelled keys (e.g. overide_num_blocks: 1000 → no error, no effect).

  2. CLI discoverability. A top-level arg shows up in --help and supports tab-completion. With a dict, users need to know the internal key names upfront:

    # Clear and discoverable
    python process.py --override_num_blocks 1000
    
    # Requires prior knowledge of key names
    python process.py --ray_read_options '{"override_num_blocks": 1000}'
  3. The dict would be immediately destructured. Tracing through the call chain, the dict gets **-spread at the executor and each key is individually popped in RayDataset.read(). It never operates as a dict — it's purely a transport wrapper, adding indirection with no functional benefit.

  4. YAGNI. override_num_blocks is currently the only Ray read param that requires upstream wiring. The other candidates (concurrency, ray_remote_args) are auto-tuned by Ray and rarely need user override. If/when a second param genuinely needs to be exposed, that's a natural point to refactor into a grouped config.

  5. Precedent in this codebase. --ray_address, --batch_size, and other important Ray-related params are all top-level explicit args — grouping just this one into a dict would be inconsistent.

Happy to revisit if we find ourselves adding a second or third Ray read param in the near future.
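
The type-safety argument in point 1 can be illustrated with the standard library. This is a minimal sketch using argparse as a stand-in; the project's actual config parser may behave differently, and build_parser, parses_ok, and blocks_from_dict are hypothetical names used only here.

```python
import argparse
from typing import Any, Dict, List, Optional


def build_parser() -> argparse.ArgumentParser:
    # Stand-in parser with a single typed option, default None (no override).
    parser = argparse.ArgumentParser()
    parser.add_argument("--override_num_blocks", type=int, default=None)
    return parser


def parses_ok(argv: List[str]) -> bool:
    # argparse reports bad input by printing usage and raising SystemExit.
    try:
        build_parser().parse_args(argv)
        return True
    except SystemExit:
        return False


def blocks_from_dict(options: Dict[str, Any]) -> Optional[int]:
    # An unvalidated dict lookup: a misspelled key simply yields None.
    return options.get("override_num_blocks")


typo_rejected = not parses_ok(["--overide_num_blocks", "1000"])
bad_type_rejected = not parses_ok(["--override_num_blocks", "many"])
typo_ignored = blocks_from_dict({"overide_num_blocks": 1000}) is None
```

The typed flag fails at parse time for both the misspelled name and the non-integer value, while the plain dict lookup silently returns None. A grouped config validated against a known set of keys would narrow this gap, so the comparison applies to an unvalidated dict specifically.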
