diff --git a/assets/sample_video.mp4 b/assets/sample_video.mp4 new file mode 100644 index 000000000..3c4e08a57 Binary files /dev/null and b/assets/sample_video.mp4 differ diff --git a/doc/_toc.yml b/doc/_toc.yml index ff154b0de..029d43ebc 100644 --- a/doc/_toc.yml +++ b/doc/_toc.yml @@ -70,6 +70,7 @@ chapters: - file: code/converters/4_image_converters - file: code/converters/5_selectively_converting - file: code/converters/6_human_converter + - file: code/converters/7_video_converters - file: code/converters/ansi_attack_converter - file: code/converters/char_swap_attack_generator - file: code/converters/pdf_converter diff --git a/doc/code/converters/7_video_converters.ipynb b/doc/code/converters/7_video_converters.ipynb new file mode 100644 index 000000000..4904e074d --- /dev/null +++ b/doc/code/converters/7_video_converters.ipynb @@ -0,0 +1,63 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Adding Images to a Video\n", + "\n", + "This shows how to use the video converter to add an image to a video.\n", + "To use this converter you'll need to install opencv which can be done with \n", + "`pip install pyrit[opencv]`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "ConverterResult(output_text='..\\\\..\\\\..\\\\assets\\\\output_video.mp4', output_type='video_path')" + ] + }, + "execution_count": 22, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import pathlib\n", + "\n", + "from pyrit.common import IN_MEMORY, initialize_pyrit\n", + "from pyrit.prompt_converter import AddImageVideoConverter\n", + "\n", + "initialize_pyrit(memory_db_type=IN_MEMORY)\n", + "\n", + "input_video = str(pathlib.Path(\".\") / \"..\" / \"..\" / \"..\" / \"assets\" / \"sample_video.mp4\")\n", + "input_image = str(pathlib.Path(\".\") / \"..\" / \"..\" / \"..\" / \"assets\" / \"pyrit_architecture.png\")\n", + "\n", + "video = 
AddImageVideoConverter(video_path=input_video)\n", + "converted_vid = await video.convert_async(prompt=input_image, input_type=\"image_path\")\n", + "converted_vid" + ] + } + ], + "metadata": { + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/doc/code/converters/7_video_converters.py b/doc/code/converters/7_video_converters.py new file mode 100644 index 000000000..204ca4f6f --- /dev/null +++ b/doc/code/converters/7_video_converters.py @@ -0,0 +1,31 @@ +# --- +# jupyter: +# jupytext: +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# jupytext_version: 1.16.4 +# --- + +# %% [markdown] +# # Adding Images to a Video +# +# This shows how to use the video converter to add an image to a video. +# To use this converter you'll need to install opencv which can be done with +# `pip install pyrit[opencv]` + +# %% +import pathlib + +from pyrit.common import IN_MEMORY, initialize_pyrit +from pyrit.prompt_converter import AddImageVideoConverter + +initialize_pyrit(memory_db_type=IN_MEMORY) + +input_video = str(pathlib.Path(".") / ".." / ".." / ".." / "assets" / "sample_video.mp4") +input_image = str(pathlib.Path(".") / ".." / ".." / ".." 
/ "assets" / "pyrit_architecture.png") + +video = AddImageVideoConverter(video_path=input_video) +converted_vid = await video.convert_async(prompt=input_image, input_type="image_path") +converted_vid diff --git a/pyproject.toml b/pyproject.toml index 311a9bbf8..86e4e9bcf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -114,12 +114,16 @@ playwright = [ "ollama>=0.4.4" ] +opencv = [ + "opencv-python>=4.11.0.86", +] all = [ "accelerate==0.34.2", "azureml-mlflow==1.57.0", "black>=24.4.0", "flake8>=7.0.0", "flake8-copyright>=0.2.4", + "flask>=3.1.0", "jupyter-book>=1.0.2", "jupytext>=1.16.1", "mlflow==2.16.2", @@ -127,6 +131,9 @@ all = [ "mypy>=1.9.0", "mock-alchemy>=0.2.6", "numpy<2", + "ollama>=0.4.4", + "opencv-python>=4.11.0.86", + "playwright==1.49.0", "pre-commit>=3.3.3", "pytest>=7.3.1", "pytest-asyncio>=0.23.5", @@ -135,9 +142,6 @@ all = [ "semantic-kernel>=1.20.0", "sentencepiece==0.2.0", "torch>=2.3.0", - "playwright==1.49.0", - "flask>=3.1.0", - "ollama>=0.4.4", "types-PyYAML>=6.0.12.9", ] diff --git a/pyrit/prompt_converter/__init__.py b/pyrit/prompt_converter/__init__.py index 14e5f17bc..0e7f6f9e6 100644 --- a/pyrit/prompt_converter/__init__.py +++ b/pyrit/prompt_converter/__init__.py @@ -4,6 +4,7 @@ from pyrit.prompt_converter.prompt_converter import ConverterResult, PromptConverter from pyrit.prompt_converter.add_image_text_converter import AddImageTextConverter +from pyrit.prompt_converter.add_image_to_video_converter import AddImageVideoConverter from pyrit.prompt_converter.add_text_image_converter import AddTextImageConverter from pyrit.prompt_converter.ansi_escape.ansi_attack_converter import AnsiAttackConverter from pyrit.prompt_converter.ascii_art_converter import AsciiArtConverter @@ -57,6 +58,7 @@ __all__ = [ "AddImageTextConverter", + "AddImageVideoConverter", "AddTextImageConverter", "AnsiAttackConverter", "AsciiArtConverter", diff --git a/pyrit/prompt_converter/add_image_to_video_converter.py 
# File: pyrit/prompt_converter/add_image_to_video_converter.py
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING, Optional

import numpy as np

from pyrit.common.path import DB_DATA_PATH
from pyrit.models import PromptDataType, data_serializer_factory
from pyrit.prompt_converter import ConverterResult, PromptConverter

logger = logging.getLogger(__name__)


# OpenCV is an optional extra (`pip install pyrit[opencv]`). The name must be bound at runtime,
# not only for type checkers, yet a missing install must not break importing this module.
if TYPE_CHECKING:
    import cv2
else:
    try:
        import cv2
    except ModuleNotFoundError:
        cv2 = None

# Choose the codec based on extension
video_encoding_map = {
    "mp4": "mp4v",
    "avi": "XVID",
    "mov": "MJPG",
    "mkv": "X264",
}


class AddImageVideoConverter(PromptConverter):
    """
    Adds an image to a video at a specified position.
    Currently the image is placed on every frame of the video, not at a specific timepoint.

    Args:
        video_path (str): File path of video to add image to
        output_path (str, Optional): File path of output video. Defaults to None.
        img_position (tuple, Optional): (x, y) position to place image in video. Defaults to (10, 10).
        img_resize_size (tuple, Optional): (width, height) to resize image to. Defaults to (500, 500).

    Raises:
        ValueError: If ``video_path`` is empty.
    """

    def __init__(
        self,
        video_path: str,
        output_path: Optional[str] = None,
        img_position: tuple = (10, 10),
        img_resize_size: tuple = (500, 500),
    ):
        if not video_path:
            raise ValueError("Please provide valid video path")

        self._output_path = output_path
        self._img_position = img_position
        self._img_resize_size = img_resize_size
        self._video_path = video_path

    def _load_overlay(self, image_bytes: bytes, frame_width: int, frame_height: int) -> np.ndarray:
        """
        Decodes the overlay image and sizes it so it fits inside the frame at the configured position.

        Args:
            image_bytes (bytes): Encoded image content.
            frame_width (int): Width of the video frames in pixels.
            frame_height (int): Height of the video frames in pixels.

        Returns:
            np.ndarray: Overlay with 3 (colour) or 4 (colour + alpha) channels.

        Raises:
            ValueError: If the position lies outside the frame or the image cannot be decoded.
        """
        x, y = self._img_position
        if not (0 <= x < frame_width and 0 <= y < frame_height):
            raise ValueError(
                f"Image position {self._img_position} is outside the {frame_width}x{frame_height} video frame"
            )

        # imdecode signals failure by returning None and rejects an empty buffer outright.
        overlay = cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_UNCHANGED) if image_bytes else None
        if overlay is None:
            raise ValueError("Unable to decode the overlay image")

        overlay = cv2.resize(overlay, self._img_resize_size)
        image_height, image_width = overlay.shape[:2]

        # Ensure overlay fits within the frame boundaries; only shrink the dimension(s) that overflow
        if x + image_width > frame_width or y + image_height > frame_height:
            logger.info("Overlay image is too large for the video frame. Resizing to fit.")
            fitted_size = (min(image_width, frame_width - x), min(image_height, frame_height - y))
            overlay = cv2.resize(overlay, fitted_size)

        # Single-channel images decode to a 2-D array; expand so they can be pasted onto colour frames
        if overlay.ndim == 2:
            overlay = cv2.cvtColor(overlay, cv2.COLOR_GRAY2BGR)

        return overlay

    async def _add_image_to_video(self, image_path: str, output_path: str):
        """
        Adds image to video
        Args:
            image_path (str): The image path to add to video.
            output_path (str): The output video path.

        Returns:
            output_path (str): The output video path.

        Raises:
            ValueError: If ``image_path`` is empty, the video format is unsupported, the video cannot
                be opened, or the overlay cannot be decoded or positioned.
            ModuleNotFoundError: If OpenCV is not installed.
        """

        if not image_path:
            raise ValueError("Please provide valid image path value")

        if cv2 is None:
            raise ModuleNotFoundError(
                "OpenCV is required by AddImageVideoConverter. Install it with `pip install pyrit[opencv]`."
            )

        input_image_data = data_serializer_factory(
            category="prompt-memory-entries", data_type="image_path", value=image_path
        )
        input_video_data = data_serializer_factory(
            category="prompt-memory-entries", data_type="video_path", value=self._video_path
        )

        # Read both inputs up front so an unreadable input fails before any output file is created
        video_bytes = await input_video_data.read_data()
        input_image_bytes = await input_image_data.read_data()

        azure_storage_flag = input_video_data._is_azure_storage_url(self._video_path)
        video_path = self._video_path

        # Pre-bind everything the cleanup touches so `finally` never masks the real error
        local_temp_path = None
        cap = None
        output_video = None

        try:
            if azure_storage_flag:
                # If the video is in Azure storage, save the downloaded bytes to a temporary file
                local_temp_path = Path(DB_DATA_PATH, "temp_video.mp4")
                with open(local_temp_path, "wb") as f:
                    f.write(video_bytes)
                video_path = str(local_temp_path)

            file_extension = Path(video_path).suffix.lstrip(".").lower()
            if file_extension not in video_encoding_map:
                raise ValueError(f"Unsupported video format: {file_extension}")

            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise ValueError(f"Unable to open video: {video_path}")

            # Get video properties; fps stays a float so fractional rates (e.g. 29.97) are preserved
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Load and resize the overlay image
            overlay = self._load_overlay(input_image_bytes, width, height)
            image_height, image_width = overlay.shape[:2]
            x, y = self._img_position  # Position where the overlay will be placed

            # The blend terms that do not depend on the frame are computed once, outside the loop
            has_alpha = overlay.shape[2] == 4
            if has_alpha:
                alpha_overlay = overlay[:, :, 3:4] / 255.0
                weighted_overlay = alpha_overlay * overlay[:, :, :3]
                inverse_alpha = 1 - alpha_overlay

            video_char_code = cv2.VideoWriter_fourcc(*video_encoding_map[file_extension])
            output_video = cv2.VideoWriter(output_path, video_char_code, fps, (width, height))

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # View into the frame, so writing to it modifies the frame in place
                region = frame[y : y + image_height, x : x + image_width]
                if has_alpha:
                    region[:] = weighted_overlay + inverse_alpha * region
                else:
                    region[:] = overlay

                # Write the modified frame to the output video
                output_video.write(frame)

        finally:
            # Release everything that was actually acquired
            if cap is not None:
                cap.release()
            if output_video is not None:
                output_video.release()
            if local_temp_path is not None and local_temp_path.exists():
                os.remove(local_temp_path)

        logger.info(f"Video saved as {output_path}")

        return output_path

    async def convert_async(self, *, prompt: str, input_type: PromptDataType = "image_path") -> ConverterResult:
        """
        Converter that adds an image to a video

        Args:
            prompt (str): The image file name to be added to the video.
            input_type (PromptDataType): type of data
        Returns:
            ConverterResult: The filename of the converted video as a ConverterResult Object
        Raises:
            ValueError: If ``input_type`` is not supported.
        """
        if not self.input_supported(input_type):
            raise ValueError("Input type not supported")

        output_video_serializer = data_serializer_factory(category="prompt-memory-entries", data_type="video_path")

        if not self._output_path:
            output_video_serializer.value = await output_video_serializer.get_data_filename()
        else:
            output_video_serializer.value = self._output_path

        # Add the image to the video
        updated_video = await self._add_image_to_video(image_path=prompt, output_path=output_video_serializer.value)
        return ConverterResult(output_text=str(updated_video), output_type="video_path")

    def input_supported(self, input_type: PromptDataType) -> bool:
        """Returns True only for ``image_path`` input."""
        return input_type == "image_path"

    def output_supported(self, output_type: PromptDataType) -> bool:
        """Returns True only for ``video_path`` output."""
        return output_type == "video_path"


# File: tests/unit/converter/test_add_image_video_converter.py
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import os

import cv2
import numpy as np
import pytest

from pyrit.prompt_converter import AddImageVideoConverter


@pytest.fixture
def video_converter_sample_video(tmp_path):
    """Writes a 10-frame, 1 fps, 640x480 all-black MP4 into pytest's temp directory and returns its path."""
    video_path = str(tmp_path / "test_video.mp4")
    width, height = 640, 480
    video_encoding = cv2.VideoWriter_fourcc(*"mp4v")
    output_video = cv2.VideoWriter(video_path, video_encoding, 1, (width, height))
    try:
        # Create a few frames for video
        blank_frame = np.zeros((height, width, 3), dtype=np.uint8)
        for _ in range(10):
            output_video.write(blank_frame)
    finally:
        output_video.release()
    return video_path


@pytest.fixture
def video_converter_sample_image(tmp_path):
    """Writes a 100x100 all-black PNG into pytest's temp directory and returns its path."""
    image_path = str(tmp_path / "test_image.png")
    cv2.imwrite(image_path, np.zeros((100, 100, 3), dtype=np.uint8))
    return image_path


@pytest.fixture
def video_converter_output_path(tmp_path):
    """Returns an output video path inside pytest's temp directory; pytest manages that directory's lifetime."""
    return str(tmp_path / "output_video.mp4")


def test_add_image_video_converter_initialization(video_converter_sample_video):
    converter = AddImageVideoConverter(
        video_path=video_converter_sample_video,
        output_path="output_video.mp4",
        img_position=(10, 10),
        img_resize_size=(100, 100),
    )
    assert converter._video_path == video_converter_sample_video
    assert converter._output_path == "output_video.mp4"
    assert converter._img_position == (10, 10)
    assert converter._img_resize_size == (100, 100)


@pytest.mark.asyncio
async def test_add_image_video_converter_invalid_image_path(
    video_converter_sample_video, video_converter_output_path, tmp_path
):
    converter = AddImageVideoConverter(
        video_path=video_converter_sample_video, output_path=video_converter_output_path
    )
    # A path inside the fresh temp directory is guaranteed not to exist
    missing_image = str(tmp_path / "invalid_image.png")
    with pytest.raises(FileNotFoundError):
        await converter._add_image_to_video(image_path=missing_image, output_path=video_converter_output_path)


@pytest.mark.asyncio
async def test_add_image_video_converter_invalid_video_path(
    video_converter_sample_image, video_converter_output_path, tmp_path
):
    missing_video = str(tmp_path / "invalid_video.mp4")
    converter = AddImageVideoConverter(video_path=missing_video, output_path=video_converter_output_path)
    with pytest.raises(FileNotFoundError):
        await converter._add_image_to_video(
            image_path=video_converter_sample_image, output_path=video_converter_output_path
        )


@pytest.mark.asyncio
async def test_add_image_video_converter(
    video_converter_sample_video, video_converter_sample_image, video_converter_output_path
):
    converter = AddImageVideoConverter(
        video_path=video_converter_sample_video, output_path=video_converter_output_path
    )
    output_path = await converter._add_image_to_video(
        image_path=video_converter_sample_image, output_path=video_converter_output_path
    )
    assert output_path == video_converter_output_path
    assert os.path.exists(output_path)


@pytest.mark.asyncio
async def test_add_image_video_converter_convert_async(
    video_converter_sample_video, video_converter_sample_image, video_converter_output_path
):
    converter = AddImageVideoConverter(
        video_path=video_converter_sample_video, output_path=video_converter_output_path
    )
    converted_video = await converter.convert_async(prompt=video_converter_sample_image, input_type="image_path")
    assert converted_video
    assert converted_video.output_text == video_converter_output_path
    assert converted_video.output_type == "video_path"
    assert os.path.exists(video_converter_output_path)