Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions data_juicer/core/data/ray_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from data_juicer.core.data import DJDataset
from data_juicer.core.data.schema import Schema
from data_juicer.core.tracer import should_trace_op
from data_juicer.ops import Deduplicator, Filter, Mapper, Pipeline
from data_juicer.ops import Deduplicator, Mapper, Pipeline
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The Filter operator is removed from imports, but its logic is still used (and needed) in _run_single_op. This should be re-added to fix the broken logic for handling Filter operators.

Suggested change
from data_juicer.ops import Deduplicator, Mapper, Pipeline
from data_juicer.ops import Deduplicator, Filter, Mapper, Pipeline

from data_juicer.ops.base_op import DEFAULT_BATCH_SIZE, TAGGING_OPS
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import is_remote_path
Expand Down Expand Up @@ -231,7 +231,6 @@ def process_batch_arrow(table: pyarrow.Table):
# Restore original process method
if tracer and should_trace_op(tracer, op._name) and original_process:
op.process = original_process
elif isinstance(op, Filter):
# Use cached_columns instead of self.data.columns() to avoid breaking pipeline
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Removing elif isinstance(op, Filter): merges the logic for Filter operators into the Mapper operator block. This will cause an AttributeError when a Mapper op is processed, as Mapper does not have methods like compute_stats which are specific to Filter. The separate logic for Mapper and Filter should be restored.

            elif isinstance(op, Filter):
                # Use cached_columns instead of self.data.columns() to avoid breaking pipeline

if Fields.stats not in cached_columns:

Expand Down
4 changes: 4 additions & 0 deletions data_juicer/ops/mapper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
from .text_chunk_mapper import TextChunkMapper
from .text_tagging_by_prompt_mapper import TextTaggingByPromptMapper
from .vggt_mapper import VggtMapper
from .video_calibration_mapper import VideoCalibrationMapper
from .video_captioning_from_audio_mapper import VideoCaptioningFromAudioMapper
from .video_captioning_from_frames_mapper import VideoCaptioningFromFramesMapper
from .video_captioning_from_summarizer_mapper import VideoCaptioningFromSummarizerMapper
Expand All @@ -97,6 +98,7 @@
from .video_resize_aspect_ratio_mapper import VideoResizeAspectRatioMapper
from .video_resize_resolution_mapper import VideoResizeResolutionMapper
from .video_split_by_duration_mapper import VideoSplitByDurationMapper
from .video_split_by_frame_mapper import VideoSplitByFrameMapper
from .video_split_by_key_frame_mapper import VideoSplitByKeyFrameMapper
from .video_split_by_scene_mapper import VideoSplitBySceneMapper
from .video_tagging_from_audio_mapper import VideoTaggingFromAudioMapper
Expand Down Expand Up @@ -183,6 +185,7 @@
"TextChunkMapper",
"TextTaggingByPromptMapper",
"VggtMapper",
"VideoCalibrationMapper",
"VideoCaptioningFromAudioMapper",
"VideoCaptioningFromFramesMapper",
"VideoCaptioningFromSummarizerMapper",
Expand All @@ -198,6 +201,7 @@
"VideoResizeAspectRatioMapper",
"VideoResizeResolutionMapper",
"VideoSplitByDurationMapper",
"VideoSplitByFrameMapper",
"VideoSplitByKeyFrameMapper",
"VideoSplitBySceneMapper",
"VideoTaggingFromAudioMapper",
Expand Down
292 changes: 292 additions & 0 deletions data_juicer/ops/mapper/video_calibration_mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
import argparse
import logging
import os
import sys
from typing import Optional

import numpy as np

from data_juicer.utils.cache_utils import DATA_JUICER_ASSETS_CACHE
from data_juicer.utils.constant import Fields
from data_juicer.utils.lazy_loader import LazyLoader

from ..base_op import OPERATORS, Mapper

logger = logging.getLogger(__name__)

torch = LazyLoader("torch")
cv2 = LazyLoader("cv2", "opencv-python")

OP_NAME = "video_calibration_mapper"


@OPERATORS.register_module(OP_NAME)
class VideoCalibrationMapper(Mapper):
"""
Extract camera intrinsics from videos using DroidCalib.

**Notice**: This operator will download the DroidCalib component from
GitHub at runtime. This component follows the AGPL-3.0 license, please
be aware for commercial use.
"""

_accelerator = "cuda"

