Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions data_juicer/config/config_all.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,11 @@ process:
save_visualization_dir: None # The path for saving visualization results.
- whitespace_normalization_mapper: # normalize different kinds of whitespaces to English whitespace.

# Pipeline ops
- ray_repartition_pipeline: # repartition a Ray Dataset into a target number of blocks.
num_blocks: 1 # Target number of Ray Dataset blocks.
shuffle: False # Whether to shuffle records during repartition.

# Filter ops
- alphanumeric_filter: # filter text with alphabet/numeric ratio out of specific range.
tokenization: false # whether to count the ratio of alphanumeric to the total number of tokens.
Expand Down
3 changes: 2 additions & 1 deletion data_juicer/ops/pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .llm_inference_with_ray_vllm_pipeline import LLMRayVLLMEnginePipeline
from .ray_repartition_pipeline import RayRepartitionPipeline
from .vlm_inference_with_ray_vllm_pipeline import VLMRayVLLMEnginePipeline

__all__ = ["LLMRayVLLMEnginePipeline", "VLMRayVLLMEnginePipeline"]
__all__ = ["LLMRayVLLMEnginePipeline", "RayRepartitionPipeline", "VLMRayVLLMEnginePipeline"]
41 changes: 41 additions & 0 deletions data_juicer/ops/pipeline/ray_repartition_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from data_juicer.ops.base_op import OPERATORS, Pipeline

OP_NAME = "ray_repartition_pipeline"


@OPERATORS.register_module(OP_NAME)
class RayRepartitionPipeline(Pipeline):
"""Repartition a Ray Dataset into a target number of blocks.

This operator performs dataset-level block repartitioning through Ray
Dataset's `repartition` API. It is intended for Ray executor pipelines only
because local datasets do not expose Ray Dataset blocks.
"""

def __init__(
self,
num_blocks: int = 1,
shuffle: bool = False,
*args,
**kwargs,
):
"""
Initialization method.

:param num_blocks: target number of Ray Dataset blocks.
:param shuffle: whether to shuffle records during repartition.
"""
super().__init__(*args, **kwargs)
if not isinstance(num_blocks, int) or isinstance(num_blocks, bool) or num_blocks <= 0:
raise ValueError("num_blocks must be a positive integer")
self.num_blocks = num_blocks
self.shuffle = bool(shuffle)

def run(self, dataset, *, exporter=None, tracer=None):
from data_juicer.core.data import NestedDataset

