def box_aspect_ratio(xyxy: np.ndarray) -> np.ndarray:
    """
    Compute the width-to-height ratio of each bounding box.

    Boxes are given in `(x_min, y_min, x_max, y_max)` format. Boxes whose
    height is zero produce NaN instead of raising a division error.

    Args:
        xyxy (`numpy.ndarray`): Array of bounding boxes in
            `(x_min, y_min, x_max, y_max)` format with shape `(N, 4)`.

    Returns:
        `numpy.ndarray`: Array of aspect ratios with shape `(N,)`, where each
            element is the box width divided by its height. Elements are NaN
            if the height is zero.

    Examples:
        ```python
        import numpy as np
        import supervision as sv

        xyxy = np.array([
            [10, 20, 30, 50],
            [0, 0, 40, 10],
        ])

        sv.box_aspect_ratio(xyxy)
        # array([0.66666667, 4.        ])

        xyxy = np.array([
            [10, 10, 30, 10],
            [5, 5, 25, 25],
        ])

        sv.box_aspect_ratio(xyxy)
        # array([nan,  1.])
        ```
    """
    box_widths = (xyxy[:, 2] - xyxy[:, 0]).astype(np.float64)
    box_heights = (xyxy[:, 3] - xyxy[:, 1]).astype(np.float64)

    # Start with NaN everywhere and fill in only the rows that have a
    # non-zero height, so no divide-by-zero warning is ever emitted.
    ratios = np.full(box_widths.shape, np.nan, dtype=np.float64)
    has_height = box_heights != 0
    ratios[has_height] = box_widths[has_height] / box_heights[has_height]

    return ratios
def process_video(
    source_path: str,
    target_path: str,
    callback: Callable[[np.ndarray, int], np.ndarray],
    *,
    max_frames: int | None = None,
    prefetch: int = 32,
    writer_buffer: int = 32,
    show_progress: bool = False,
    progress_message: str = "Processing video",
) -> None:
    """
    Process video frames asynchronously using a threaded pipeline.

    This function orchestrates a three-stage pipeline to optimize video
    processing throughput:

    1. Reader thread: Continuously reads frames from the source video file and
       enqueues them into a bounded queue (`frame_read_queue`). The queue size
       is limited by the `prefetch` parameter to control memory usage.
    2. Main thread (Processor): Dequeues frames from `frame_read_queue`,
       applies the user-defined `callback` function to process each frame,
       then enqueues the processed frames into another bounded queue
       (`frame_write_queue`) for writing. The processing happens in the main
       thread, simplifying use of stateful objects without synchronization.
    3. Writer thread: Dequeues processed frames from `frame_write_queue` and
       writes them sequentially to the output video file.

    Args:
        source_path (str): Path to the input video file.
        target_path (str): Path where the processed video will be saved.
        callback (Callable[[numpy.ndarray, int], numpy.ndarray]): Function
            called for each frame, accepting the frame as a numpy array and
            its zero-based index, returning the processed frame.
        max_frames (int | None): Optional maximum number of frames to process.
            If None, the entire video is processed (default).
        prefetch (int): Maximum number of frames buffered by the reader
            thread. Controls memory use; default is 32.
        writer_buffer (int): Maximum number of frames buffered before writing.
            Controls output buffer size; default is 32.
        show_progress (bool): Whether to display a tqdm progress bar during
            processing. Default is False.
        progress_message (str): Description shown in the progress bar.

    Raises:
        Exception: Any exception raised by `callback` is propagated to the
            caller after both worker threads have been shut down.

    Returns:
        None

    Example:
        ```python
        import numpy as np
        import supervision as sv

        def callback(frame: np.ndarray, index: int) -> np.ndarray:
            return frame  # replace with real per-frame processing

        sv.process_video(
            source_path="source.mp4",
            target_path="target.mp4",
            callback=callback,
        )
        ```
    """
    video_info = VideoInfo.from_video_path(video_path=source_path)
    total_frames = (
        min(video_info.total_frames, max_frames)
        if max_frames is not None
        else video_info.total_frames
    )

    frame_read_queue: Queue[tuple[int, np.ndarray] | None] = Queue(maxsize=prefetch)
    frame_write_queue: Queue[np.ndarray | None] = Queue(maxsize=writer_buffer)
    # Set when the main thread stops consuming (normal completion or a
    # callback error); the reader checks it so it never stays blocked
    # forever on a full queue.
    stop_reading = threading.Event()

    def reader_thread() -> None:
        # Decode frames ahead of the processor, bounded by `prefetch`.
        frame_generator = get_video_frames_generator(
            source_path=source_path,
            end=max_frames,
        )
        for frame_index, frame in enumerate(frame_generator):
            if stop_reading.is_set():
                break
            frame_read_queue.put((frame_index, frame))
        # End-of-stream sentinel; the main thread always consumes it
        # (either in the processing loop or in the shutdown drain below).
        frame_read_queue.put(None)

    def writer_thread(video_sink: VideoSink) -> None:
        # NOTE(review): an exception inside write_frame dies with this
        # thread and is not propagated — pre-existing limitation.
        while True:
            frame = frame_write_queue.get()
            if frame is None:
                break
            video_sink.write_frame(frame=frame)

    reader_worker = threading.Thread(target=reader_thread, daemon=True)
    with VideoSink(target_path=target_path, video_info=video_info) as video_sink:
        writer_worker = threading.Thread(
            target=writer_thread,
            args=(video_sink,),
            daemon=True,
        )

        reader_worker.start()
        writer_worker.start()

        progress_bar = tqdm(
            total=total_frames,
            disable=not show_progress,
            desc=progress_message,
        )

        read_item: tuple[int, np.ndarray] | None = None
        try:
            while True:
                read_item = frame_read_queue.get()
                if read_item is None:
                    break

                frame_index, frame = read_item
                processed_frame = callback(frame, frame_index)

                frame_write_queue.put(processed_frame)
                progress_bar.update(1)
        finally:
            # Shut the pipeline down even if the callback raised. Without
            # the stop flag and the drain below, a reader blocked on a full
            # queue would make reader_worker.join() hang forever.
            stop_reading.set()
            frame_write_queue.put(None)
            while read_item is not None:
                read_item = frame_read_queue.get()
            reader_worker.join()
            writer_worker.join()
            progress_bar.close()