diff --git a/benchmarks/video/benchmark.py b/benchmarks/video/benchmark.py
deleted file mode 100644
index d9e5e62bbf..0000000000
--- a/benchmarks/video/benchmark.py
+++ /dev/null
@@ -1,94 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import threading
-import time
-from contextlib import ContextDecorator
-
-
-class TimeBenchmark(ContextDecorator):
-    """
-    Measures execution time using a context manager or decorator.
-
-    This class supports both context manager and decorator usage, and is thread-safe for multithreaded
-    environments.
-
-    Args:
-        print: If True, prints the elapsed time upon exiting the context or completing the function. Defaults
-            to False.
-
-    Examples:
-
-        Using as a context manager:
-
-        >>> benchmark = TimeBenchmark()
-        >>> with benchmark:
-        ...     time.sleep(1)
-        >>> print(f"Block took {benchmark.result:.4f} seconds")
-        Block took approximately 1.0000 seconds
-
-        Using with multithreading:
-
-        ```python
-        import threading
-
-        benchmark = TimeBenchmark()
-
-
-        def context_manager_example():
-            with benchmark:
-                time.sleep(0.01)
-            print(f"Block took {benchmark.result_ms:.2f} milliseconds")
-
-
-        threads = []
-        for _ in range(3):
-            t1 = threading.Thread(target=context_manager_example)
-            threads.append(t1)
-
-        for t in threads:
-            t.start()
-
-        for t in threads:
-            t.join()
-        ```
-        Expected output:
-        Block took approximately 10.00 milliseconds
-        Block took approximately 10.00 milliseconds
-        Block took approximately 10.00 milliseconds
-    """
-
-    def __init__(self, print=False):
-        self.local = threading.local()
-        self.print_time = print
-
-    def __enter__(self):
-        self.local.start_time = time.perf_counter()
-        return self
-
-    def __exit__(self, *exc):
-        self.local.end_time = time.perf_counter()
-        self.local.elapsed_time = self.local.end_time - self.local.start_time
-        if self.print_time:
-            print(f"Elapsed time: {self.local.elapsed_time:.4f} seconds")
-        return False
-
-    @property
-    def result(self):
-        return getattr(self.local, "elapsed_time", None)
-
-    @property
-    def result_ms(self):
-        return self.result * 1e3
diff --git a/benchmarks/video/capture_camera_feed.py b/benchmarks/video/capture_camera_feed.py
deleted file mode 100755
index 8f8530532d..0000000000
--- a/benchmarks/video/capture_camera_feed.py
+++ /dev/null
@@ -1,102 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Capture video feed from a camera as raw images.""" - -import argparse -import datetime as dt -import os -import time -from pathlib import Path - -import cv2 -import rerun as rr - -# see https://rerun.io/docs/howto/visualization/limit-ram -RERUN_MEMORY_LIMIT = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "5%") - - -def display_and_save_video_stream(output_dir: Path, fps: int, width: int, height: int, duration: int): - rr.init("lerobot_capture_camera_feed") - rr.spawn(memory_limit=RERUN_MEMORY_LIMIT) - - now = dt.datetime.now() - capture_dir = output_dir / f"{now:%Y-%m-%d}" / f"{now:%H-%M-%S}" - if not capture_dir.exists(): - capture_dir.mkdir(parents=True, exist_ok=True) - - # Opens the default webcam - cap = cv2.VideoCapture(0) - if not cap.isOpened(): - print("Error: Could not open video stream.") - return - - cap.set(cv2.CAP_PROP_FPS, fps) - cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) - cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) - - frame_index = 0 - start_time = time.time() - while time.time() - start_time < duration: - ret, frame = cap.read() - - if not ret: - print("Error: Could not read frame.") - break - rr.log("video/stream", rr.Image(frame), static=True) - cv2.imwrite(str(capture_dir / f"frame_{frame_index:06d}.png"), frame) - frame_index += 1 - - # Release the capture - cap.release() - - # TODO(Steven): Add a graceful shutdown via a close() method for the Viewer context, though not currently supported in the Rerun API. - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - - parser.add_argument( - "--output-dir", - type=Path, - default=Path("outputs/cam_capture/"), - help="Directory where the capture images are written. A subfolder named with the current date & time will be created inside it for each capture.", - ) - parser.add_argument( - "--fps", - type=int, - default=30, - help="Frames Per Second of the capture.", - ) - parser.add_argument( - "--width", - type=int, - default=1280, - help="Width of the captured images.", - ) - parser.add_argument( - "--height", - type=int, - default=720, - help="Height of the captured images.", - ) - parser.add_argument( - "--duration", - type=int, - default=20, - help="Duration in seconds for which the video stream should be captured.", - ) - args = parser.parse_args() - display_and_save_video_stream(**vars(args)) diff --git a/benchmarks/video/run_video_benchmark.py b/benchmarks/video/run_video_benchmark.py index 9f34b22736..37018bee83 100644 --- a/benchmarks/video/run_video_benchmark.py +++ b/benchmarks/video/run_video_benchmark.py @@ -21,11 +21,13 @@ import argparse import datetime as dt +import itertools import random import shutil from collections import OrderedDict from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path +from threading import Lock import einops import numpy as np @@ -35,13 +37,13 @@ from skimage.metrics import mean_squared_error, peak_signal_noise_ratio, structural_similarity from tqdm import tqdm -from benchmarks.video.benchmark import TimeBenchmark from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.datasets.video_utils import ( - decode_video_frames_torchvision, + decode_video_frames, encode_video_frames, ) from lerobot.utils.constants import OBS_IMAGE +from lerobot.utils.utils import TimerManager BASE_ENCODING = OrderedDict( [ @@ -86,7 +88,7 @@ def load_original_frames(imgs_dir: Path, timestamps: list[float], fps: int) -> t frames = [] for ts in timestamps: idx = int(ts * fps) - frame = PIL.Image.open(imgs_dir / f"frame_{idx:06d}.png") + frame = 
PIL.Image.open(imgs_dir / f"frame-{idx:06d}.png") frame = torch.from_numpy(np.array(frame)) frame = frame.type(torch.float32) / 255 frame = einops.rearrange(frame, "h w c -> c h w") @@ -97,21 +99,21 @@ def load_original_frames(imgs_dir: Path, timestamps: list[float], fps: int) -> t def save_decoded_frames( imgs_dir: Path, save_dir: Path, frames: torch.Tensor, timestamps: list[float], fps: int ) -> None: - if save_dir.exists() and len(list(save_dir.glob("frame_*.png"))) == len(timestamps): + if save_dir.exists() and len(list(save_dir.glob("frame-*.png"))) == len(timestamps): return save_dir.mkdir(parents=True, exist_ok=True) for i, ts in enumerate(timestamps): idx = int(ts * fps) frame_hwc = (frames[i].permute((1, 2, 0)) * 255).type(torch.uint8).cpu().numpy() - PIL.Image.fromarray(frame_hwc).save(save_dir / f"frame_{idx:06d}_decoded.png") - shutil.copyfile(imgs_dir / f"frame_{idx:06d}.png", save_dir / f"frame_{idx:06d}_original.png") + PIL.Image.fromarray(frame_hwc).save(save_dir / f"frame-{idx:06d}_decoded.png") + shutil.copyfile(imgs_dir / f"frame-{idx:06d}.png", save_dir / f"frame-{idx:06d}_original.png") def save_first_episode(imgs_dir: Path, dataset: LeRobotDataset) -> None: episode_index = 0 ep_num_images = dataset.meta.episodes["length"][episode_index] - if imgs_dir.exists() and len(list(imgs_dir.glob("frame_*.png"))) == ep_num_images: + if imgs_dir.exists() and len(list(imgs_dir.glob("frame-*.png"))) == ep_num_images: return imgs_dir.mkdir(parents=True, exist_ok=True) @@ -125,7 +127,7 @@ def save_first_episode(imgs_dir: Path, dataset: LeRobotDataset) -> None: tqdm(imgs_dataset, desc=f"saving {dataset.repo_id} first episode images", leave=False) ): img = item[img_keys[0]] - img.save(str(imgs_dir / f"frame_{i:06d}.png"), quality=100) + img.save(str(imgs_dir / f"frame-{i:06d}.png"), quality=100) if i >= ep_num_images - 1: break @@ -149,18 +151,6 @@ def sample_timestamps(timestamps_mode: str, ep_num_images: int, fps: int) -> lis return [idx / fps for idx in frame_indexes] -def decode_video_frames( - video_path: str, - timestamps: list[float], - tolerance_s: float, - backend: str, -) -> torch.Tensor: - if backend in ["pyav", "video_reader"]: - return decode_video_frames_torchvision(video_path, timestamps, tolerance_s, backend) - else: - raise NotImplementedError(backend) - - def benchmark_decoding( imgs_dir: Path, video_path: Path, @@ -172,8 +162,8 @@ def benchmark_decoding( num_workers: int = 4, save_frames: bool = False, ) -> dict: - def process_sample(sample: int): - time_benchmark = TimeBenchmark() + def process_sample(sample: int, lock: Lock): + time_benchmark = TimerManager(log=False) timestamps = sample_timestamps(timestamps_mode, ep_num_images, fps) num_frames = len(timestamps) result = { @@ -182,13 +172,13 @@ def process_sample(sample: int): "mse_values": [], } - with time_benchmark: + with time_benchmark, lock: frames = decode_video_frames(video_path, timestamps=timestamps, tolerance_s=5e-1, backend=backend) - result["load_time_video_ms"] = time_benchmark.result_ms / num_frames + result["load_time_video_ms"] = (time_benchmark.last * 1000) / num_frames with time_benchmark: original_frames = load_original_frames(imgs_dir, timestamps, fps) - result["load_time_images_ms"] = time_benchmark.result_ms / num_frames + result["load_time_images_ms"] = (time_benchmark.last * 1000) / num_frames frames_np, original_frames_np = frames.numpy(), original_frames.numpy() for i in range(num_frames): @@ -215,8 +205,10 @@ def process_sample(sample: int): # A sample is a single set of decoded frames 
specified by timestamps_mode (e.g. a single frame, 2 frames, etc.). # For each sample, we record metrics (loading time and quality metrics) which are then averaged over all samples. # As these samples are independent, we run them in parallel threads to speed up the benchmark. + # Use a single shared lock for all worker threads + shared_lock = Lock() with ThreadPoolExecutor(max_workers=num_workers) as executor: - futures = [executor.submit(process_sample, i) for i in range(num_samples)] + futures = [executor.submit(process_sample, i, shared_lock) for i in range(num_samples)] for future in tqdm(as_completed(futures), total=num_samples, desc="samples", leave=False): result = future.result() load_times_video_ms.append(result["load_time_video_ms"]) @@ -358,24 +350,27 @@ def main( imgs_dir = output_dir / "images" / dataset.repo_id.replace("/", "_") # We only use the first episode save_first_episode(imgs_dir, dataset) - for key, values in tqdm(encoding_benchmarks.items(), desc="encodings (g, crf)", leave=False): - for value in tqdm(values, desc=f"encodings ({key})", leave=False): - encoding_cfg = BASE_ENCODING.copy() - encoding_cfg["vcodec"] = video_codec - encoding_cfg["pix_fmt"] = pixel_format + for duet in [ + dict(zip(encoding_benchmarks.keys(), unique_combination, strict=False)) + for unique_combination in itertools.product(*encoding_benchmarks.values()) + ]: + encoding_cfg = BASE_ENCODING.copy() + encoding_cfg["vcodec"] = video_codec + encoding_cfg["pix_fmt"] = pixel_format + for key, value in duet.items(): encoding_cfg[key] = value - args_path = Path("_".join(str(value) for value in encoding_cfg.values())) - video_path = output_dir / "videos" / args_path / f"{repo_id.replace('/', '_')}.mp4" - benchmark_table += benchmark_encoding_decoding( - dataset, - video_path, - imgs_dir, - encoding_cfg, - decoding_benchmarks, - num_samples, - num_workers, - save_frames, - ) + args_path = Path("_".join(str(value) for value in encoding_cfg.values())) + video_path = output_dir / "videos" / args_path / f"{repo_id.replace('/', '_')}.mp4" + benchmark_table += benchmark_encoding_decoding( + dataset, + video_path, + imgs_dir, + encoding_cfg, + decoding_benchmarks, + num_samples, + num_workers, + save_frames, + ) # Save intermediate results benchmark_df = pd.DataFrame(benchmark_table, columns=headers) @@ -409,9 +404,9 @@ def main( nargs="*", default=[ "lerobot/pusht_image", - "aliberts/aloha_mobile_shrimp_image", - "aliberts/paris_street", - "aliberts/kitchen", + "CarolinePascal/aloha_mobile_shrimp_image", + "CarolinePascal/paris_street", + "CarolinePascal/kitchen", ], help="Datasets repo-ids to test against. First episodes only are used. Must be images.", ) @@ -419,7 +414,7 @@ def main( "--vcodec", type=str, nargs="*", - default=["libx264", "hevc", "libsvtav1"], + default=["h264", "hevc", "libsvtav1"], help="Video codecs to be tested", ) parser.add_argument( @@ -468,7 +463,7 @@ def main( "--backends", type=str, nargs="*", - default=["pyav", "video_reader"], + default=["torchcodec", "pyav"], help="Torchvision decoding backend to be tested.", ) parser.add_argument(
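
A note on the reworked sweep in `main()`: the old nested loops varied one encoding parameter at a time on top of `BASE_ENCODING`, while the new list comprehension benchmarks every combination of the parameters in `encoding_benchmarks` (and the shared `Lock` serializes the timed `decode_video_frames` calls across worker threads, apparently so concurrent decoding does not skew per-sample timings). The sketch below is a minimal standalone illustration of the `itertools.product` pattern the new loop relies on; the parameter names and values are illustrative placeholders, not the benchmark's actual grid.

```python
import itertools
from collections import OrderedDict

# Hypothetical parameter grid shaped like `encoding_benchmarks`
# (the real keys and values come from the benchmark's CLI arguments).
encoding_benchmarks = OrderedDict(
    [
        ("g", [2, 20]),  # keyframe-interval candidates (illustrative)
        ("crf", [23, 30]),  # constant-rate-factor candidates (illustrative)
    ]
)

# itertools.product enumerates the Cartesian product of the value lists;
# zip pairs each combination back with its parameter names, yielding one
# dict per unique configuration: the `duet` of the rewritten loop.
for duet in [
    dict(zip(encoding_benchmarks.keys(), combo, strict=False))
    for combo in itertools.product(*encoding_benchmarks.values())
]:
    print(duet)
# {'g': 2, 'crf': 23}
# {'g': 2, 'crf': 30}
# {'g': 20, 'crf': 23}
# {'g': 20, 'crf': 30}
```

With n parameters of k values each, this enumerates k**n configurations rather than the n*k one-at-a-time runs of the old loops, so the sweep can grow quickly; the loop still saves intermediate results after each configuration.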