-
Notifications
You must be signed in to change notification settings - Fork 438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FEAT Video Converter: Adding Images to Videos #702
base: main
Are you sure you want to change the base?
Changes from 19 commits
1367ae0
48e3c9e
a24a804
c425af0
0997c53
40e9b00
841a7e3
a598b14
cb15543
c77e9e8
e8229f2
419d7da
39953eb
e8cdb43
023f8bf
81870a0
5f16d2b
60e4d18
ddc1728
eea9c3b
cafb2b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
{ | ||
"cells": [ | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"# Adding Images to a Video" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [ | ||
{ | ||
"data": { | ||
"text/plain": [ | ||
"ConverterResult(output_text='C:\\\\Users\\\\bjagdagdorj\\\\Documents\\\\tools\\\\pyrit2\\\\PyRIT\\\\dbdata\\\\prompt-memory-entries\\\\videos\\\\1741015011507304.mp4', output_type='video_path')" | ||
] | ||
}, | ||
"execution_count": null, | ||
"metadata": {}, | ||
"output_type": "execute_result" | ||
} | ||
], | ||
"source": [ | ||
"import pathlib\n", | ||
"\n", | ||
"from pyrit.common import IN_MEMORY, initialize_pyrit\n", | ||
"from pyrit.prompt_converter import AddImageVideoConverter\n", | ||
"\n", | ||
"initialize_pyrit(memory_db_type=IN_MEMORY)\n", | ||
"\n", | ||
"input_video = str(pathlib.Path(\".\") / \"..\" / \"..\" / \"..\" / \"assets\" / \"sample_video.mp4\")\n", | ||
"input_image = str(pathlib.Path(\".\") / \"..\" / \"..\" / \"..\" / \"assets\" / \"pyrit_architecture.png\")\n", | ||
"\n", | ||
"video = AddImageVideoConverter(video_path=input_video)\n", | ||
"converted_vid = await video.convert_async(prompt=input_image, input_type=\"image_path\")\n", | ||
"converted_vid" | ||
] | ||
} | ||
], | ||
"metadata": { | ||
"language_info": { | ||
"codemirror_mode": { | ||
"name": "ipython", | ||
"version": 3 | ||
}, | ||
"file_extension": ".py", | ||
"mimetype": "text/x-python", | ||
"name": "python", | ||
"nbconvert_exporter": "python", | ||
"pygments_lexer": "ipython3", | ||
"version": "3.11.9" | ||
} | ||
}, | ||
"nbformat": 4, | ||
"nbformat_minor": 2 | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# --- | ||
# jupyter: | ||
# jupytext: | ||
# text_representation: | ||
# extension: .py | ||
# format_name: percent | ||
# format_version: '1.3' | ||
# jupytext_version: 1.16.4 | ||
# --- | ||
|
||
# %% [markdown] | ||
# # Adding Images to a Video | ||
|
||
# %% | ||
import pathlib | ||
|
||
from pyrit.common import IN_MEMORY, initialize_pyrit | ||
from pyrit.prompt_converter import AddImageVideoConverter | ||
|
||
initialize_pyrit(memory_db_type=IN_MEMORY) | ||
|
||
input_video = str(pathlib.Path(".") / ".." / ".." / ".." / "assets" / "sample_video.mp4") | ||
output_video = str(pathlib.Path(".") / ".." / ".." / ".." / "assets" / "sample_output_video.mp4") | ||
input_image = str(pathlib.Path(".") / ".." / ".." / ".." / "assets" / "ice_cream.png") | ||
|
||
video = AddImageVideoConverter(video_path=input_video, img_resize_size=(100, 100)) | ||
converted_vid = await video.convert_async(prompt=input_image, input_type="image_path") | ||
converted_vid |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,6 +58,7 @@ dependencies = [ | |
"ipykernel>=6.29.4", | ||
"numpy>=1.26.4", | ||
"openai>=1.58.1", | ||
"opencv-python>=4.11.0.86", | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you figure out what this pulls in? It might be a rather heavyweight package. If you're not sure how, please let me know and we can do it together 🙂 |
||
"pillow>=10.3.0", | ||
"pydantic>=2.7.1", | ||
"pyodbc>=5.1.0", | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,180 @@ | ||||||
# Copyright (c) Microsoft Corporation. | ||||||
# Licensed under the MIT license. | ||||||
|
||||||
import logging | ||||||
import os | ||||||
from pathlib import Path | ||||||
|
||||||
import cv2 | ||||||
jbolor21 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
import numpy as np | ||||||
|
||||||
from pyrit.common.path import DB_DATA_PATH | ||||||
from pyrit.models import PromptDataType, data_serializer_factory | ||||||
from pyrit.prompt_converter import ConverterResult, PromptConverter | ||||||
|
||||||
logger = logging.getLogger(__name__) | ||||||
|
||||||
|
||||||
# Maps a lower-case video file extension to the FourCC code handed to cv2.VideoWriter_fourcc
video_encoding_map = dict(
    mp4="mp4v",
    avi="XVID",
    mov="MJPG",
    mkv="X264",
)
|
||||||
|
||||||
class AddImageVideoConverter(PromptConverter):
    """
    Adds an image to a video at a specified position.

    The image is currently overlaid on every frame of the video, not at a specific timepoint.

    Args:
        video_path (str): File path of video to add image to
        output_path (str, Optional): File path of output video. Defaults to None, in which case
            ``convert_async`` asks the video data serializer for a file name.
        img_position (tuple, Optional): (x, y) pixel position of the image's top-left corner within the
            video frame. Defaults to (10, 10).
        img_resize_size (tuple, Optional): (width, height) to resize the image to. Defaults to (500, 500).

    Raises:
        ValueError: If ``video_path`` is empty.
    """

    def __init__(
        self,
        video_path: str,
        output_path: str = None,
        img_position: tuple = (10, 10),
        img_resize_size: tuple = (500, 500),
    ):
        if not video_path:
            raise ValueError("Please provide valid video path")

        self._output_path = output_path
        self._img_position = img_position
        self._img_resize_size = img_resize_size
        self._video_path = video_path

    @staticmethod
    def _select_fourcc(output_path: str, video_path: str) -> int:
        """Return the FourCC code for the output file, falling back to the input file's extension."""
        input_extension = str(video_path).split(".")[-1].lower()
        output_extension = str(output_path).split(".")[-1].lower()
        # The codec has to suit the container that is being written, so the output extension wins;
        # the input extension is kept as a fallback for output paths without a known extension.
        for extension in (output_extension, input_extension):
            if extension in video_encoding_map:
                return cv2.VideoWriter_fourcc(*video_encoding_map[extension])
        raise ValueError(f"Unsupported video format: {input_extension}")

    def _decode_overlay(self, image_bytes: bytes):
        """Decode raw image bytes into an array with 3 or 4 channels, resized to ``img_resize_size``."""
        image_np_arr = np.frombuffer(image_bytes, np.uint8)
        overlay = cv2.imdecode(image_np_arr, cv2.IMREAD_UNCHANGED)
        if overlay is None:
            # imdecode signals undecodable data by returning None instead of raising
            raise ValueError("Unable to decode the provided image data")
        if overlay.ndim == 2:
            # Single-channel images decode to a 2-D array; expand so they can be pasted onto colour frames
            overlay = cv2.cvtColor(overlay, cv2.COLOR_GRAY2BGR)
        return cv2.resize(overlay, self._img_resize_size)

    async def _add_image_to_video(self, image_path: str, output_path: str) -> str:
        """
        Adds image to video

        Args:
            image_path (str): The image path to add to video.
            output_path (str): The output video path.

        Returns:
            output_path (str): The output video path.

        Raises:
            ValueError: If ``image_path`` is empty, the image cannot be decoded, the video cannot be
                opened or written, the video format is unsupported, or the image position lies
                outside the video frame.
        """

        if not image_path:
            raise ValueError("Please provide valid image path value")

        input_image_data = data_serializer_factory(
            category="prompt-memory-entries", data_type="image_path", value=image_path
        )
        input_video_data = data_serializer_factory(
            category="prompt-memory-entries", data_type="video_path", value=self._video_path
        )

        # Read both inputs up front so that a missing or unreadable file surfaces
        # before any output file has been created
        video_bytes = await input_video_data.read_data()
        input_image_bytes = await input_image_data.read_data()
        overlay = self._decode_overlay(input_image_bytes)

        azure_storage_flag = input_video_data._is_azure_storage_url(self._video_path)
        video_path = self._video_path

        # Initialised before the try block so the cleanup below never touches an unbound name
        local_temp_path = None
        cap = None
        output_video = None

        try:
            if azure_storage_flag:
                # OpenCV needs a local file, so persist the downloaded bytes to a temporary file first
                local_temp_path = Path(DB_DATA_PATH, "temp_video.mp4")
                with open(local_temp_path, "wb") as f:
                    f.write(video_bytes)
                video_path = str(local_temp_path)

            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise ValueError(f"Unable to open video: {video_path}")

            # Get video properties; fps stays a float so fractional rates (e.g. 29.97) are preserved
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            video_char_code = self._select_fourcc(output_path, video_path)

            x, y = self._img_position  # Top-left corner where the overlay will be placed
            if not (0 <= x < width and 0 <= y < height):
                raise ValueError(
                    f"Image position {self._img_position} is outside the {width}x{height} video frame"
                )

            # Ensure overlay fits within the frame boundaries (frame size is constant, so check once)
            image_height, image_width = overlay.shape[:2]
            if x + image_width > width or y + image_height > height:
                logger.info("Overlay image is too large for the video frame. Resizing to fit.")
                overlay = cv2.resize(overlay, (width - x, height - y))
                image_height, image_width = overlay.shape[:2]

            # Pre-compute the blend terms once instead of once per frame
            has_alpha = overlay.shape[2] == 4
            if has_alpha:
                alpha_overlay = overlay[:, :, 3:4] / 255.0
                weighted_overlay = alpha_overlay * overlay[:, :, :3]
                background_weight = 1 - alpha_overlay

            output_video = cv2.VideoWriter(output_path, video_char_code, fps, (width, height))
            if not output_video.isOpened():
                raise ValueError(f"Unable to open video writer for: {output_path}")

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # `region` is a view into `frame`, so assigning into it edits the frame in place
                region = frame[y : y + image_height, x : x + image_width]
                if has_alpha:
                    region[:] = weighted_overlay + background_weight * region
                else:
                    region[:] = overlay

                # Write the modified frame to the output video
                output_video.write(frame)

        finally:
            # Release everything that was actually acquired. No GUI windows are created here,
            # so cv2.destroyAllWindows() is not needed.
            if cap is not None:
                cap.release()
            if output_video is not None:
                output_video.release()
            if local_temp_path is not None and os.path.exists(local_temp_path):
                os.remove(local_temp_path)

        logger.info(f"Video saved as {output_path}")

        return output_path

    async def convert_async(self, *, prompt: str, input_type: PromptDataType = "image_path") -> ConverterResult:
        """
        Converter that adds an image to a video

        Args:
            prompt (str): The image file name to be added to the video.
            input_type (PromptDataType): type of data

        Returns:
            ConverterResult: The filename of the converted video as a ConverterResult Object

        Raises:
            ValueError: If ``input_type`` is not supported.
        """
        if not self.input_supported(input_type):
            raise ValueError("Input type not supported")

        output_video_serializer = data_serializer_factory(category="prompt-memory-entries", data_type="video_path")

        # Fall back to a serializer-generated file name when no explicit output path was configured
        if not self._output_path:
            output_video_serializer.value = await output_video_serializer.get_data_filename()
        else:
            output_video_serializer.value = self._output_path

        # Add the image to the video
        updated_video = await self._add_image_to_video(image_path=prompt, output_path=output_video_serializer.value)
        return ConverterResult(output_text=str(updated_video), output_type="video_path")

    def input_supported(self, input_type: PromptDataType) -> bool:
        """Return True only for ``image_path`` input."""
        return input_type == "image_path"

    def output_supported(self, output_type: PromptDataType) -> bool:
        """Return True only for ``video_path`` output."""
        return output_type == "video_path"
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT license. | ||
|
||
import os | ||
|
||
import cv2 | ||
import numpy as np | ||
import pytest | ||
|
||
from pyrit.prompt_converter import AddImageVideoConverter | ||
|
||
|
||
@pytest.fixture
def video_converter_sample_video():
    """Create a small all-black sample video on disk and yield its path.

    The file is removed on fixture teardown, so it does not leak when a test fails
    before reaching its own cleanup.
    """
    video_path = "test_video.mp4"
    width, height = 640, 480
    video_encoding = cv2.VideoWriter_fourcc(*"mp4v")
    output_video = cv2.VideoWriter(video_path, video_encoding, 1, (width, height))
    try:
        # Create a few frames for video
        for _ in range(10):
            output_video.write(np.zeros((height, width, 3), dtype=np.uint8))
    finally:
        output_video.release()

    yield video_path

    # Tests may already have deleted the file themselves, so only remove it if it is still there
    if os.path.exists(video_path):
        os.remove(video_path)
|
||
|
||
@pytest.fixture
def video_converter_sample_image():
    """Create a 100x100 all-black sample PNG on disk and yield its path.

    The file is removed on fixture teardown, so it does not leak when a test fails
    before reaching its own cleanup.
    """
    image_path = "test_image.png"
    image = np.zeros((100, 100, 3), dtype=np.uint8)
    cv2.imwrite(image_path, image)

    yield image_path

    # Tests may already have deleted the file themselves, so only remove it if it is still there
    if os.path.exists(image_path):
        os.remove(image_path)
|
||
|
||
def test_add_image_video_converter_initialization(video_converter_sample_video):
    """The constructor stores every argument on the matching private attribute."""
    try:
        converter = AddImageVideoConverter(
            video_path=video_converter_sample_video,
            output_path="output_video.mp4",
            img_position=(10, 10),
            img_resize_size=(100, 100),
        )
        assert converter._video_path == video_converter_sample_video
        assert converter._output_path == "output_video.mp4"
        assert converter._img_position == (10, 10)
        assert converter._img_resize_size == (100, 100)
    finally:
        # Clean up even when an assertion above fails
        if os.path.exists(video_converter_sample_video):
            os.remove(video_converter_sample_video)
|
||
|
||
@pytest.mark.asyncio
async def test_add_image_video_converter_invalid_image_path(video_converter_sample_video):
    """A non-existent image path makes the conversion raise FileNotFoundError."""
    converter = AddImageVideoConverter(video_path=video_converter_sample_video, output_path="output_video.mp4")
    try:
        with pytest.raises(FileNotFoundError):
            await converter._add_image_to_video(image_path="invalid_image.png", output_path="output_video.mp4")
    finally:
        # Clean up even when the expectation above fails; the output file may or may not
        # have been created by the time the error is raised
        for leftover in (video_converter_sample_video, "output_video.mp4"):
            if os.path.exists(leftover):
                os.remove(leftover)
|
||
|
||
@pytest.mark.asyncio
async def test_add_image_video_converter_invalid_video_path(video_converter_sample_image):
    """A non-existent video path makes the conversion raise FileNotFoundError."""
    converter = AddImageVideoConverter(video_path="invalid_video.mp4", output_path="output_video.mp4")
    try:
        with pytest.raises(FileNotFoundError):
            await converter._add_image_to_video(
                image_path=video_converter_sample_image, output_path="output_video.mp4"
            )
    finally:
        # Clean up even when the expectation above fails
        for leftover in (video_converter_sample_image, "output_video.mp4"):
            if os.path.exists(leftover):
                os.remove(leftover)
|
||
|
||
@pytest.mark.asyncio
async def test_add_image_video_converter(video_converter_sample_video, video_converter_sample_image):
    """Overlaying a valid image on a valid video returns the requested output path and writes the file."""
    converter = AddImageVideoConverter(video_path=video_converter_sample_video, output_path="output_video.mp4")
    try:
        output_path = await converter._add_image_to_video(
            image_path=video_converter_sample_image, output_path="output_video.mp4"
        )
        assert output_path == "output_video.mp4"
        assert os.path.exists(output_path)
    finally:
        # Clean up even when the conversion or an assertion above fails
        for leftover in (video_converter_sample_video, video_converter_sample_image, "output_video.mp4"):
            if os.path.exists(leftover):
                os.remove(leftover)
|
||
|
||
@pytest.mark.asyncio
async def test_add_image_video_converter_convert_async(video_converter_sample_video, video_converter_sample_image):
    """convert_async returns a video_path ConverterResult pointing at the configured output file."""
    converter = AddImageVideoConverter(video_path=video_converter_sample_video, output_path="output_video.mp4")
    try:
        converted_video = await converter.convert_async(prompt=video_converter_sample_image, input_type="image_path")
        assert converted_video
        assert converted_video.output_text == "output_video.mp4"
        assert converted_video.output_type == "video_path"
        assert os.path.exists(converted_video.output_text)
    finally:
        # Clean up even when the conversion or an assertion above fails
        for leftover in (video_converter_sample_video, video_converter_sample_image, "output_video.mp4"):
            if os.path.exists(leftover):
                os.remove(leftover)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This `output_video` path isn't being used right now; make sure to pass it to the converter init on L26.