def __init__(
self,
weights_path: Optional[str] = None,
image_size: list = [384, 512],
stride: int = 2,
max_frames: int = 300,
buffer: int = 1024,
beta: float = 0.3,
filter_thresh: float = 2.4,
warmup: int = 8,
keyframe_thresh: float = 4.0,
frontend_thresh: float = 16.0,
frontend_window: int = 25,
frontend_radius: int = 2,
frontend_nms: int = 1,
backend_thresh: float = 22.0,
backend_radius: int = 2,
backend_nms: int = 3,
upsample: bool = False,
disable_vis: bool = True,
verbose: bool = False,
*args,
**kwargs,
):
"""
Initialization method.

:param weights_path: Path to the model weights.
:param image_size: Target image size [height, width].
:param stride: Frame stride.
:param max_frames: Maximum number of frames to process.
:param buffer: Buffer size for Droid.
:param beta: Weight for translation / rotation components of flow.
:param filter_thresh: Motion threshold before considering new keyframe.
:param warmup: Number of warmup frames.
:param keyframe_thresh: Threshold to create a new keyframe.
:param frontend_thresh: Add edges between frames within this distance.
:param frontend_window: Frontend optimization window.
:param frontend_radius: Force edges between frames within radius.
:param frontend_nms: Non-maximal suppression of edges.
:param backend_thresh: Backend threshold.
:param backend_radius: Backend radius.
:param backend_nms: Backend NMS.
:param upsample: Whether to upsample.
:param disable_vis: Whether to disable visualization.
"""
super().__init__(*args, **kwargs)

self.verbose = verbose
self._deps_ready = False

self.droid_calib_path = os.path.join(DATA_JUICER_ASSETS_CACHE, "DroidCalib")
self.droid_slam_path = os.path.join(self.droid_calib_path, "droid_slam")

# Dynamic path append (best-effort; also repeated at runtime in workers)
if os.path.exists(self.droid_slam_path) and self.droid_slam_path not in sys.path:
sys.path.append(self.droid_slam_path)

self.weights_path = weights_path
if self.weights_path is None:
self.weights_path = os.path.join(self.droid_calib_path, "droidcalib.pth")

self.image_size = image_size
self.stride = stride
self.max_frames = max_frames

# Droid args
self.droid_args = argparse.Namespace()
self.droid_args.weights = self.weights_path
self.droid_args.buffer = buffer
self.droid_args.image_size = image_size
self.droid_args.beta = beta
self.droid_args.filter_thresh = filter_thresh
self.droid_args.warmup = warmup
self.droid_args.keyframe_thresh = keyframe_thresh
self.droid_args.frontend_thresh = frontend_thresh
self.droid_args.frontend_window = frontend_window
self.droid_args.frontend_radius = frontend_radius
self.droid_args.frontend_nms = frontend_nms
self.droid_args.backend_thresh = backend_thresh
self.droid_args.backend_radius = backend_radius
self.droid_args.backend_nms = backend_nms
self.droid_args.upsample = upsample
self.droid_args.disable_vis = disable_vis
self.droid_args.stereo = False
self.droid_args.camera_model = "pinhole" # Default to pinhole
self.droid_args.opt_intr = True

def _ensure_droidcalib_ready(self) -> bool:
"""Ensure DroidCalib is importable in the *current process*.

This matters because `Dataset.map(num_proc>1)` may execute in child
processes where `sys.path` changes from `__init__` are not present.
"""

if self._deps_ready:
return True

# Ensure droid_slam is on sys.path in this process
self.droid_slam_path = os.path.join(self.droid_calib_path, "droid_slam")
if os.path.exists(self.droid_slam_path) and self.droid_slam_path not in sys.path:
sys.path.append(self.droid_slam_path)

# Try import first
try:
import droid # noqa: F401

self._deps_ready = True
return True
except Exception as e:
if self.verbose:
print(
f"Warning: DroidCalib not importable: {e}. "
"Please run the installation script `demos/video_calibration/install_droidcalib.sh` "
"to install the required dependencies."
)
return False

def _image_stream(self, video_path):
"""
Generator that yields (t, image, intrinsics, size_factor)
"""
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return

# Initial calibration guess (center of image)
w0 = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h0 = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# fx, fy, cx, cy
calib = np.array([(w0 + h0) / 2, (w0 + h0) / 2, w0 / 2, h0 / 2])
fx, fy, cx, cy = calib

ht, wd = self.image_size # Target size [h, w]

t = 0
frame_idx = 0

while cap.isOpened():
ret, image = cap.read()
if not ret:
break

