Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 8 additions & 46 deletions data_juicer/config/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import argparse
import copy
import importlib.util
import json
import os
import shutil
Expand Down Expand Up @@ -51,51 +50,14 @@ def _generate_module_name(abs_path):


def load_custom_operators(paths):
"""Dynamically load custom operator modules or packages in the specified path."""
for path in paths:
abs_path = os.path.abspath(path)
if os.path.isfile(abs_path):
module_name = _generate_module_name(abs_path)
if module_name in sys.modules:
existing_path = sys.modules[module_name].__file__
raise RuntimeError(
f"Module '{module_name}' already loaded from '{existing_path}'. "
f"Conflict detected while loading '{abs_path}'."
)
try:
spec = importlib.util.spec_from_file_location(module_name, abs_path)
if spec is None:
raise RuntimeError(f"Failed to create spec for '{abs_path}'")
module = importlib.util.module_from_spec(spec)
# register the module first to avoid recursive import issues
sys.modules[module_name] = module
spec.loader.exec_module(module)
except Exception as e:
raise RuntimeError(f"Error loading '{abs_path}' as '{module_name}': {e}")

elif os.path.isdir(abs_path):
if not os.path.isfile(os.path.join(abs_path, "__init__.py")):
raise ValueError(f"Package directory '{abs_path}' must contain __init__.py")
package_name = os.path.basename(abs_path)
parent_dir = os.path.dirname(abs_path)
if package_name in sys.modules:
existing_path = sys.modules[package_name].__path__[0]
raise RuntimeError(
f"Package '{package_name}' already loaded from '{existing_path}'. "
f"Conflict detected while loading '{abs_path}'."
)
original_sys_path = sys.path.copy()
try:
sys.path.insert(0, parent_dir)
importlib.import_module(package_name)
# record the loading path of the package (for subsequent conflict detection)
sys.modules[package_name].__loaded_from__ = abs_path
except Exception as e:
raise RuntimeError(f"Error loading package '{abs_path}': {e}")
finally:
sys.path = original_sys_path
else:
raise ValueError(f"Path '{abs_path}' is neither a file nor a directory")
"""Dynamically load custom operator modules or packages in the specified path.

This is a re-export from ``data_juicer.utils.custom_op`` kept here for
backward compatibility.
"""
from data_juicer.utils.custom_op import load_custom_operators as _impl

_impl(paths)


def build_base_parser() -> ArgumentParser:
Expand Down
45 changes: 28 additions & 17 deletions data_juicer/ops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,18 @@ def timing_context(description):

# yapf: disable
with timing_context('Importing operator modules'):
# 1. Built-in operators (registered via @OPERATORS.register_module decorators
# that fire as each sub-package is imported)
# 2. Persistent custom operators (loaded from ~/.data_juicer/custom_op.json;
# no-op when the registry file does not exist)
from data_juicer.utils.custom_op import (
load_persistent_custom_ops as _load_persistent,
)

from . import aggregator, deduplicator, filter, grouper, mapper, pipeline, selector
_load_persistent()
del _load_persistent

from .base_op import (
ATTRIBUTION_FILTERS,
NON_STATS_FILTERS,
Expand All @@ -39,21 +50,21 @@ def timing_context(description):
)

__all__ = [
'load_ops',
'Filter',
'Mapper',
'Deduplicator',
'Selector',
'Grouper',
'Aggregator',
'UNFORKABLE',
'NON_STATS_FILTERS',
'OPERATORS',
'TAGGING_OPS',
'Pipeline',
'OPEnvSpec',
'op_requirements_to_op_env_spec',
'OPEnvManager',
'analyze_lazy_loaded_requirements',
'analyze_lazy_loaded_requirements_for_code_file',
"load_ops",
"Filter",
"Mapper",
"Deduplicator",
"Selector",
"Grouper",
"Aggregator",
"UNFORKABLE",
"NON_STATS_FILTERS",
"OPERATORS",
"TAGGING_OPS",
"Pipeline",
"OPEnvSpec",
"op_requirements_to_op_env_spec",
"OPEnvManager",
"analyze_lazy_loaded_requirements",
"analyze_lazy_loaded_requirements_for_code_file",
]
46 changes: 41 additions & 5 deletions data_juicer/tools/DJ_mcp_granular_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import inspect
import os
import sys
from typing import Annotated, Optional
from typing import Annotated, Optional, get_type_hints