if isinstance(dataset, NestedDataset):
raise RuntimeError(
"ray_repartition_pipeline requires Ray executor because local datasets do not have blocks"
)
return dataset.repartition(num_blocks=self.num_blocks, shuffle=self.shuffle)
3 changes: 2 additions & 1 deletion docs/Operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Data-Juicer 中的算子分为以下 8 种类型。
| [formatter](#formatter) | 8 | Discovers, loads, and canonicalizes source data. 发现、加载、规范化原始数据。 |
| [grouper](#grouper) | 3 | Group samples to batched samples. 将样本分组,每一组组成一个批量样本。 |
| [mapper](#mapper) | 123 | Edits and transforms samples. 对数据样本进行编辑和转换。 |
| [pipeline](#pipeline) | 2 | Applies dataset-level processing; both input and output are datasets. 执行数据集级别的操作,输入和输出均为完整数据集。 |
| [pipeline](#pipeline) | 3 | Applies dataset-level processing; both input and output are datasets. 执行数据集级别的操作,输入和输出均为完整数据集。 |
| [selector](#selector) | 5 | Selects top samples based on ranking. 基于排序选取高质量样本。 |

All the specific operators are listed below, each featured with several capability tags.
Expand Down Expand Up @@ -310,6 +310,7 @@ All the specific operators are listed below, each featured with several capabili
| Operator 算子 | Tags 标签 | Description 描述 | Details 详情 | Reference 参考 |
|----------|------|-------------|-------------|-------------|
| llm_inference_with_ray_vllm_pipeline | 🚀GPU 🟡Beta | Pipeline to generate response using vLLM engine on Ray. 使用Ray上的vLLM引擎生成响应的管道。 | - | - |
| ray_repartition_pipeline | 💻CPU 🟡Beta | Repartition a Ray Dataset into a target number of blocks. 将射线数据集重新分区为目标数量的块。 | [info](operators/pipeline/ray_repartition_pipeline.md) | - |
| vlm_inference_with_ray_vllm_pipeline | 🏞Image 🚀GPU 🟡Beta | Pipeline to generate response using vLLM engine on Ray. 使用Ray上的vLLM引擎生成响应的管道。 | - | - |

## selector <a name="selector"/>
Expand Down
26 changes: 26 additions & 0 deletions docs/operators/pipeline/ray_repartition_pipeline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# ray_repartition_pipeline

Repartition a Ray Dataset into a target number of blocks.

将 Ray Dataset 重新分区到指定 block 数。

This is a Ray-only dataset-level operator. It changes the number of Ray Dataset
blocks and fails fast when used with the local executor because local datasets
do not expose Ray Dataset blocks.

这是一个仅适用于 Ray 的数据集级算子。它会调整 Ray Dataset 的 block 数;如果在本地执行器中使用,会直接报错,因为本地数据集没有 Ray Dataset blocks。

Type 算子类型: **pipeline**

Tags 标签: ray, cpu

## 🔧 Parameter Configuration 参数配置
| name 参数名 | type 类型 | default 默认值 | desc 说明 |
|--------|------|--------|------|
| `num_blocks` | <class 'int'> | `1` | Target number of Ray Dataset blocks. |
| `shuffle` | <class 'bool'> | `False` | Whether to shuffle records during repartition. |

## 🔗 related links 相关链接
- [source code 源代码](../../../data_juicer/ops/pipeline/ray_repartition_pipeline.py)
- [unit test 单元测试](../../../tests/ops/pipeline/test_ray_repartition_pipeline.py)
- [Return operator list 返回算子列表](../../Operators.md)
84 changes: 84 additions & 0 deletions tests/ops/pipeline/test_ray_repartition_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import importlib
import unittest

import pyarrow as pa

_register_extension_type = pa.register_extension_type


def _register_extension_type_once(extension_type):
try:
_register_extension_type(extension_type)
except pa.ArrowKeyError:
if not extension_type.extension_name.startswith("datasets.features.features."):
raise


pa.register_extension_type = _register_extension_type_once

NestedDataset = importlib.import_module("data_juicer.core.data").NestedDataset
load_ops = importlib.import_module("data_juicer.ops.load").load_ops
RayRepartitionPipeline = importlib.import_module(
"data_juicer.ops.pipeline.ray_repartition_pipeline"
).RayRepartitionPipeline

pa.register_extension_type = _register_extension_type


class FakeRayDataset:
def __init__(self):
self.repartition_kwargs = None

def repartition(self, **kwargs):
self.repartition_kwargs = kwargs
return self


class RayRepartitionPipelineTest(unittest.TestCase):
def test_ray_dataset_repartitions_without_shuffle_by_default(self):
dataset = FakeRayDataset()

output = RayRepartitionPipeline(num_blocks=128).run(dataset)

self.assertIs(output, dataset)
self.assertEqual(dataset.repartition_kwargs, {"num_blocks": 128, "shuffle": False})

def test_defaults_to_one_block(self):
dataset = FakeRayDataset()

output = RayRepartitionPipeline().run(dataset)

self.assertIs(output, dataset)
self.assertEqual(dataset.repartition_kwargs, {"num_blocks": 1, "shuffle": False})

def test_ray_dataset_repartitions_with_shuffle(self):
dataset = FakeRayDataset()

output = RayRepartitionPipeline(num_blocks=64, shuffle=True).run(dataset)

self.assertIs(output, dataset)
self.assertEqual(dataset.repartition_kwargs, {"num_blocks": 64, "shuffle": True})

def test_local_nested_dataset_fails_fast(self):
dataset = NestedDataset.from_list([{"id": 1}])

with self.assertRaisesRegex(RuntimeError, "requires Ray executor"):
RayRepartitionPipeline(num_blocks=2).run(dataset)

def test_constructor_validates_num_blocks(self):
for invalid_num_blocks in [0, -1, True, 1.5, "2"]:
with self.subTest(num_blocks=invalid_num_blocks):
with self.assertRaisesRegex(ValueError, "num_blocks"):
RayRepartitionPipeline(num_blocks=invalid_num_blocks)

def test_load_ops_can_load_ray_repartition_pipeline(self):
ops = load_ops([{"ray_repartition_pipeline": {"num_blocks": 32, "shuffle": True}}])

self.assertEqual(len(ops), 1)
self.assertIsInstance(ops[0], RayRepartitionPipeline)
self.assertEqual(ops[0].num_blocks, 32)
self.assertTrue(ops[0].shuffle)


if __name__ == "__main__":
unittest.main()
Loading