diff --git a/data_juicer/core/data/ray_dataset.py b/data_juicer/core/data/ray_dataset.py index 8c8ab729e9..74d680074a 100644 --- a/data_juicer/core/data/ray_dataset.py +++ b/data_juicer/core/data/ray_dataset.py @@ -13,7 +13,7 @@ from data_juicer.core.data import DJDataset from data_juicer.core.data.schema import Schema from data_juicer.core.tracer import should_trace_op -from data_juicer.ops import Deduplicator, Filter, Mapper, Pipeline +from data_juicer.ops import Deduplicator, Mapper, Pipeline from data_juicer.ops.base_op import DEFAULT_BATCH_SIZE, TAGGING_OPS from data_juicer.utils.constant import Fields from data_juicer.utils.file_utils import is_remote_path @@ -231,7 +231,6 @@ def process_batch_arrow(table: pyarrow.Table): # Restore original process method if tracer and should_trace_op(tracer, op._name) and original_process: op.process = original_process - elif isinstance(op, Filter): # Use cached_columns instead of self.data.columns() to avoid breaking pipeline if Fields.stats not in cached_columns: diff --git a/data_juicer/ops/mapper/__init__.py b/data_juicer/ops/mapper/__init__.py index 82d7d29c41..ac4d031d45 100644 --- a/data_juicer/ops/mapper/__init__.py +++ b/data_juicer/ops/mapper/__init__.py @@ -82,6 +82,7 @@ from .text_chunk_mapper import TextChunkMapper from .text_tagging_by_prompt_mapper import TextTaggingByPromptMapper from .vggt_mapper import VggtMapper +from .video_calibration_mapper import VideoCalibrationMapper from .video_captioning_from_audio_mapper import VideoCaptioningFromAudioMapper from .video_captioning_from_frames_mapper import VideoCaptioningFromFramesMapper from .video_captioning_from_summarizer_mapper import VideoCaptioningFromSummarizerMapper @@ -97,6 +98,7 @@ from .video_resize_aspect_ratio_mapper import VideoResizeAspectRatioMapper from .video_resize_resolution_mapper import VideoResizeResolutionMapper from .video_split_by_duration_mapper import VideoSplitByDurationMapper +from .video_split_by_frame_mapper import 
@OPERATORS.register_module(OP_NAME)
class VideoCalibrationMapper(Mapper):
    """
    Extract camera intrinsics from videos using DroidCalib.

    **Notice**: This operator depends on the DroidCalib component, which is
    not bundled with Data-Juicer. Install it beforehand with
    `demos/video_calibration/install_droidcalib.sh`; when it cannot be
    imported, the operator returns ``None`` for every video. DroidCalib
    follows the AGPL-3.0 license, please be aware for commercial use.
    """

    _accelerator = "cuda"

    def __init__(
        self,
        weights_path: Optional[str] = None,
        image_size: list = [384, 512],
        stride: int = 2,
        max_frames: int = 300,
        buffer: int = 1024,
        beta: float = 0.3,
        filter_thresh: float = 2.4,
        warmup: int = 8,
        keyframe_thresh: float = 4.0,
        frontend_thresh: float = 16.0,
        frontend_window: int = 25,
        frontend_radius: int = 2,
        frontend_nms: int = 1,
        backend_thresh: float = 22.0,
        backend_radius: int = 2,
        backend_nms: int = 3,
        upsample: bool = False,
        disable_vis: bool = True,
        verbose: bool = False,
        *args,
        **kwargs,
    ):
        """
        Initialization method.

        :param weights_path: Path to the model weights. Defaults to
            ``droidcalib.pth`` inside the DroidCalib cache directory.
        :param image_size: Target image size [height, width].
        :param stride: Frame stride (only every `stride`-th frame is used).
        :param max_frames: Maximum number of frames to process. A falsy
            value (0 / None) disables the limit.
        :param buffer: Buffer size for Droid.
        :param beta: Weight for translation / rotation components of flow.
        :param filter_thresh: Motion threshold before considering new keyframe.
        :param warmup: Number of warmup frames.
        :param keyframe_thresh: Threshold to create a new keyframe.
        :param frontend_thresh: Add edges between frames within this distance.
        :param frontend_window: Frontend optimization window.
        :param frontend_radius: Force edges between frames within radius.
        :param frontend_nms: Non-maximal suppression of edges.
        :param backend_thresh: Backend threshold.
        :param backend_radius: Backend radius.
        :param backend_nms: Backend NMS.
        :param upsample: Whether to upsample.
        :param disable_vis: Whether to disable visualization.
        :param verbose: Whether to report DroidCalib import problems.
        :raises ValueError: if `stride` is not positive or `image_size`
            does not hold exactly two values.
        """
        super().__init__(*args, **kwargs)

        # Fail at construction time instead of with a ZeroDivisionError /
        # unpacking error in the middle of a dataset run.
        if stride < 1:
            raise ValueError(f"stride must be a positive integer, but got {stride}")
        if len(image_size) != 2:
            raise ValueError(f"image_size must be [height, width], but got {image_size}")

        self.verbose = verbose
        self._deps_ready = False

        self.droid_calib_path = os.path.join(DATA_JUICER_ASSETS_CACHE, "DroidCalib")
        self.droid_slam_path = os.path.join(self.droid_calib_path, "droid_slam")

        # Dynamic path append (best-effort; also repeated at runtime in workers)
        if os.path.exists(self.droid_slam_path) and self.droid_slam_path not in sys.path:
            sys.path.append(self.droid_slam_path)

        if weights_path is None:
            weights_path = os.path.join(self.droid_calib_path, "droidcalib.pth")
        self.weights_path = weights_path

        # Copy so the shared default list of the signature is never aliased
        # into instance state.
        self.image_size = list(image_size)
        self.stride = stride
        self.max_frames = max_frames

        # Arguments handed to the Droid constructor.
        self.droid_args = argparse.Namespace(
            weights=self.weights_path,
            buffer=buffer,
            image_size=list(image_size),
            beta=beta,
            filter_thresh=filter_thresh,
            warmup=warmup,
            keyframe_thresh=keyframe_thresh,
            frontend_thresh=frontend_thresh,
            frontend_window=frontend_window,
            frontend_radius=frontend_radius,
            frontend_nms=frontend_nms,
            backend_thresh=backend_thresh,
            backend_radius=backend_radius,
            backend_nms=backend_nms,
            upsample=upsample,
            disable_vis=disable_vis,
            stereo=False,
            camera_model="pinhole",  # Default to pinhole
            opt_intr=True,
        )

    def _ensure_droidcalib_ready(self) -> bool:
        """Ensure DroidCalib is importable in the *current process*.

        This matters because `Dataset.map(num_proc>1)` may execute in child
        processes where `sys.path` changes from `__init__` are not present.

        :return: True when `droid` can be imported, False otherwise.
        """
        if self._deps_ready:
            return True

        # Ensure droid_slam is on sys.path in this process
        self.droid_slam_path = os.path.join(self.droid_calib_path, "droid_slam")
        if os.path.exists(self.droid_slam_path) and self.droid_slam_path not in sys.path:
            sys.path.append(self.droid_slam_path)

        try:
            import droid  # noqa: F401
        except Exception as e:
            # Deliberately broad: a broken native build can fail with more
            # than ImportError, and the operator should degrade to "no
            # result" rather than abort the whole run.
            if self.verbose:
                logger.warning(
                    f"Warning: DroidCalib not importable: {e}. "
                    "Please run the installation script `demos/video_calibration/install_droidcalib.sh` "
                    "to install the required dependencies."
                )
            return False

        self._deps_ready = True
        return True

    def _image_stream(self, video_path):
        """
        Generator that yields (t, image, intrinsics, size_factor).

        `t` counts the yielded frames, `image` is the resized frame as a
        tensor with a leading batch dimension, `intrinsics` is the initial
        [fx, fy, cx, cy] guess scaled to the resized frame and `size_factor`
        is the [width, height] scale between resized and source frame.
        The capture is always released, even when the consumer stops early.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                return

            # Initial calibration guess: principal point at the image centre,
            # focal length set to the mean of width and height.
            w0 = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h0 = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fx, fy, cx, cy = np.array([(w0 + h0) / 2, (w0 + h0) / 2, w0 / 2, h0 / 2])

            ht, wd = self.image_size  # Target size [h, w]

            t = 0
            frame_idx = 0
            while cap.isOpened():
                # Stop before decoding another frame once the budget is used.
                if self.max_frames and t >= self.max_frames:
                    break

                ret, image = cap.read()
                if not ret:
                    break

                if frame_idx % self.stride != 0:
                    frame_idx += 1
                    continue

                h0, w0, _ = image.shape

                # Same resize rule as DroidCalib's demo.py: keep the aspect
                # ratio while matching the pixel area of the target size.
                ratio = np.sqrt((ht * wd) / (h0 * w0))
                h1 = int(h0 * ratio)
                w1 = int(w0 * ratio)

                image = cv2.resize(image, (w1, h1))
                image = image[: h1 - h1 % 8, : w1 - w1 % 8]  # Crop to be divisible by 8

                image_tensor = torch.as_tensor(image).permute(2, 0, 1)
                intrinsics = torch.as_tensor([fx, fy, cx, cy])

                # Adjust intrinsics for resize: fx/cx follow the width scale,
                # fy/cy follow the height scale.
                h_final, w_final = image.shape[:2]
                size_factor = [(w_final / w0), (h_final / h0)]
                intrinsics[0::2] *= size_factor[0]
                intrinsics[1::2] *= size_factor[1]

                yield t, image_tensor[None], intrinsics, size_factor

                t += 1
                frame_idx += 1
        finally:
            cap.release()

    def _process_video_file(self, video_path):
        """Estimate the intrinsics of one video.

        :param video_path: path of the video file.
        :return: the estimated intrinsics as a list, rescaled to the original
            resolution, or None when DroidCalib is unavailable, the file does
            not exist or no frame could be read.
        """
        if not self._ensure_droidcalib_ready():
            return None

        try:
            from droid import Droid
        except Exception as e:
            if self.verbose:
                logger.warning(f"Warning: Failed to import Droid after ensuring deps: {e}")
            return None

        if not os.path.exists(video_path):
            return None

        droid = None
        size_factor = None
        intr_est_list = None

        try:
            for t, image, intrinsics, frame_size_factor in self._image_stream(video_path):
                if droid is None:
                    # Per-video copy: the actual frame size must not leak
                    # into the arguments used for the next video.
                    droid_args = argparse.Namespace(**vars(self.droid_args))
                    droid_args.image_size = [image.shape[2], image.shape[3]]
                    droid = Droid(droid_args)

                droid.track(t, image, intrinsics=intrinsics)
                size_factor = frame_size_factor

            if droid is not None:
                # As in DroidCalib's demo.py, terminate() receives a fresh
                # stream of the same video.
                traj_est, intr_est = droid.terminate(self._image_stream(video_path))

                # Rescale intrinsics back to original resolution
                if size_factor:
                    intr_est = intr_est.copy()
                    intr_est[0:4:2] /= size_factor[0]
                    intr_est[1:4:2] /= size_factor[1]

                intr_est_list = intr_est.tolist()
        finally:
            # Free the model even when tracking raised, so a failing sample
            # does not pin GPU memory for the rest of the run.
            if droid is not None:
                del droid
                torch.cuda.empty_cache()

        return intr_est_list

    def process_single(self, sample, rank=None):
        """Store the intrinsics of every video under
        ``sample[Fields.meta]["camera_intrinsics"]`` (one entry per video,
        None for videos that could not be processed)."""
        video_paths = sample[self.video_key]
        if isinstance(video_paths, str):
            video_paths = [video_paths]

        intrinsics_results = [self._process_video_file(video_path) for video_path in video_paths]

        if Fields.meta not in sample:
            sample[Fields.meta] = {}
        sample[Fields.meta]["camera_intrinsics"] = intrinsics_results

        return sample
def create_replacer(replacements):
    """Build a ``re.sub`` callback that hands out `replacements` in order.

    :param replacements: ordered replacement strings, one per expected match.
        The sequence is copied, so the caller's list is left untouched.
    :return: a function that takes a match object and returns the next
        replacement string; it raises IndexError once there are more matches
        than replacements.
    """
    from collections import deque

    # A private queue avoids mutating the argument and makes every hand-out
    # O(1) instead of the O(n) shift of ``list.pop(0)``.
    pending = deque(replacements)

    def replacer(match):
        return pending.popleft()

    return replacer
@OPERATORS.register_module(OP_NAME)
@LOADED_VIDEOS.register_module(OP_NAME)
class VideoSplitByFrameMapper(Mapper):
    """Splits videos into segments based on a specified frame count or duration.

    This operator splits each video in the dataset into smaller segments, each with a fixed
    frame count (or duration). It supports overlapping segments. The original sample can be
    kept or removed based on the `keep_original_sample` parameter. The generated video files
    are saved in the specified directory or, if not provided, in the same directory as the
    input files.

    - Splits videos into segments of a specified frame count or duration.
    - Supports overlapping segments.
    - Keeps or removes the original sample based on the `keep_original_sample` parameter.
    - Saves the generated video files in the specified directory or the input file's
      directory.
    """

    _batched_op = True

    def __init__(
        self,
        split_len: float = 10,
        overlap_len: float = 0,
        unit: str = "frame",
        keep_original_sample: bool = True,
        save_dir: Optional[str] = None,
        ffmpeg_extra_args: str = "",
        *args,
        **kwargs,
    ):
        """
        Initialization method.

        :param split_len: distance between the starts of two adjacent splits.
            Can be frame count (int) or duration in seconds (float).
        :param overlap_len: extra length appended to the end of every split, i.e.
            the part shared with the following split. Can be frame count (int)
            or duration in seconds (float).
        :param unit: unit of split_len and overlap_len. Can be 'frame' or 'second'. Default is 'frame'.
        :param keep_original_sample: whether to keep the original sample. If
            it's set to False, there will be only cut sample in the
            final datasets and the original sample will be removed. It's True
            in default.
        :param save_dir: The directory where generated video files will be stored.
            If not specified, outputs will be saved in the same directory as their corresponding input files.
            This path can alternatively be defined by setting the `DJ_PRODUCED_DATA_DIR` environment variable.
        :param ffmpeg_extra_args: Extra ffmpeg args for splitting video. When
            empty, splits are re-encoded with libx264 and audio is dropped.
        :param args: extra args
        :param kwargs: extra args
        :raises ValueError: if `unit` is unknown, `split_len` is not positive
            or `overlap_len` is negative.
        """
        super().__init__(*args, **kwargs)
        # NOTE: locals() must be captured before any other local is created.
        self._init_parameters = self.remove_extra_parameters(locals())
        self._init_parameters.pop("save_dir", None)

        self.split_len = split_len
        self.overlap_len = overlap_len
        self.unit = unit
        if self.unit not in ["frame", "second"]:
            raise ValueError(f"Unit must be 'frame' or 'second', but got {self.unit}")
        if split_len <= 0:
            raise ValueError(f"split_len must be positive, but got {split_len}")
        if overlap_len < 0:
            raise ValueError(f"overlap_len must be non-negative, but got {overlap_len}")

        self.keep_original_sample = keep_original_sample
        self.save_dir = save_dir
        self.ffmpeg_extra_args = ffmpeg_extra_args

    def split_videos_by_frame(self, video_key):
        """Cut one video into consecutive parts with ffmpeg.

        Part `i` starts at frame `i * split_len` and contains
        `split_len + overlap_len` frames (fewer for the last part), so two
        adjacent parts share `overlap_len` frames. Example with
        split_len=100 and overlap_len=20: part 1 holds frames 0-119, part 2
        holds frames 100-219.

        :param video_key: path of the video to split.
        :return: paths of the successfully written parts, in order. Empty when
            the video cannot be opened or reports no frames / no frame rate.
        """
        import logging
        import shlex

        # 1. Read frame count and frame rate with OpenCV.
        cap = cv2.VideoCapture(video_key)
        try:
            if not cap.isOpened():
                return []
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)
        finally:
            cap.release()

        if fps <= 0 or total_frames <= 0:
            return []

        # 2. Express stride and overlap in frames.
        if self.unit == "second":
            stride = int(self.split_len * fps)
            extra_overlap = int(self.overlap_len * fps)
        else:
            stride = int(self.split_len)
            extra_overlap = int(self.overlap_len)

        # A sub-frame duration truncates to zero frames.
        if stride <= 0:
            return []

        num_parts = math.ceil(total_frames / stride)

        output_dir = self.save_dir if self.save_dir is not None else os.path.dirname(video_key)
        unique_video_key = os.path.join(output_dir, os.path.basename(video_key))

        # Encoding options are the same for every part. Without user supplied
        # arguments the parts are re-encoded with libx264 and audio is removed.
        if self.ffmpeg_extra_args:
            encode_args = shlex.split(self.ffmpeg_extra_args)
        else:
            encode_args = ["-c:v", "libx264", "-preset", "fast", "-crf", "22", "-an"]

        split_video_keys = []
        for part_idx in range(num_parts):
            start_frame = part_idx * stride
            # Clamp the last part(s) to the end of the video.
            frames_to_extract = min(stride + extra_overlap, total_frames - start_frame)
            start_time = start_frame / fps

            output_path = add_suffix_to_filename(unique_video_key, f"_part{part_idx + 1}")

            cmd = [
                "ffmpeg",
                "-y",
                "-ss",
                f"{start_time:.4f}",
                "-i",
                video_key,
                "-frames:v",
                str(frames_to_extract),
                *encode_args,
                "-loglevel",
                "error",
                output_path,
            ]

            # dirname is empty for a bare file name; makedirs("") would raise.
            part_dir = os.path.dirname(output_path)
            if part_dir:
                os.makedirs(part_dir, exist_ok=True)

            try:
                subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            except subprocess.CalledProcessError as e:
                # Best effort: skip the failed part, but leave a trace of why.
                logging.getLogger(__name__).warning(
                    "ffmpeg failed to write %s: %s",
                    output_path,
                    e.stderr.decode(errors="replace").strip() if e.stderr else e,
                )
                continue
            split_video_keys.append(output_path)

        return split_video_keys

    def _process_single_sample(self, sample):
        """Split all videos of one sample.

        :param sample: a single sample dict; its `Fields.source_file` entry is
            filled in place when missing or empty.
        :return: a list with one new sample that references the generated
            parts, or an empty list when the sample has no video.
        """
        # there is no video in this sample
        if self.video_key not in sample or sample[self.video_key] is None or len(sample[self.video_key]) == 0:
            sample[Fields.source_file] = []
            return []

        if Fields.source_file not in sample or not sample[Fields.source_file]:
            sample[Fields.source_file] = sample[self.video_key]

        # the split results
        split_sample = copy.deepcopy(sample)
        split_sample[self.text_key] = ""
        split_sample[Fields.source_file] = []

        # Videos are handled per file by cv2/ffmpeg, so only the paths are needed.
        loaded_video_keys = sample[self.video_key]

        split_video_keys = []
        offset = 0
        # split each video chunk by chunk
        for chunk in sample[self.text_key].split(SpecialTokens.eoc):
            # skip empty chunks or contents after the last eoc token
            if not chunk.strip():
                continue

            video_count = chunk.count(SpecialTokens.video)
            place_holders = []
            for video_key in loaded_video_keys[offset : offset + video_count]:
                new_video_keys = self.split_videos_by_frame(video_key)
                split_video_keys.extend(new_video_keys)
                # one video token per generated part
                place_holders.append(SpecialTokens.video * len(new_video_keys))
                split_sample[Fields.source_file].extend([video_key] * len(new_video_keys))

            # replace every original video token by the tokens of its parts
            replacer_function = create_replacer(place_holders)
            new_split_text_per_chunk = re.sub(SpecialTokens.video, replacer_function, chunk)
            split_sample[self.text_key] += f"{new_split_text_per_chunk}{SpecialTokens.eoc}"  # noqa: E501
            offset += video_count

        split_sample[self.video_key] = split_video_keys
        return [split_sample]

    def process_batched(self, samples):
        """Split the videos of a batch.

        :param samples: batch in "dict of lists" layout.
        :return: batch in "dict of lists" layout holding the kept originals
            and the generated samples; every column is empty when the batch
            produces no sample.
        """
        # reconstruct samples from "dict of lists" to "list of dicts"
        reconstructed_samples = [
            {key: samples[key][i] for key in samples} for i in range(len(samples[self.text_key]))
        ]

        # do split for each sample within the batch
        samples_after_split = []
        for ori_sample in reconstructed_samples:
            if self.keep_original_sample:
                samples_after_split.append(ori_sample)
            samples_after_split.extend(self._process_single_sample(ori_sample))

        if not samples_after_split:
            # Nothing survived (originals dropped and no video to split):
            # return empty columns instead of failing on index 0. The
            # processed originals already carry every column a non-empty
            # batch would have.
            template = reconstructed_samples[0] if reconstructed_samples else samples
            return {key: [] for key in template}

        # reconstruct samples from "list of dicts" to "dict of lists"
        keys = samples_after_split[0].keys()
        return {key: [s[key] for s in samples_after_split] for key in keys}
def main():
    """Run VideoCalibrationMapper on the demo dataset with Ray and write the
    results as JSON files.

    :raises FileNotFoundError: if the demo data file is missing.
    """
    # 1. Setup paths
    data_path = "./demos/video_calibration/data/demo.jsonl"
    output_dir = "./output/video_calibration"

    # Fail fast on a missing input, before connecting to the cluster.
    if not os.path.exists(data_path):
        raise FileNotFoundError(f"Data file not found at {data_path}.")

    # 2. Initialize Ray
    ray.init(address='auto', ignore_reinit_error=True)
    try:
        # 3. Load Dataset
        ds = RayDataset(ray.data.read_json(data_path))
        print(f"Dataset loaded with {ds.count()} samples.")

        # 4. Initialize Operator
        op = VideoCalibrationMapper(
            image_size=[384, 512],
            stride=2,
            max_frames=50,
            num_gpus=0.5,
        )

        # 5. Process Dataset
        ds = ds.process([op])

        # 6. Write Results
        os.makedirs(output_dir, exist_ok=True)
        ds.data.write_json(output_dir, force_ascii=False)
        print(f"Results written to {output_dir}")
    finally:
        # Disconnect even when loading or processing fails.
        ray.shutdown()
def main():
    """Split the demo video into 50-frame parts (plus 10 overlapping frames)
    and dump the resulting dataset next to the generated clips."""
    # Locations of the demo input and of everything this script produces.
    data_path = 'demos/video_split_by_frame/data/demo.jsonl'
    output_dir = 'demos/video_split_by_frame/output'
    result_file = os.path.join(output_dir, 'result.jsonl')

    if not os.path.exists(data_path):
        raise FileNotFoundError(f"Data file not found at {data_path}.")

    # Load the demo samples.
    dataset = Dataset.from_json(data_path)
    print(f"Dataset loaded with {len(dataset)} samples.")

    # Only the generated parts are kept; clips are written to `output_dir`.
    splitter = VideoSplitByFrameMapper(
        split_len=50,
        overlap_len=10,
        unit='frame',
        keep_original_sample=False,
        save_dir=output_dir,
    )

    # The operator works on whole batches, hence batched=True.
    dataset = dataset.map(splitter.process, batched=True)

    # Persist the processed samples.
    os.makedirs(output_dir, exist_ok=True)
    dataset.to_json(result_file, force_ascii=False)
    print(f"Results written to {output_dir}")
Write Results + os.makedirs(output_dir, exist_ok=True) + ds.to_json(os.path.join(output_dir, 'result.jsonl'), force_ascii=False) + print(f"Results written to {output_dir}") + +if __name__ == '__main__': + main() diff --git a/docs/Operators.md b/docs/Operators.md index 090337be92..356f91ff11 100644 --- a/docs/Operators.md +++ b/docs/Operators.md @@ -46,7 +46,7 @@ Data-Juicer 中的算子分为以下 8 种类型。 | [filter](#filter) | 56 | Filters out low-quality samples. 过滤低质量样本。 | | [formatter](#formatter) | 8 | Discovers, loads, and canonicalizes source data. 发现、加载、规范化原始数据。 | | [grouper](#grouper) | 3 | Group samples to batched samples. 将样本分组,每一组组成一个批量样本。 | -| [mapper](#mapper) | 98 | Edits and transforms samples. 对数据样本进行编辑和转换。 | +| [mapper](#mapper) | 100 | Edits and transforms samples. 对数据样本进行编辑和转换。 | | [pipeline](#pipeline) | 3 | Applies dataset-level processing; both input and output are datasets. 执行数据集级别的操作,输入和输出均为完整数据集。 | | [selector](#selector) | 5 | Selects top samples based on ranking. 基于排序选取高质量样本。 | @@ -257,6 +257,7 @@ All the specific operators are listed below, each featured with several capabili | text_chunk_mapper | 🔤Text 💻CPU 🔗API 🟢Stable | Split input text into chunks based on specified criteria. 根据指定的条件将输入文本拆分为块。 | [info](operators/mapper/text_chunk_mapper.md) | - | | text_tagging_by_prompt_mapper | 🔤Text 🚀GPU 🌊vLLM 🧩HF 🟡Beta | Mapper to generate text tags using prompt with LLM. Mapper使用带有LLM的prompt生成文本标记。 | - | - | | vggt_mapper | 🎬Video 🚀GPU 🟡Beta | Input a video of a single scene, and use VGGT to extract information including Camera Pose, Depth Maps, Point Maps, and 3D Point Tracks. 输入单个场景的视频,并使用VGGT提取包括相机姿态、深度图、点图和3D点轨迹的信息。 | [info](operators/mapper/vggt_mapper.md) | - | +| video_calibration_mapper | 🎬Video 🚀GPU 🟡Beta | Extract camera intrinsics from videos using DroidCalib. 使用DroidCalib从视频中提取相机内部函数。 (Notice: Downloads DroidCalib (AGPL-3.0) at runtime. 
注意: 运行时下载遵循AGPL-3.0的DroidCalib组件) | - | - | | video_captioning_from_audio_mapper | 🔮Multimodal 🚀GPU 🧩HF 🟢Stable | Mapper to caption a video according to its audio streams based on Qwen-Audio model. 映射器根据基于qwen-audio模型的音频流为视频添加字幕。 | [info](operators/mapper/video_captioning_from_audio_mapper.md) | - | | video_captioning_from_frames_mapper | 🔮Multimodal 🚀GPU 🧩HF 🟢Stable | Generates video captions from sampled frames using an image-to-text model. 使用图像到文本模型从采样帧生成视频字幕。 | [info](operators/mapper/video_captioning_from_frames_mapper.md) | - | | video_captioning_from_summarizer_mapper | 🔮Multimodal 🚀GPU 🧩HF 🟢Stable | Mapper to generate video captions by summarizing several kinds of generated texts (captions from video/audio/frames, tags from audio/frames, ...). 映射器通过总结几种生成的文本 (来自视频/音频/帧的字幕,来自音频/帧的标签,...) 来生成视频字幕。 | [info](operators/mapper/video_captioning_from_summarizer_mapper.md) | - | @@ -272,6 +273,7 @@ All the specific operators are listed below, each featured with several capabili | video_resize_aspect_ratio_mapper | 🎬Video 💻CPU 🟢Stable | Resizes videos to fit within a specified aspect ratio range. 调整视频大小以适应指定的宽高比范围。 | [info](operators/mapper/video_resize_aspect_ratio_mapper.md) | - | | video_resize_resolution_mapper | 🎬Video 💻CPU 🟢Stable | Resizes video resolution based on specified width and height constraints. 根据指定的宽度和高度限制调整视频分辨率。 | [info](operators/mapper/video_resize_resolution_mapper.md) | - | | video_split_by_duration_mapper | 🔮Multimodal 💻CPU 🟢Stable | Splits videos into segments based on a specified duration. 根据指定的持续时间将视频拆分为多个片段。 | [info](operators/mapper/video_split_by_duration_mapper.md) | - | +| video_split_by_frame_mapper | 🔮Multimodal 💻CPU 🟡Beta | Splits videos into segments based on a specified frame count or duration. 根据指定的帧数或持续时间将视频拆分为段。 | - | - | | video_split_by_key_frame_mapper | 🔮Multimodal 💻CPU 🟢Stable | Splits a video into segments based on key frames. 
class TestVideoCalibrationMapper(unittest.TestCase):
    """Checks the data flow of VideoCalibrationMapper with DroidCalib mocked."""

    def setUp(self):
        import tempfile

        # Keep the fixture out of the working directory so parallel or
        # aborted runs cannot leave files behind or clash with each other.
        self.tmp_dir = tempfile.mkdtemp()
        self.video_path = os.path.join(self.tmp_dir, 'test_video.mp4')

        # Create a dummy video: 10 random 640x480 frames at 20 fps.
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(self.video_path, fourcc, 20.0, (640, 480))
        for _ in range(10):
            frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
            out.write(frame)
        out.release()

    def tearDown(self):
        import shutil

        shutil.rmtree(self.tmp_dir, ignore_errors=True)

    def test_process_single(self):
        # The operator imports `droid` lazily inside its methods, so a fake
        # module in sys.modules is enough to stand in for DroidCalib.
        mock_droid_module = MagicMock()
        mock_droid_class = mock_droid_module.Droid
        mock_instance = mock_droid_class.return_value
        mock_instance.terminate.return_value = (None, np.array([100.0, 100.0, 320.0, 240.0]))

        with patch.dict('sys.modules', {'droid': mock_droid_module}):
            op = VideoCalibrationMapper(weights_path='dummy.pth')
            sample = {'videos': [self.video_path]}

            res = op.process_single(sample)

        # One intrinsics vector (fx, fy, cx, cy) for the single video.
        self.assertIn(Fields.meta, res)
        self.assertIn('camera_intrinsics', res[Fields.meta])
        self.assertEqual(len(res[Fields.meta]['camera_intrinsics']), 1)
        self.assertEqual(len(res[Fields.meta]['camera_intrinsics'][0]), 4)

        # Verify Droid was built, fed with frames and finalized.
        self.assertTrue(mock_droid_class.called)
        self.assertTrue(mock_instance.track.called)
        self.assertTrue(mock_instance.terminate.called)
import cv2
import math

from data_juicer.core.data import NestedDataset as Dataset
from data_juicer.ops.mapper.video_split_by_frame_mapper import VideoSplitByFrameMapper
from data_juicer.utils.mm_utils import SpecialTokens
from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase


class VideoSplitByFrameMapperTest(DataJuicerTestCaseBase):
    """Tests for ``VideoSplitByFrameMapper`` using the shared video fixtures.

    Expected split counts are derived from the frame count OpenCV reports for
    each fixture, so the tests do not hard-code fixture lengths.
    """

    data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'data')
    vid1_path = os.path.join(data_path, 'video1.mp4')
    vid2_path = os.path.join(data_path, 'video2.mp4')
    vid3_path = os.path.join(data_path, 'video3.mp4')

    def _get_res_list(self, dataset, source_list):
        """Reduce the processed dataset to a comparable list of dicts.

        Samples whose videos are all original fixture paths are reported as
        ``{'text', 'videos'}``. Every other sample is matched, in order, with
        the next entry of ``source_list`` and reported as
        ``{'text', 'split_frames_num'}``, where ``split_frames_num`` holds, per
        source video, the number of output files whose base name contains the
        source video's base name.

        :param dataset: processed dataset exposing ``to_list()``.
        :param source_list: the input samples given to the op, in order.
        :return: list of summary dicts, one per output sample.
        """
        res_list = []
        origin_paths = {self.vid1_path, self.vid2_path, self.vid3_path}
        idx = 0
        for sample in dataset.to_list():
            output_paths = sample['videos']

            # for keep_original_sample=True: untouched samples are passed
            # through and do not consume an entry of source_list
            if set(output_paths) <= origin_paths:
                res_list.append({
                    'text': sample['text'],
                    'videos': sample['videos']
                })
                continue

            source = source_list[idx]
            idx += 1

            output_file_names = [
                os.path.splitext(os.path.basename(p))[0] for p in output_paths
            ]
            split_frames_nums = []
            for origin_path in source['videos']:
                origin_file_name = os.path.splitext(
                    os.path.basename(origin_path))[0]
                # NOTE: substring match, so fixture base names must not be
                # prefixes of one another (e.g. 'video1' vs 'video10').
                split_frames_nums.append(
                    sum(origin_file_name in name for name in output_file_names))

            res_list.append({
                'text': sample['text'],
                'split_frames_num': split_frames_nums
            })

        return res_list

    def _run_video_split_by_frame_mapper(self,
                                         op,
                                         source_list,
                                         target_list,
                                         num_proc=1):
        """Run ``op.process`` over ``source_list`` and compare with ``target_list``.

        :param op: the mapper instance under test.
        :param source_list: input samples.
        :param target_list: expected summaries in ``_get_res_list`` format.
        :param num_proc: number of processes passed to ``dataset.map``.
        """
        dataset = Dataset.from_list(source_list)
        dataset = dataset.map(op.process, num_proc=num_proc)
        res_list = self._get_res_list(dataset, source_list)
        self.assertEqual(res_list, target_list)

    def _get_frame_count(self, video_path):
        """Return the frame count OpenCV reports for ``video_path``.

        Fails the test immediately if the video cannot be opened, instead of
        silently returning 0 and producing a misleading mismatch later. The
        capture is always released, even when an assertion or OpenCV error
        is raised.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            self.assertTrue(cap.isOpened(),
                            f'Cannot open video fixture: {video_path}')
            return int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        finally:
            cap.release()

    def _expected_split_count(self, video_path, split_len):
        """Return ``ceil(frame_count / split_len)`` for the given video."""
        return math.ceil(self._get_frame_count(video_path) / split_len)

    def test_split_by_frame(self):
        """Split three videos into 100-frame segments without overlap."""
        split_len = 100
        overlap_len = 0

        # Calculate expected splits from the actual fixture lengths
        expected_splits1 = self._expected_split_count(self.vid1_path, split_len)
        expected_splits2 = self._expected_split_count(self.vid2_path, split_len)
        expected_splits3 = self._expected_split_count(self.vid3_path, split_len)

        ds_list = [{
            'text': f'{SpecialTokens.video} video1',
            'videos': [self.vid1_path]
        }, {
            'text': f'{SpecialTokens.video} video2 {SpecialTokens.eoc}',
            'videos': [self.vid2_path]
        }, {
            'text': f'{SpecialTokens.video} video3 {SpecialTokens.eoc}',
            'videos': [self.vid3_path]
        }]

        tgt_list = [{
            'text': f'{SpecialTokens.video * expected_splits1} video1{SpecialTokens.eoc}',
            'split_frames_num': [expected_splits1]
        }, {
            'text': f'{SpecialTokens.video * expected_splits2} video2 {SpecialTokens.eoc}',
            'split_frames_num': [expected_splits2]
        }, {
            'text': f'{SpecialTokens.video * expected_splits3} video3 {SpecialTokens.eoc}',
            'split_frames_num': [expected_splits3]
        }]

        op = VideoSplitByFrameMapper(split_len=split_len,
                                     overlap_len=overlap_len,
                                     unit='frame',
                                     keep_original_sample=False)
        self._run_video_split_by_frame_mapper(op, ds_list, tgt_list)

    def test_split_by_frame_with_overlap(self):
        """Split one video into 50-frame segments with a 10-frame overlap."""
        split_len = 50
        overlap_len = 10
        # The expectation assumes segment starts advance by split_len
        # (stride = 50), i.e. the overlap does not change the number of
        # segments: num_parts = ceil(frames1 / 50).
        # NOTE(review): confirm against VideoSplitByFrameMapper's stride logic.
        expected_splits1 = self._expected_split_count(self.vid1_path, split_len)

        ds_list = [{
            'text': f'{SpecialTokens.video} video1',
            'videos': [self.vid1_path]
        }]

        tgt_list = [{
            'text': f'{SpecialTokens.video * expected_splits1} video1{SpecialTokens.eoc}',
            'split_frames_num': [expected_splits1]
        }]

        op = VideoSplitByFrameMapper(split_len=split_len,
                                     overlap_len=overlap_len,
                                     unit='frame',
                                     keep_original_sample=False)
        self._run_video_split_by_frame_mapper(op, ds_list, tgt_list)


if __name__ == '__main__':
    unittest.main()