from pydantic import Field

Expand All @@ -13,6 +13,30 @@
fastmcp = LazyLoader("mcp.server.fastmcp", "mcp[cli]")


def resolve_signature_annotations(func, sig: inspect.Signature) -> inspect.Signature:
    """Resolve postponed/string annotations into real runtime types.

    When a module uses ``from __future__ import annotations``, all
    annotations are stored as strings. This helper calls
    ``typing.get_type_hints`` on the original callable to obtain the
    real type objects and rebuilds the signature with them.

    :param func: the callable whose annotations should be resolved.
    :param sig: the signature (normally obtained from ``func``) to rebuild.
    :return: a copy of ``sig`` in which every parameter annotation and the
        return annotation that could be resolved is replaced by the resolved
        object. Names, kinds, defaults and ordering are untouched. If
        resolution fails, the annotations of ``sig`` are kept unchanged.
    """
    try:
        module = sys.modules.get(getattr(func, "__module__", None))
        # Prefer the namespace of the module the callable claims to live in.
        # When that module is not importable from ``sys.modules`` pass
        # ``None`` (not an empty dict) so that ``get_type_hints`` falls back
        # to ``func.__globals__`` instead of an empty namespace in which no
        # module-level name can be resolved.
        globalns = module.__dict__ if module else None
        # ``include_extras=True`` keeps ``Annotated[...]`` metadata intact;
        # without it every ``Annotated[T, ...]`` would be collapsed to ``T``.
        # ``localns`` is left unset: ``get_type_hints`` then reuses the
        # global namespace for local lookups.
        hints = get_type_hints(func, globalns=globalns, include_extras=True)
    except Exception:
        # Deliberate best effort: an annotation that cannot be evaluated
        # (e.g. a name imported only under ``TYPE_CHECKING``) must not break
        # the caller, so fall back to the annotations already on ``sig``.
        hints = {}

    new_params = [
        param.replace(annotation=hints.get(name, param.annotation))
        for name, param in sig.parameters.items()
    ]
    return_annotation = hints.get("return", sig.return_annotation)
    return sig.replace(parameters=new_params, return_annotation=return_annotation)


# Dynamic MCP Tool Creation
def process_parameter(name: str, param: inspect.Parameter) -> inspect.Parameter:
"""
Expand All @@ -31,13 +55,18 @@ def create_operator_function(op, mcp):
This function dynamically creates a function that can be registered as an MCP tool,
with proper signature and documentation based on the operator's __init__ method.
"""
sig = op["sig"]
raw_sig = op["sig"]
init_func = op.get("init_func")
if init_func is not None:
sig = resolve_signature_annotations(init_func, raw_sig)
else:
sig = raw_sig
docstring = op["desc"]
param_docstring = op["param_desc"]

# Create new function signature with dataset_path as first parameter
# Consider adding other common parameters later, such as export_psth
new_parameters = [
fixed_params = [
inspect.Parameter("dataset_path", inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=str),
inspect.Parameter(
"export_path",
Expand All @@ -51,11 +80,18 @@ def create_operator_function(op, mcp):
annotation=Optional[int],
default=None,
),
] + [
]
op_params = [
process_parameter(name, param)
for name, param in sig.parameters.items()
if name not in ("args", "kwargs", "self")
]
# Merge all params, then reorder: required (no default) first,
# optional (with default) second, to satisfy Python's signature rule.
all_params = fixed_params + op_params
required_params = [p for p in all_params if p.default is inspect.Parameter.empty]
optional_params = [p for p in all_params if p.default is not inspect.Parameter.empty]
new_parameters = required_params + optional_params
new_signature = sig.replace(parameters=new_parameters, return_annotation=str)

def func(*args, **kwargs):
Expand All @@ -66,7 +102,7 @@ def func(*args, **kwargs):
export_path = bound_arguments.arguments.pop("export_path")
dataset_path = bound_arguments.arguments.pop("dataset_path")
np = bound_arguments.arguments.pop("np")
args_dict = {k: v for k, v in bound_arguments.arguments.items() if v}
args_dict = {k: v for k, v in bound_arguments.arguments.items() if v is not None}

dj_cfg = {
"dataset_path": dataset_path,
Expand Down
Loading
Loading