if frame_idx % self.stride != 0:
frame_idx += 1
continue

if self.max_frames and t >= self.max_frames:
break

h0, w0, _ = image.shape

# Resize logic from demo.py
# h1 = int(h0 * np.sqrt((ht * wd) / (h0 * w0)))
# w1 = int(w0 * np.sqrt((ht * wd) / (h0 * w0)))
# Actually demo.py logic seems to try to maintain aspect ratio but target specific area?
# Let's stick to demo.py logic
ratio = np.sqrt((ht * wd) / (h0 * w0))
h1 = int(h0 * ratio)
w1 = int(w0 * ratio)

image = cv2.resize(image, (w1, h1))
image = image[: h1 - h1 % 8, : w1 - w1 % 8] # Crop to be divisible by 8

image_tensor = torch.as_tensor(image).permute(2, 0, 1)

intrinsics = torch.as_tensor([fx, fy, cx, cy])

# Adjust intrinsics for resize
h_final, w_final = image.shape[:2]
size_factor = [(w_final / w0), (h_final / h0)]
intrinsics[0::2] *= size_factor[0]
intrinsics[1::2] *= size_factor[1]

yield t, image_tensor[None], intrinsics, size_factor

t += 1
frame_idx += 1

cap.release()

def _process_video_file(self, video_path):
if not self._ensure_droidcalib_ready():
return None

try:
from droid import Droid
except Exception as e:
if self.verbose:
print(f"Warning: Failed to import Droid after ensuring deps: {e}")
return None

if not os.path.exists(video_path):
return None

# Let's create a generator
stream = self._image_stream(video_path)

droid = None
sf = None # size factor
intr_est_list = None

# try:
for i in range(1):
for t, image, intrinsics, size_factor in stream:
if droid is None:
# Update args with actual image size
self.droid_args.image_size = [image.shape[2], image.shape[3]]
droid = Droid(self.droid_args)

droid.track(t, image, intrinsics=intrinsics)
sf = size_factor

if droid is not None:
# Terminate and get results
# We need to pass the stream again for terminate?
# demo.py: droid.terminate(image_stream(...))
# It seems terminate does a final BA pass using the stream?
# Let's recreate stream
stream_second_pass = self._image_stream(video_path)
traj_est, intr_est = droid.terminate(stream_second_pass)

# Rescale intrinsics back to original resolution
if sf:
intr_est = intr_est.copy()
intr_est[0:4:2] /= sf[0]
intr_est[1:4:2] /= sf[1]

intr_est_list = intr_est.tolist()

# except Exception as e:
# # Log error or just skip
# print(f"Error processing video {video_path}: {e}")
# finally:
# Cleanup
if droid:
del droid
torch.cuda.empty_cache()
Comment on lines +239 to +273
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The resource cleanup logic (e.g., del droid, torch.cuda.empty_cache()) is not guaranteed to run if an exception occurs during video processing. It's better to wrap the processing logic in a try...finally block to ensure resources are always released. The for i in range(1): loop is unnecessary and can be removed. Also, consider using the logger for error messages instead of print.

        try:
            for t, image, intrinsics, size_factor in stream:
                if droid is None:
                    # Update args with actual image size
                    self.droid_args.image_size = [image.shape[2], image.shape[3]]
                    droid = Droid(self.droid_args)

                droid.track(t, image, intrinsics=intrinsics)
                sf = size_factor

            if droid is not None:
                # Terminate and get results
                stream_second_pass = self._image_stream(video_path)
                _, intr_est = droid.terminate(stream_second_pass)

                # Rescale intrinsics back to original resolution
                if sf:
                    intr_est = intr_est.copy()
                    intr_est[0:4:2] /= sf[0]
                    intr_est[1:4:2] /= sf[1]

                intr_est_list = intr_est.tolist()
        except Exception as e:
            if self.verbose:
                logger.warning(f'Error processing video {video_path}: {e}')
        finally:
            # Cleanup
            if droid:
                del droid
            torch.cuda.empty_cache()


return intr_est_list

def process_single(self, sample, rank=None):
video_paths = sample[self.video_key]
if isinstance(video_paths, str):
video_paths = [video_paths]

intrinsics_results = []
for video_path in video_paths:
res = self._process_video_file(video_path)
intrinsics_results.append(res)

if Fields.meta not in sample:
sample[Fields.meta] = {}

sample[Fields.meta]["camera_intrinsics"] = intrinsics_results

return sample
Loading
Loading