Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions data_juicer/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@ def build_base_parser() -> ArgumentParser:
"(e.g., block_size for JSON reading). This configuration is "
"especially useful when reading large JSON files.",
)
parser.add_argument(
"--override_num_blocks",
type=Optional[int],
default=None,
help="Override the number of output blocks for Ray Data read "
"operations. Useful for controlling parallelism when reading "
"large datasets.",
)
parser.add_argument(
"--work_dir",
type=str,
Expand Down
4 changes: 3 additions & 1 deletion data_juicer/core/data/load_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ class RayLocalJsonDataLoadStrategy(RayDataLoadStrategy):
def load_data(self, **kwargs):
from data_juicer.core.data.ray_dataset import RayDataset

override_num_blocks = kwargs.pop("override_num_blocks", None)

path = self.ds_config["path"]

# Convert to absolute path if relative
Expand Down Expand Up @@ -281,7 +283,7 @@ def load_data(self, **kwargs):
else:
logger.info(f"Loading {data_format} data.")
try:
dataset = RayDataset.read(data_format, path)
dataset = RayDataset.read(data_format, path, override_num_blocks=override_num_blocks)
return RayDataset(dataset, dataset_path=path, cfg=self.cfg)
except Exception as e:
if auto_detect:
Expand Down
18 changes: 9 additions & 9 deletions data_juicer/core/data/ray_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,11 @@ def count(self) -> int:
return self.data.count()

@classmethod
def read(cls, data_format: str, paths: Union[str, List[str]]) -> RayDataset:
def read(cls, data_format: str, paths: Union[str, List[str]], **kwargs) -> RayDataset:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The return type hint `RayDataset` is incorrect. This method returns a `ray.data.Dataset` object, which is then wrapped by the caller (e.g., in `load_strategy.py`). The same inconsistency exists in several other methods in this class.

Suggested change
def read(cls, data_format: str, paths: Union[str, List[str]], **kwargs) -> RayDataset:
def read(cls, data_format: str, paths: Union[str, List[str]], **kwargs) -> ray.data.Dataset:

if data_format in {"json", "jsonl", "json.gz", "jsonl.gz", "json.zst", "jsonl.zst"}:
return RayDataset.read_json(paths)
return RayDataset.read_json(paths, **kwargs)
elif data_format == "webdataset":
return RayDataset.read_webdataset(paths)
return RayDataset.read_webdataset(paths, **kwargs)
elif data_format in {
"parquet",
"images",
Expand All @@ -369,23 +369,23 @@ def read(cls, data_format: str, paths: Union[str, List[str]]) -> RayDataset:
from data_juicer.utils.lazy_loader import LazyLoader

LazyLoader.check_packages(["pylance"])
return getattr(ray.data, f"read_{data_format}")(paths)
return getattr(ray.data, f"read_{data_format}")(paths, **kwargs)
Comment thread
fengrui-z marked this conversation as resolved.
Outdated

@classmethod
def read_json(cls, paths: Union[str, List[str]]) -> RayDataset:
def read_json(cls, paths: Union[str, List[str]], override_num_blocks: Optional[int] = None) -> RayDataset:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The `read_json` method is missing `**kwargs` in its signature. Since `RayDataset.read` (line 350) now passes `**kwargs` to `read_json`, any additional arguments (such as `read_options` from the config) will cause a `TypeError`. Also, the return type hint should be updated to `ray.data.Dataset`.

Suggested change
def read_json(cls, paths: Union[str, List[str]], override_num_blocks: Optional[int] = None) -> RayDataset:
def read_json(cls, paths: Union[str, List[str]], override_num_blocks: Optional[int] = None, **kwargs) -> ray.data.Dataset:

# Note: a temp solution for reading json stream
# TODO: replace with ray.data.read_json_stream once it is available
import pyarrow.json as js

try:
js.open_json
return read_json_stream(paths)
return read_json_stream(paths, override_num_blocks=override_num_blocks)
except AttributeError:
return ray.data.read_json(paths)
return ray.data.read_json(paths, override_num_blocks=override_num_blocks)
Comment thread
fengrui-z marked this conversation as resolved.
Outdated

@classmethod
def read_webdataset(cls, paths: Union[str, List[str]]) -> RayDataset:
return ray.data.read_webdataset(paths, decoder=partial(_custom_default_decoder, format="PIL"))
def read_webdataset(cls, paths: Union[str, List[str]], **kwargs) -> RayDataset:
return ray.data.read_webdataset(paths, decoder=partial(_custom_default_decoder, format="PIL"), **kwargs)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

`ray.data.read_webdataset` does not support `override_num_blocks`. If this argument is present in `kwargs`, the call will fail. Additionally, the return type hint should be corrected.

Suggested change
def read_webdataset(cls, paths: Union[str, List[str]], **kwargs) -> RayDataset:
return ray.data.read_webdataset(paths, decoder=partial(_custom_default_decoder, format="PIL"), **kwargs)
@classmethod
def read_webdataset(cls, paths: Union[str, List[str]], **kwargs) -> ray.data.Dataset:
override_num_blocks = kwargs.pop("override_num_blocks", None)
dataset = ray.data.read_webdataset(paths, decoder=partial(_custom_default_decoder, format="PIL"), **kwargs)
if override_num_blocks:
dataset = dataset.repartition(override_num_blocks)
return dataset


def to_list(self) -> list:
return self.data.to_pandas().to_dict(orient="records")
Expand Down
5 changes: 4 additions & 1 deletion data_juicer/core/executor/ray_executor_partitioned.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,10 @@ def _run_impl(self, load_data_np: Optional[PositiveInt] = None, skip_return=Fals
# Load the full dataset using a single DatasetBuilder
logger.info("Loading dataset with single DatasetBuilder...")

dataset = self.datasetbuilder.load_dataset(num_proc=load_data_np)
override_num_blocks = getattr(self.cfg, "override_num_blocks", None)
dataset = self.datasetbuilder.load_dataset(
num_proc=load_data_np, override_num_blocks=override_num_blocks
)
columns = dataset.schema().columns

# Prepare operations
Expand Down
Empty file modified demos/agent/scripts/run_bad_case_pipeline.sh
100755 → 100644
Empty file.
Empty file modified demos/agent/scripts/verify_bad_case_export.py
100755 → 100644
Empty file.
Empty file modified demos/partition_and_checkpoint/robustness_benchmark.py
100755 → 100644
Empty file.
Empty file modified demos/partition_and_checkpoint/run_demo.py
100755 → 100644
Empty file.
Empty file modified thirdparty/LLM_ecosystems/setup_helm.sh
100755 → 100644
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that only the file mode of these files changed (100755 -> 100644, i.e. the executable bit was dropped), with no content changes. Since they have shebangs, I’m not sure if that was intentional.

Empty file.
Empty file modified thirdparty/LLM_ecosystems/setup_megatron.sh
100755 → 100644
Empty file.
Empty file modified thirdparty/models/setup_easyanimate.sh
100755 → 100644
Empty file.
Empty file modified tools/check_s3_integration.py
100755 → 100644
Empty file.
Loading