diff --git a/data_juicer/config/config_all.yaml b/data_juicer/config/config_all.yaml
index cfe56a631b9..22e4e645c8a 100644
--- a/data_juicer/config/config_all.yaml
+++ b/data_juicer/config/config_all.yaml
@@ -699,6 +699,11 @@ process:
save_visualization_dir: None # The path for saving visualization results.
- whitespace_normalization_mapper: # normalize different kinds of whitespaces to English whitespace.
+ # Pipeline ops
+ - ray_repartition_pipeline: # repartition a Ray Dataset into a target number of blocks.
+ num_blocks: 1 # Target number of Ray Dataset blocks.
+ shuffle: False # Whether to shuffle records during repartition.
+
# Filter ops
- alphanumeric_filter: # filter text with alphabet/numeric ratio out of specific range.
tokenization: false # whether to count the ratio of alphanumeric to the total number of tokens.
diff --git a/data_juicer/ops/pipeline/__init__.py b/data_juicer/ops/pipeline/__init__.py
index fa64957e353..0bc9de3f10b 100644
--- a/data_juicer/ops/pipeline/__init__.py
+++ b/data_juicer/ops/pipeline/__init__.py
@@ -1,4 +1,5 @@
from .llm_inference_with_ray_vllm_pipeline import LLMRayVLLMEnginePipeline
+from .ray_repartition_pipeline import RayRepartitionPipeline
from .vlm_inference_with_ray_vllm_pipeline import VLMRayVLLMEnginePipeline
-__all__ = ["LLMRayVLLMEnginePipeline", "VLMRayVLLMEnginePipeline"]
+__all__ = ["LLMRayVLLMEnginePipeline", "RayRepartitionPipeline", "VLMRayVLLMEnginePipeline"]
diff --git a/data_juicer/ops/pipeline/ray_repartition_pipeline.py b/data_juicer/ops/pipeline/ray_repartition_pipeline.py
new file mode 100644
index 00000000000..d288b7cc748
--- /dev/null
+++ b/data_juicer/ops/pipeline/ray_repartition_pipeline.py
@@ -0,0 +1,41 @@
+from data_juicer.ops.base_op import OPERATORS, Pipeline
+
+OP_NAME = "ray_repartition_pipeline"
+
+
+@OPERATORS.register_module(OP_NAME)
+class RayRepartitionPipeline(Pipeline):
+ """Repartition a Ray Dataset into a target number of blocks.
+
+ This operator performs dataset-level block repartitioning through Ray
+ Dataset's `repartition` API. It is intended for Ray executor pipelines only
+ because local datasets do not expose Ray Dataset blocks.
+ """
+
+ def __init__(
+ self,
+ num_blocks: int = 1,
+ shuffle: bool = False,
+ *args,
+ **kwargs,
+ ):
+ """
+ Initialization method.
+
+ :param num_blocks: target number of Ray Dataset blocks.
+ :param shuffle: whether to shuffle records during repartition.
+ """
+ super().__init__(*args, **kwargs)
+ if not isinstance(num_blocks, int) or isinstance(num_blocks, bool) or num_blocks <= 0:
+ raise ValueError("num_blocks must be a positive integer")
+ self.num_blocks = num_blocks
+ self.shuffle = bool(shuffle)
+
+ def run(self, dataset, *, exporter=None, tracer=None):
+ from data_juicer.core.data import NestedDataset
+
+ if isinstance(dataset, NestedDataset):
+ raise RuntimeError(
+ "ray_repartition_pipeline requires Ray executor because local datasets do not have blocks"
+ )
+ return dataset.repartition(num_blocks=self.num_blocks, shuffle=self.shuffle)
diff --git a/docs/Operators.md b/docs/Operators.md
index 186667ba2f9..0eed74d69a4 100644
--- a/docs/Operators.md
+++ b/docs/Operators.md
@@ -47,7 +47,7 @@ Data-Juicer 中的算子分为以下 8 种类型。
| [formatter](#formatter) | 8 | Discovers, loads, and canonicalizes source data. 发现、加载、规范化原始数据。 |
| [grouper](#grouper) | 3 | Group samples to batched samples. 将样本分组,每一组组成一个批量样本。 |
| [mapper](#mapper) | 123 | Edits and transforms samples. 对数据样本进行编辑和转换。 |
-| [pipeline](#pipeline) | 2 | Applies dataset-level processing; both input and output are datasets. 执行数据集级别的操作,输入和输出均为完整数据集。 |
+| [pipeline](#pipeline) | 3 | Applies dataset-level processing; both input and output are datasets. 执行数据集级别的操作,输入和输出均为完整数据集。 |
| [selector](#selector) | 5 | Selects top samples based on ranking. 基于排序选取高质量样本。 |
All the specific operators are listed below, each featured with several capability tags.
@@ -310,6 +310,7 @@ All the specific operators are listed below, each featured with several capabili
| Operator 算子 | Tags 标签 | Description 描述 | Details 详情 | Reference 参考 |
|----------|------|-------------|-------------|-------------|
| llm_inference_with_ray_vllm_pipeline | 🚀GPU 🟡Beta | Pipeline to generate response using vLLM engine on Ray. 使用Ray上的vLLM引擎生成响应的管道。 | - | - |
+| ray_repartition_pipeline | 💻CPU 🟡Beta | Repartition a Ray Dataset into a target number of blocks. 将射线数据集重新分区为目标数量的块。 | [info](operators/pipeline/ray_repartition_pipeline.md) | - |
| vlm_inference_with_ray_vllm_pipeline | 🏞Image 🚀GPU 🟡Beta | Pipeline to generate response using vLLM engine on Ray. 使用Ray上的vLLM引擎生成响应的管道。 | - | - |
## selector
diff --git a/docs/operators/pipeline/ray_repartition_pipeline.md b/docs/operators/pipeline/ray_repartition_pipeline.md
new file mode 100644
index 00000000000..2eb6ab83da6
--- /dev/null
+++ b/docs/operators/pipeline/ray_repartition_pipeline.md
@@ -0,0 +1,26 @@
+# ray_repartition_pipeline
+
+Repartition a Ray Dataset into a target number of blocks.
+
+将 Ray Dataset 重新分区到指定 block 数。
+
+This is a Ray-only dataset-level operator. It changes the number of Ray Dataset
+blocks and fails fast when used with the local executor because local datasets
+do not expose Ray Dataset blocks.
+
+这是一个仅适用于 Ray 的数据集级算子。它会调整 Ray Dataset 的 block 数;如果在本地执行器中使用,会直接报错,因为本地数据集没有 Ray Dataset blocks。
+
+Type 算子类型: **pipeline**
+
+Tags 标签: ray, cpu
+
+## 🔧 Parameter Configuration 参数配置
+| name 参数名 | type 类型 | default 默认值 | desc 说明 |
+|--------|------|--------|------|
+| `num_blocks` | | `1` | Target number of Ray Dataset blocks. |
+| `shuffle` | | `False` | Whether to shuffle records during repartition. |
+
+## 🔗 related links 相关链接
+- [source code 源代码](../../../data_juicer/ops/pipeline/ray_repartition_pipeline.py)
+- [unit test 单元测试](../../../tests/ops/pipeline/test_ray_repartition_pipeline.py)
+- [Return operator list 返回算子列表](../../Operators.md)
diff --git a/tests/ops/pipeline/test_ray_repartition_pipeline.py b/tests/ops/pipeline/test_ray_repartition_pipeline.py
new file mode 100644
index 00000000000..2b328d83469
--- /dev/null
+++ b/tests/ops/pipeline/test_ray_repartition_pipeline.py
@@ -0,0 +1,84 @@
+import importlib
+import unittest
+
+import pyarrow as pa
+
+_register_extension_type = pa.register_extension_type
+
+
+def _register_extension_type_once(extension_type):
+ try:
+ _register_extension_type(extension_type)
+ except pa.ArrowKeyError:
+ if not extension_type.extension_name.startswith("datasets.features.features."):
+ raise
+
+
+pa.register_extension_type = _register_extension_type_once
+
+NestedDataset = importlib.import_module("data_juicer.core.data").NestedDataset
+load_ops = importlib.import_module("data_juicer.ops.load").load_ops
+RayRepartitionPipeline = importlib.import_module(
+ "data_juicer.ops.pipeline.ray_repartition_pipeline"
+).RayRepartitionPipeline
+
+pa.register_extension_type = _register_extension_type
+
+
+class FakeRayDataset:
+ def __init__(self):
+ self.repartition_kwargs = None
+
+ def repartition(self, **kwargs):
+ self.repartition_kwargs = kwargs
+ return self
+
+
+class RayRepartitionPipelineTest(unittest.TestCase):
+ def test_ray_dataset_repartitions_without_shuffle_by_default(self):
+ dataset = FakeRayDataset()
+
+ output = RayRepartitionPipeline(num_blocks=128).run(dataset)
+
+ self.assertIs(output, dataset)
+ self.assertEqual(dataset.repartition_kwargs, {"num_blocks": 128, "shuffle": False})
+
+ def test_defaults_to_one_block(self):
+ dataset = FakeRayDataset()
+
+ output = RayRepartitionPipeline().run(dataset)
+
+ self.assertIs(output, dataset)
+ self.assertEqual(dataset.repartition_kwargs, {"num_blocks": 1, "shuffle": False})
+
+ def test_ray_dataset_repartitions_with_shuffle(self):
+ dataset = FakeRayDataset()
+
+ output = RayRepartitionPipeline(num_blocks=64, shuffle=True).run(dataset)
+
+ self.assertIs(output, dataset)
+ self.assertEqual(dataset.repartition_kwargs, {"num_blocks": 64, "shuffle": True})
+
+ def test_local_nested_dataset_fails_fast(self):
+ dataset = NestedDataset.from_list([{"id": 1}])
+
+ with self.assertRaisesRegex(RuntimeError, "requires Ray executor"):
+ RayRepartitionPipeline(num_blocks=2).run(dataset)
+
+ def test_constructor_validates_num_blocks(self):
+ for invalid_num_blocks in [0, -1, True, 1.5, "2"]:
+ with self.subTest(num_blocks=invalid_num_blocks):
+ with self.assertRaisesRegex(ValueError, "num_blocks"):
+ RayRepartitionPipeline(num_blocks=invalid_num_blocks)
+
+ def test_load_ops_can_load_ray_repartition_pipeline(self):
+ ops = load_ops([{"ray_repartition_pipeline": {"num_blocks": 32, "shuffle": True}}])
+
+ self.assertEqual(len(ops), 1)
+ self.assertIsInstance(ops[0], RayRepartitionPipeline)
+ self.assertEqual(ops[0].num_blocks, 32)
+ self.assertTrue(ops[0].shuffle)
+
+
+if __name__ == "__main__":
+ unittest.main()