From 4410e8097adaeac0f7f51e799605294ef2c0ea6d Mon Sep 17 00:00:00 2001 From: "guohongyu.7" Date: Mon, 25 May 2026 17:27:36 +0800 Subject: [PATCH 1/2] feat: add repartition pipeline --- data_juicer/config/config_all.yaml | 5 ++ data_juicer/ops/pipeline/__init__.py | 3 +- .../ops/pipeline/repartition_pipeline.py | 39 +++++++++ docs/Operators.md | 3 +- .../pipeline/repartition_pipeline.md | 26 ++++++ .../ops/pipeline/test_repartition_pipeline.py | 82 +++++++++++++++++++ 6 files changed, 156 insertions(+), 2 deletions(-) create mode 100644 data_juicer/ops/pipeline/repartition_pipeline.py create mode 100644 docs/operators/pipeline/repartition_pipeline.md create mode 100644 tests/ops/pipeline/test_repartition_pipeline.py diff --git a/data_juicer/config/config_all.yaml b/data_juicer/config/config_all.yaml index cfe56a631b9..a66b2f9ad3a 100644 --- a/data_juicer/config/config_all.yaml +++ b/data_juicer/config/config_all.yaml @@ -699,6 +699,11 @@ process: save_visualization_dir: None # The path for saving visualization results. - whitespace_normalization_mapper: # normalize different kinds of whitespaces to English whitespace. + # Pipeline ops + - repartition_pipeline: # repartition a Ray Dataset into a target number of blocks. + num_blocks: 1 # Target number of Ray Dataset blocks. + shuffle: False # Whether to shuffle records during repartition. + # Filter ops - alphanumeric_filter: # filter text with alphabet/numeric ratio out of specific range. tokenization: false # whether to count the ratio of alphanumeric to the total number of tokens. diff --git a/data_juicer/ops/pipeline/__init__.py b/data_juicer/ops/pipeline/__init__.py index fa64957e353..ef5e60f375b 100644 --- a/data_juicer/ops/pipeline/__init__.py +++ b/data_juicer/ops/pipeline/__init__.py @@ -1,4 +1,5 @@ from .llm_inference_with_ray_vllm_pipeline import LLMRayVLLMEnginePipeline +from .repartition_pipeline import RepartitionPipeline from .vlm_inference_with_ray_vllm_pipeline import VLMRayVLLMEnginePipeline -__all__ = ["LLMRayVLLMEnginePipeline", "VLMRayVLLMEnginePipeline"] +__all__ = ["LLMRayVLLMEnginePipeline", "RepartitionPipeline", "VLMRayVLLMEnginePipeline"] diff --git a/data_juicer/ops/pipeline/repartition_pipeline.py b/data_juicer/ops/pipeline/repartition_pipeline.py new file mode 100644 index 00000000000..c5850aa7194 --- /dev/null +++ b/data_juicer/ops/pipeline/repartition_pipeline.py @@ -0,0 +1,39 @@ +from data_juicer.ops.base_op import OPERATORS, Pipeline + +OP_NAME = "repartition_pipeline" + + +@OPERATORS.register_module(OP_NAME) +class RepartitionPipeline(Pipeline): + """Repartition a Ray Dataset into a target number of blocks. + + This operator performs dataset-level block repartitioning through Ray + Dataset's `repartition` API. It is intended for Ray executor pipelines only + because local datasets do not expose Ray Dataset blocks. + """ + + def __init__( + self, + num_blocks: int = 1, + shuffle: bool = False, + *args, + **kwargs, + ): + """ + Initialization method. + + :param num_blocks: target number of Ray Dataset blocks. + :param shuffle: whether to shuffle records during repartition. + """ + super().__init__(*args, **kwargs) + if not isinstance(num_blocks, int) or isinstance(num_blocks, bool) or num_blocks <= 0: + raise ValueError("num_blocks must be a positive integer") + self.num_blocks = num_blocks + self.shuffle = bool(shuffle) + + def run(self, dataset, *, exporter=None, tracer=None): + from data_juicer.core.data import NestedDataset + + if isinstance(dataset, NestedDataset): + raise RuntimeError("repartition_pipeline requires Ray executor because local datasets do not have blocks") + return dataset.repartition(num_blocks=self.num_blocks, shuffle=self.shuffle) diff --git a/docs/Operators.md b/docs/Operators.md index 186667ba2f9..52e50a01a45 100644 --- a/docs/Operators.md +++ b/docs/Operators.md @@ -47,7 +47,7 @@ Data-Juicer 中的算子分为以下 8 种类型。 | [formatter](#formatter) | 8 | Discovers, loads, and canonicalizes source data. 发现、加载、规范化原始数据。 | | [grouper](#grouper) | 3 | Group samples to batched samples. 将样本分组,每一组组成一个批量样本。 | | [mapper](#mapper) | 123 | Edits and transforms samples. 对数据样本进行编辑和转换。 | -| [pipeline](#pipeline) | 2 | Applies dataset-level processing; both input and output are datasets. 执行数据集级别的操作,输入和输出均为完整数据集。 | +| [pipeline](#pipeline) | 3 | Applies dataset-level processing; both input and output are datasets. 执行数据集级别的操作,输入和输出均为完整数据集。 | | [selector](#selector) | 5 | Selects top samples based on ranking. 基于排序选取高质量样本。 | All the specific operators are listed below, each featured with several capability tags. @@ -310,6 +310,7 @@ All the specific operators are listed below, each featured with several capabili | Operator 算子 | Tags 标签 | Description 描述 | Details 详情 | Reference 参考 | |----------|------|-------------|-------------|-------------| | llm_inference_with_ray_vllm_pipeline | 🚀GPU 🟡Beta | Pipeline to generate response using vLLM engine on Ray. 使用Ray上的vLLM引擎生成响应的管道。 | - | - | +| repartition_pipeline | 💻CPU 🟡Beta | Repartition a Ray Dataset into a target number of blocks. 将射线数据集重新分区为目标数量的块。 | [info](operators/pipeline/repartition_pipeline.md) | - | | vlm_inference_with_ray_vllm_pipeline | 🏞Image 🚀GPU 🟡Beta | Pipeline to generate response using vLLM engine on Ray. 使用Ray上的vLLM引擎生成响应的管道。 | - | - | ## selector diff --git a/docs/operators/pipeline/repartition_pipeline.md b/docs/operators/pipeline/repartition_pipeline.md new file mode 100644 index 00000000000..c2971fb696c --- /dev/null +++ b/docs/operators/pipeline/repartition_pipeline.md @@ -0,0 +1,26 @@ +# repartition_pipeline + +Repartition a Ray Dataset into a target number of blocks. + +将 Ray Dataset 重新分区到指定 block 数。 + +This is a Ray-only dataset-level operator. It changes the number of Ray Dataset +blocks and fails fast when used with the local executor because local datasets +do not expose Ray Dataset blocks. + +这是一个仅适用于 Ray 的数据集级算子。它会调整 Ray Dataset 的 block 数;如果在本地执行器中使用,会直接报错,因为本地数据集没有 Ray Dataset blocks。 + +Type 算子类型: **pipeline** + +Tags 标签: ray, cpu + +## 🔧 Parameter Configuration 参数配置 +| name 参数名 | type 类型 | default 默认值 | desc 说明 | +|--------|------|--------|------| +| `num_blocks` | | `1` | Target number of Ray Dataset blocks. | +| `shuffle` | | `False` | Whether to shuffle records during repartition. | + +## 🔗 related links 相关链接 +- [source code 源代码](../../../data_juicer/ops/pipeline/repartition_pipeline.py) +- [unit test 单元测试](../../../tests/ops/pipeline/test_repartition_pipeline.py) +- [Return operator list 返回算子列表](../../Operators.md) diff --git a/tests/ops/pipeline/test_repartition_pipeline.py b/tests/ops/pipeline/test_repartition_pipeline.py new file mode 100644 index 00000000000..5f96271841b --- /dev/null +++ b/tests/ops/pipeline/test_repartition_pipeline.py @@ -0,0 +1,82 @@ +import importlib +import unittest + +import pyarrow as pa + +_register_extension_type = pa.register_extension_type + + +def _register_extension_type_once(extension_type): + try: + _register_extension_type(extension_type) + except pa.ArrowKeyError: + if not extension_type.extension_name.startswith("datasets.features.features."): + raise + + +pa.register_extension_type = _register_extension_type_once + +NestedDataset = importlib.import_module("data_juicer.core.data").NestedDataset +load_ops = importlib.import_module("data_juicer.ops.load").load_ops +RepartitionPipeline = importlib.import_module("data_juicer.ops.pipeline.repartition_pipeline").RepartitionPipeline + +pa.register_extension_type = _register_extension_type + + +class FakeRayDataset: + def __init__(self): + self.repartition_kwargs = None + + def repartition(self, **kwargs): + self.repartition_kwargs = kwargs + return self + + +class RepartitionPipelineTest(unittest.TestCase): + def test_ray_dataset_repartitions_without_shuffle_by_default(self): + dataset = FakeRayDataset() + + output = RepartitionPipeline(num_blocks=128).run(dataset) + + self.assertIs(output, dataset) + self.assertEqual(dataset.repartition_kwargs, {"num_blocks": 128, "shuffle": False}) + + def test_defaults_to_one_block(self): + dataset = FakeRayDataset() + + output = RepartitionPipeline().run(dataset) + + self.assertIs(output, dataset) + self.assertEqual(dataset.repartition_kwargs, {"num_blocks": 1, "shuffle": False}) + + def test_ray_dataset_repartitions_with_shuffle(self): + dataset = FakeRayDataset() + + output = RepartitionPipeline(num_blocks=64, shuffle=True).run(dataset) + + self.assertIs(output, dataset) + self.assertEqual(dataset.repartition_kwargs, {"num_blocks": 64, "shuffle": True}) + + def test_local_nested_dataset_fails_fast(self): + dataset = NestedDataset.from_list([{"id": 1}]) + + with self.assertRaisesRegex(RuntimeError, "requires Ray executor"): + RepartitionPipeline(num_blocks=2).run(dataset) + + def test_constructor_validates_num_blocks(self): + for invalid_num_blocks in [0, -1, True, 1.5, "2"]: + with self.subTest(num_blocks=invalid_num_blocks): + with self.assertRaisesRegex(ValueError, "num_blocks"): + RepartitionPipeline(num_blocks=invalid_num_blocks) + + def test_load_ops_can_load_repartition_pipeline(self): + ops = load_ops([{"repartition_pipeline": {"num_blocks": 32, "shuffle": True}}]) + + self.assertEqual(len(ops), 1) + self.assertIsInstance(ops[0], RepartitionPipeline) + self.assertEqual(ops[0].num_blocks, 32) + self.assertTrue(ops[0].shuffle) + + +if __name__ == "__main__": + unittest.main() From b3f45d6bb3e11dc9d56dc7e03c30a12fcef7f09a Mon Sep 17 00:00:00 2001 From: "guohongyu.7" Date: Tue, 26 May 2026 18:27:42 +0800 Subject: [PATCH 2/2] refactor: rename Ray repartition pipeline --- data_juicer/config/config_all.yaml | 2 +- data_juicer/ops/pipeline/__init__.py | 4 ++-- ...ipeline.py => ray_repartition_pipeline.py} | 8 ++++--- docs/Operators.md | 2 +- ...ipeline.md => ray_repartition_pipeline.md} | 6 ++--- ...ne.py => test_ray_repartition_pipeline.py} | 22 ++++++++++--------- 6 files changed, 24 insertions(+), 20 deletions(-) rename data_juicer/ops/pipeline/{repartition_pipeline.py => ray_repartition_pipeline.py} (84%) rename docs/operators/pipeline/{repartition_pipeline.md => ray_repartition_pipeline.md} (82%) rename tests/ops/pipeline/{test_repartition_pipeline.py => test_ray_repartition_pipeline.py} (73%) diff --git a/data_juicer/config/config_all.yaml b/data_juicer/config/config_all.yaml index a66b2f9ad3a..22e4e645c8a 100644 --- a/data_juicer/config/config_all.yaml +++ b/data_juicer/config/config_all.yaml @@ -700,7 +700,7 @@ process: - whitespace_normalization_mapper: # normalize different kinds of whitespaces to English whitespace. # Pipeline ops - - repartition_pipeline: # repartition a Ray Dataset into a target number of blocks. + - ray_repartition_pipeline: # repartition a Ray Dataset into a target number of blocks. num_blocks: 1 # Target number of Ray Dataset blocks. shuffle: False # Whether to shuffle records during repartition. diff --git a/data_juicer/ops/pipeline/__init__.py b/data_juicer/ops/pipeline/__init__.py index ef5e60f375b..0bc9de3f10b 100644 --- a/data_juicer/ops/pipeline/__init__.py +++ b/data_juicer/ops/pipeline/__init__.py @@ -1,5 +1,5 @@ from .llm_inference_with_ray_vllm_pipeline import LLMRayVLLMEnginePipeline -from .repartition_pipeline import RepartitionPipeline +from .ray_repartition_pipeline import RayRepartitionPipeline from .vlm_inference_with_ray_vllm_pipeline import VLMRayVLLMEnginePipeline -__all__ = ["LLMRayVLLMEnginePipeline", "RepartitionPipeline", "VLMRayVLLMEnginePipeline"] +__all__ = ["LLMRayVLLMEnginePipeline", "RayRepartitionPipeline", "VLMRayVLLMEnginePipeline"] diff --git a/data_juicer/ops/pipeline/repartition_pipeline.py b/data_juicer/ops/pipeline/ray_repartition_pipeline.py similarity index 84% rename from data_juicer/ops/pipeline/repartition_pipeline.py rename to data_juicer/ops/pipeline/ray_repartition_pipeline.py index c5850aa7194..d288b7cc748 100644 --- a/data_juicer/ops/pipeline/repartition_pipeline.py +++ b/data_juicer/ops/pipeline/ray_repartition_pipeline.py @@ -1,10 +1,10 @@ from data_juicer.ops.base_op import OPERATORS, Pipeline -OP_NAME = "repartition_pipeline" +OP_NAME = "ray_repartition_pipeline" @OPERATORS.register_module(OP_NAME) -class RepartitionPipeline(Pipeline): +class RayRepartitionPipeline(Pipeline): """Repartition a Ray Dataset into a target number of blocks. This operator performs dataset-level block repartitioning through Ray @@ -35,5 +35,7 @@ def run(self, dataset, *, exporter=None, tracer=None): from data_juicer.core.data import NestedDataset if isinstance(dataset, NestedDataset): - raise RuntimeError("repartition_pipeline requires Ray executor because local datasets do not have blocks") + raise RuntimeError( + "ray_repartition_pipeline requires Ray executor because local datasets do not have blocks" + ) return dataset.repartition(num_blocks=self.num_blocks, shuffle=self.shuffle) diff --git a/docs/Operators.md b/docs/Operators.md index 52e50a01a45..0eed74d69a4 100644 --- a/docs/Operators.md +++ b/docs/Operators.md @@ -310,7 +310,7 @@ All the specific operators are listed below, each featured with several capabili | Operator 算子 | Tags 标签 | Description 描述 | Details 详情 | Reference 参考 | |----------|------|-------------|-------------|-------------| | llm_inference_with_ray_vllm_pipeline | 🚀GPU 🟡Beta | Pipeline to generate response using vLLM engine on Ray. 使用Ray上的vLLM引擎生成响应的管道。 | - | - | -| repartition_pipeline | 💻CPU 🟡Beta | Repartition a Ray Dataset into a target number of blocks. 将射线数据集重新分区为目标数量的块。 | [info](operators/pipeline/repartition_pipeline.md) | - | +| ray_repartition_pipeline | 💻CPU 🟡Beta | Repartition a Ray Dataset into a target number of blocks. 将射线数据集重新分区为目标数量的块。 | [info](operators/pipeline/ray_repartition_pipeline.md) | - | | vlm_inference_with_ray_vllm_pipeline | 🏞Image 🚀GPU 🟡Beta | Pipeline to generate response using vLLM engine on Ray. 使用Ray上的vLLM引擎生成响应的管道。 | - | - | ## selector diff --git a/docs/operators/pipeline/repartition_pipeline.md b/docs/operators/pipeline/ray_repartition_pipeline.md similarity index 82% rename from docs/operators/pipeline/repartition_pipeline.md rename to docs/operators/pipeline/ray_repartition_pipeline.md index c2971fb696c..2eb6ab83da6 100644 --- a/docs/operators/pipeline/repartition_pipeline.md +++ b/docs/operators/pipeline/ray_repartition_pipeline.md @@ -1,4 +1,4 @@ -# repartition_pipeline +# ray_repartition_pipeline Repartition a Ray Dataset into a target number of blocks. @@ -21,6 +21,6 @@ Tags 标签: ray, cpu | `shuffle` | | `False` | Whether to shuffle records during repartition. | ## 🔗 related links 相关链接 -- [source code 源代码](../../../data_juicer/ops/pipeline/repartition_pipeline.py) -- [unit test 单元测试](../../../tests/ops/pipeline/test_repartition_pipeline.py) +- [source code 源代码](../../../data_juicer/ops/pipeline/ray_repartition_pipeline.py) +- [unit test 单元测试](../../../tests/ops/pipeline/test_ray_repartition_pipeline.py) - [Return operator list 返回算子列表](../../Operators.md) diff --git a/tests/ops/pipeline/test_repartition_pipeline.py b/tests/ops/pipeline/test_ray_repartition_pipeline.py similarity index 73% rename from tests/ops/pipeline/test_repartition_pipeline.py rename to tests/ops/pipeline/test_ray_repartition_pipeline.py index 5f96271841b..2b328d83469 100644 --- a/tests/ops/pipeline/test_repartition_pipeline.py +++ b/tests/ops/pipeline/test_ray_repartition_pipeline.py @@ -18,7 +18,9 @@ def _register_extension_type_once(extension_type): NestedDataset = importlib.import_module("data_juicer.core.data").NestedDataset load_ops = importlib.import_module("data_juicer.ops.load").load_ops -RepartitionPipeline = importlib.import_module("data_juicer.ops.pipeline.repartition_pipeline").RepartitionPipeline +RayRepartitionPipeline = importlib.import_module( + "data_juicer.ops.pipeline.ray_repartition_pipeline" +).RayRepartitionPipeline pa.register_extension_type = _register_extension_type @@ -32,11 +34,11 @@ def repartition(self, **kwargs): return self -class RepartitionPipelineTest(unittest.TestCase): +class RayRepartitionPipelineTest(unittest.TestCase): def test_ray_dataset_repartitions_without_shuffle_by_default(self): dataset = FakeRayDataset() - output = RepartitionPipeline(num_blocks=128).run(dataset) + output = RayRepartitionPipeline(num_blocks=128).run(dataset) self.assertIs(output, dataset) self.assertEqual(dataset.repartition_kwargs, {"num_blocks": 128, "shuffle": False}) @@ -44,7 +46,7 @@ def test_ray_dataset_repartitions_without_shuffle_by_default(self): def test_defaults_to_one_block(self): dataset = FakeRayDataset() - output = RepartitionPipeline().run(dataset) + output = RayRepartitionPipeline().run(dataset) self.assertIs(output, dataset) self.assertEqual(dataset.repartition_kwargs, {"num_blocks": 1, "shuffle": False}) @@ -52,7 +54,7 @@ def test_defaults_to_one_block(self): def test_ray_dataset_repartitions_with_shuffle(self): dataset = FakeRayDataset() - output = RepartitionPipeline(num_blocks=64, shuffle=True).run(dataset) + output = RayRepartitionPipeline(num_blocks=64, shuffle=True).run(dataset) self.assertIs(output, dataset) self.assertEqual(dataset.repartition_kwargs, {"num_blocks": 64, "shuffle": True}) @@ -61,19 +63,19 @@ def test_local_nested_dataset_fails_fast(self): dataset = NestedDataset.from_list([{"id": 1}]) with self.assertRaisesRegex(RuntimeError, "requires Ray executor"): - RepartitionPipeline(num_blocks=2).run(dataset) + RayRepartitionPipeline(num_blocks=2).run(dataset) def test_constructor_validates_num_blocks(self): for invalid_num_blocks in [0, -1, True, 1.5, "2"]: with self.subTest(num_blocks=invalid_num_blocks): with self.assertRaisesRegex(ValueError, "num_blocks"): - RepartitionPipeline(num_blocks=invalid_num_blocks) + RayRepartitionPipeline(num_blocks=invalid_num_blocks) - def test_load_ops_can_load_repartition_pipeline(self): - ops = load_ops([{"repartition_pipeline": {"num_blocks": 32, "shuffle": True}}]) + def test_load_ops_can_load_ray_repartition_pipeline(self): + ops = load_ops([{"ray_repartition_pipeline": {"num_blocks": 32, "shuffle": True}}]) self.assertEqual(len(ops), 1) - self.assertIsInstance(ops[0], RepartitionPipeline) + self.assertIsInstance(ops[0], RayRepartitionPipeline) self.assertEqual(ops[0].num_blocks, 32) self.assertTrue(ops[0].shuffle)