From 95c77e0d10a32740113b288185bf2156188aea04 Mon Sep 17 00:00:00 2001 From: fengrui-z Date: Wed, 18 Mar 2026 15:18:03 +0800 Subject: [PATCH 1/5] fix: use broader substring match for aesthetics-predictor normalization check to support local model paths --- data_juicer/ops/filter/image_aesthetics_filter.py | 2 +- data_juicer/ops/filter/video_aesthetics_filter.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/data_juicer/ops/filter/image_aesthetics_filter.py b/data_juicer/ops/filter/image_aesthetics_filter.py index faed8c5fed4..b62f46b823f 100644 --- a/data_juicer/ops/filter/image_aesthetics_filter.py +++ b/data_juicer/ops/filter/image_aesthetics_filter.py @@ -73,7 +73,7 @@ def __init__( trust_remote_code=trust_remote_code, ) # the original score predicted by laion-ai's scorer is within [0, 10] - self.need_normalized_by_ten = "shunk031/aesthetics-predictor" in hf_scorer_model + self.need_normalized_by_ten = "aesthetics-predictor" in hf_scorer_model def compute_stats_single(self, sample, rank=None, context=False): # check if it's computed already diff --git a/data_juicer/ops/filter/video_aesthetics_filter.py b/data_juicer/ops/filter/video_aesthetics_filter.py index 4a7901d98b1..5e0f618a095 100644 --- a/data_juicer/ops/filter/video_aesthetics_filter.py +++ b/data_juicer/ops/filter/video_aesthetics_filter.py @@ -123,7 +123,7 @@ def __init__( trust_remote_code=trust_remote_code, ) # the original score predicted by laion-ai's scorer is within [0, 10] - self.need_normalized_by_ten = "shunk031/aesthetics-predictor" in hf_scorer_model + self.need_normalized_by_ten = "aesthetics-predictor" in hf_scorer_model self.frame_sampling_method = frame_sampling_method self.frame_num = frame_num From df51e5df65df7430e961e1dc4c4800477aaace5d Mon Sep 17 00:00:00 2001 From: fengrui-z Date: Fri, 22 May 2026 17:58:20 +0800 Subject: [PATCH 2/5] feat: wire override_num_blocks through full call chain - config.py: add --override_num_blocks CLI argument - ray_executor_partitioned.py: pass override_num_blocks to load_dataset() - load_strategy.py: extract and forward override_num_blocks in RayLocalJsonDataLoadStrategy - ray_dataset.py: add kwargs to read(), add override_num_blocks to read_json() - Enables users to control Ray Data block parallelism for large datasets (5PB+) --- data_juicer/config/config.py | 8 ++++++++ data_juicer/core/data/load_strategy.py | 4 +++- data_juicer/core/data/ray_dataset.py | 18 +++++++++--------- .../core/executor/ray_executor_partitioned.py | 5 ++++- 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/data_juicer/config/config.py b/data_juicer/config/config.py index 9d06a878458..a1dc754a353 100644 --- a/data_juicer/config/config.py +++ b/data_juicer/config/config.py @@ -208,6 +208,14 @@ def build_base_parser() -> ArgumentParser: "(e.g., block_size for JSON reading). This configuration is " "especially useful when reading large JSON files.", ) + parser.add_argument( + "--override_num_blocks", + type=Optional[int], + default=None, + help="Override the number of output blocks for Ray Data read " + "operations. Useful for controlling parallelism when reading " + "large datasets.", + ) parser.add_argument( "--work_dir", type=str, diff --git a/data_juicer/core/data/load_strategy.py b/data_juicer/core/data/load_strategy.py index d07f7184d23..fb755e3238c 100644 --- a/data_juicer/core/data/load_strategy.py +++ b/data_juicer/core/data/load_strategy.py @@ -201,6 +201,8 @@ class RayLocalJsonDataLoadStrategy(RayDataLoadStrategy): def load_data(self, **kwargs): from data_juicer.core.data.ray_dataset import RayDataset + override_num_blocks = kwargs.pop("override_num_blocks", None) + path = self.ds_config["path"] # Convert to absolute path if relative @@ -281,7 +283,7 @@ def load_data(self, **kwargs): else: logger.info(f"Loading {data_format} data.") try: - dataset = RayDataset.read(data_format, path) + dataset = RayDataset.read(data_format, path, override_num_blocks=override_num_blocks) return RayDataset(dataset, dataset_path=path, cfg=self.cfg) except Exception as e: if auto_detect: diff --git a/data_juicer/core/data/ray_dataset.py b/data_juicer/core/data/ray_dataset.py index 538a90bbb95..a610619a34e 100644 --- a/data_juicer/core/data/ray_dataset.py +++ b/data_juicer/core/data/ray_dataset.py @@ -347,11 +347,11 @@ def count(self) -> int: return self.data.count() @classmethod - def read(cls, data_format: str, paths: Union[str, List[str]]) -> RayDataset: + def read(cls, data_format: str, paths: Union[str, List[str]], **kwargs) -> RayDataset: if data_format in {"json", "jsonl", "json.gz", "jsonl.gz", "json.zst", "jsonl.zst"}: - return RayDataset.read_json(paths) + return RayDataset.read_json(paths, **kwargs) elif data_format == "webdataset": - return RayDataset.read_webdataset(paths) + return RayDataset.read_webdataset(paths, **kwargs) elif data_format in { "parquet", "images", @@ -369,23 +369,23 @@ def read(cls, data_format: str, paths: Union[str, List[str]]) -> RayDataset: from data_juicer.utils.lazy_loader import LazyLoader LazyLoader.check_packages(["pylance"]) - return getattr(ray.data, f"read_{data_format}")(paths) + return getattr(ray.data, f"read_{data_format}")(paths, **kwargs) @classmethod - def read_json(cls, paths: Union[str, List[str]]) -> RayDataset: + def read_json(cls, paths: Union[str, List[str]], override_num_blocks: Optional[int] = None) -> RayDataset: # Note: a temp solution for reading json stream # TODO: replace with ray.data.read_json_stream once it is available import pyarrow.json as js try: js.open_json - return read_json_stream(paths) + return read_json_stream(paths, override_num_blocks=override_num_blocks) except AttributeError: - return ray.data.read_json(paths) + return ray.data.read_json(paths, override_num_blocks=override_num_blocks) @classmethod - def read_webdataset(cls, paths: Union[str, List[str]]) -> RayDataset: - return ray.data.read_webdataset(paths, decoder=partial(_custom_default_decoder, format="PIL")) + def read_webdataset(cls, paths: Union[str, List[str]], **kwargs) -> RayDataset: + return ray.data.read_webdataset(paths, decoder=partial(_custom_default_decoder, format="PIL"), **kwargs) def to_list(self) -> list: return self.data.to_pandas().to_dict(orient="records") diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index c9bb55b50d5..8dcebe655c3 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -414,7 +414,10 @@ def _run_impl(self, load_data_np: Optional[PositiveInt] = None, skip_return=Fals # Load the full dataset using a single DatasetBuilder logger.info("Loading dataset with single DatasetBuilder...") - dataset = self.datasetbuilder.load_dataset(num_proc=load_data_np) + override_num_blocks = getattr(self.cfg, "override_num_blocks", None) + dataset = self.datasetbuilder.load_dataset( + num_proc=load_data_np, override_num_blocks=override_num_blocks + ) columns = dataset.schema().columns # Prepare operations From b2531ae5655d993ba80c6b846e85a7f4e5ad1e3e Mon Sep 17 00:00:00 2001 From: fengrui-z Date: Fri, 22 May 2026 18:12:46 +0800 Subject: [PATCH 3/5] debug: WIP changes for scripts and benchmarks --- demos/agent/scripts/run_bad_case_pipeline.sh | 0 demos/agent/scripts/verify_bad_case_export.py | 0 demos/partition_and_checkpoint/robustness_benchmark.py | 0 demos/partition_and_checkpoint/run_demo.py | 0 thirdparty/LLM_ecosystems/setup_helm.sh | 0 thirdparty/LLM_ecosystems/setup_megatron.sh | 0 thirdparty/models/setup_easyanimate.sh | 0 tools/check_s3_integration.py | 0 8 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 demos/agent/scripts/run_bad_case_pipeline.sh mode change 100755 => 100644 demos/agent/scripts/verify_bad_case_export.py mode change 100755 => 100644 demos/partition_and_checkpoint/robustness_benchmark.py mode change 100755 => 100644 demos/partition_and_checkpoint/run_demo.py mode change 100755 => 100644 thirdparty/LLM_ecosystems/setup_helm.sh mode change 100755 => 100644 thirdparty/LLM_ecosystems/setup_megatron.sh mode change 100755 => 100644 thirdparty/models/setup_easyanimate.sh mode change 100755 => 100644 tools/check_s3_integration.py diff --git a/demos/agent/scripts/run_bad_case_pipeline.sh b/demos/agent/scripts/run_bad_case_pipeline.sh old mode 100755 new mode 100644 diff --git a/demos/agent/scripts/verify_bad_case_export.py b/demos/agent/scripts/verify_bad_case_export.py old mode 100755 new mode 100644 diff --git a/demos/partition_and_checkpoint/robustness_benchmark.py b/demos/partition_and_checkpoint/robustness_benchmark.py old mode 100755 new mode 100644 diff --git a/demos/partition_and_checkpoint/run_demo.py b/demos/partition_and_checkpoint/run_demo.py old mode 100755 new mode 100644 diff --git a/thirdparty/LLM_ecosystems/setup_helm.sh b/thirdparty/LLM_ecosystems/setup_helm.sh old mode 100755 new mode 100644 diff --git a/thirdparty/LLM_ecosystems/setup_megatron.sh b/thirdparty/LLM_ecosystems/setup_megatron.sh old mode 100755 new mode 100644 diff --git a/thirdparty/models/setup_easyanimate.sh b/thirdparty/models/setup_easyanimate.sh old mode 100755 new mode 100644 diff --git a/tools/check_s3_integration.py b/tools/check_s3_integration.py old mode 100755 new mode 100644 From f5623ffeebbd48b56535b4efab8fec4cb7e440e3 Mon Sep 17 00:00:00 2001 From: fengrui-z Date: Mon, 25 May 2026 15:34:47 +0800 Subject: [PATCH 4/5] style: fix black formatting --- data_juicer/core/executor/ray_executor_partitioned.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index 8dcebe655c3..6b4a0ceafdd 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -415,9 +415,7 @@ def _run_impl(self, load_data_np: Optional[PositiveInt] = None, skip_return=Fals logger.info("Loading dataset with single DatasetBuilder...") override_num_blocks = getattr(self.cfg, "override_num_blocks", None) - dataset = self.datasetbuilder.load_dataset( - num_proc=load_data_np, override_num_blocks=override_num_blocks - ) + dataset = self.datasetbuilder.load_dataset(num_proc=load_data_np, override_num_blocks=override_num_blocks) columns = dataset.schema().columns # Prepare operations From aa3dd4cd6a560e760c2ed2bbd29b587c31cbfeac Mon Sep 17 00:00:00 2001 From: fengrui-z Date: Mon, 25 May 2026 15:40:08 +0800 Subject: [PATCH 5/5] fix: prevent TypeError by popping override_num_blocks before native Ray read calls - read(): pop override_num_blocks from kwargs, use repartition() for non-JSON formats - read_json(): add **kwargs, use repartition() in fallback path instead of passing to ray.data.read_json - read_webdataset(): pop override_num_blocks, apply repartition() after read - Fix return type hints to ray.data.Dataset --- data_juicer/core/data/ray_dataset.py | 31 +++++++++++++++++++++------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/data_juicer/core/data/ray_dataset.py b/data_juicer/core/data/ray_dataset.py index a610619a34e..3ff71e16773 100644 --- a/data_juicer/core/data/ray_dataset.py +++ b/data_juicer/core/data/ray_dataset.py @@ -348,10 +348,13 @@ def count(self) -> int: @classmethod def read(cls, data_format: str, paths: Union[str, List[str]], **kwargs) -> RayDataset: + # Pop override_num_blocks since native Ray read functions don't support it + override_num_blocks = kwargs.pop("override_num_blocks", None) + if data_format in {"json", "jsonl", "json.gz", "jsonl.gz", "json.zst", "jsonl.zst"}: - return RayDataset.read_json(paths, **kwargs) + return RayDataset.read_json(paths, override_num_blocks=override_num_blocks, **kwargs) elif data_format == "webdataset": - return RayDataset.read_webdataset(paths, **kwargs) + return RayDataset.read_webdataset(paths, override_num_blocks=override_num_blocks, **kwargs) elif data_format in { "parquet", "images", @@ -369,23 +372,35 @@ def read(cls, data_format: str, paths: Union[str, List[str]], **kwargs) -> RayDa from data_juicer.utils.lazy_loader import LazyLoader LazyLoader.check_packages(["pylance"]) - return getattr(ray.data, f"read_{data_format}")(paths, **kwargs) + dataset = getattr(ray.data, f"read_{data_format}")(paths, **kwargs) + if override_num_blocks: + dataset = dataset.repartition(override_num_blocks) + return dataset @classmethod - def read_json(cls, paths: Union[str, List[str]], override_num_blocks: Optional[int] = None) -> RayDataset: + def read_json( + cls, paths: Union[str, List[str]], override_num_blocks: Optional[int] = None, **kwargs + ) -> ray.data.Dataset: # Note: a temp solution for reading json stream # TODO: replace with ray.data.read_json_stream once it is available import pyarrow.json as js try: js.open_json - return read_json_stream(paths, override_num_blocks=override_num_blocks) + return read_json_stream(paths, override_num_blocks=override_num_blocks, **kwargs) except AttributeError: - return ray.data.read_json(paths, override_num_blocks=override_num_blocks) + dataset = ray.data.read_json(paths, **kwargs) + if override_num_blocks: + dataset = dataset.repartition(override_num_blocks) + return dataset @classmethod - def read_webdataset(cls, paths: Union[str, List[str]], **kwargs) -> RayDataset: - return ray.data.read_webdataset(paths, decoder=partial(_custom_default_decoder, format="PIL"), **kwargs) + def read_webdataset(cls, paths: Union[str, List[str]], **kwargs) -> ray.data.Dataset: + override_num_blocks = kwargs.pop("override_num_blocks", None) + dataset = ray.data.read_webdataset(paths, decoder=partial(_custom_default_decoder, format="PIL"), **kwargs) + if override_num_blocks: + dataset = dataset.repartition(override_num_blocks) + return dataset def to_list(self) -> list: return self.data.to_pandas().to_dict(orient="records")