Skip to content

Commit

Permalink
Download file before starting the stream in video-loop-source (#197)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomskikh authored May 4, 2023
1 parent 6d41cec commit cfbe136
Showing 1 changed file with 41 additions and 81 deletions.
122 changes: 41 additions & 81 deletions adapters/gst/gst_plugins/python/media_files_src_bin.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import os
import re
import subprocess
from enum import Enum
from fractions import Fraction
from pathlib import Path
from typing import BinaryIO, List, Optional, Union
from typing import List, Optional, Union
from urllib.parse import urlparse
from urllib.request import urlretrieve

from savant.gstreamer import GLib, GObject, Gst
from savant.gstreamer.codecs import Codec, CODEC_BY_CAPS_NAME
Expand All @@ -23,6 +25,8 @@ class FileType(Enum):
FileType.PICTURE: re.compile(r'image/(jpeg|png)'),
}

SIZE_MB = 2**20


class MediaFilesSrcBin(LoggerMixin, Gst.Bin):
GST_PLUGIN_NAME = 'media_files_src_bin'
Expand Down Expand Up @@ -102,6 +106,8 @@ def __init__(self, *args, **kwargs):
self.pending_locations: List[str] = []
self.source: Gst.Element = None

self.downloaded_bytes = 0

self.typefind: Gst.Element = Gst.ElementFactory.make('typefind')
self.typefind.connect('have-type', self.on_typefind_have_type)
self.add(self.typefind)
Expand Down Expand Up @@ -136,29 +142,27 @@ def do_set_state(self, state: Gst.State):
if isinstance(self.location, Path):
self.pending_locations = self.list_files()
self.source: Gst.Element = Gst.ElementFactory.make('filesrc')
elif self.download_path:
elif self.loop_file:
download_filepath = self.get_download_filepath(self.location)
if download_filepath.exists():
self.logger.info(
'%r already downloaded to %r', self.location, download_filepath
)
self.pending_locations = [str(download_filepath)]
self.source: Gst.Element = Gst.ElementFactory.make('filesrc')
else:
if not download_filepath.exists():
self.logger.info(
'Downloading %r to %r', self.location, download_filepath
)
download_filepath.parent.mkdir(parents=True, exist_ok=True)
download_file = open(download_filepath, 'wb')
self.pending_locations = [self.location]
if self.loop_file:
self.pending_locations.append(str(download_filepath))
self.source: Gst.Element = Gst.ElementFactory.make('souphttpsrc')
self.source.get_static_pad('src').add_probe(
Gst.PadProbeType.BUFFER | Gst.PadProbeType.EVENT_DOWNSTREAM,
self.download_file_probe,
download_file,
tmp_filepath = (
download_filepath.parent / f'.tmp-{download_filepath.name}'
)
urlretrieve(self.location, tmp_filepath, self.download_report)
os.rename(tmp_filepath, download_filepath)
self.logger.info(
'%r downloaded to %r', self.location, download_filepath
)
else:
self.logger.info(
'%r already downloaded to %r', self.location, download_filepath
)
self.pending_locations = [str(download_filepath)]
self.source: Gst.Element = Gst.ElementFactory.make('filesrc')
else:
self.pending_locations = [self.location]
self.source: Gst.Element = Gst.ElementFactory.make('souphttpsrc')
Expand All @@ -175,58 +179,29 @@ def do_set_state(self, state: Gst.State):
def validate_properties(self):
    """Check that the element properties form a usable configuration.

    "location" and "file-type" are always required. When "loop-file" is
    enabled only video files are allowed, and a location that is not a
    local ``Path`` additionally requires "download-path", because the file
    is downloaded there before the stream is started.

    Raises:
        AssertionError: if any of the constraints above is violated.
    """
    assert self.location is not None, '"location" property is required'
    assert self.file_type is not None, '"file-type" property is required'
    if not self.loop_file:
        return
    assert (
        self.file_type == FileType.VIDEO
    ), f'Only "file-type={FileType.VIDEO.value}" is allowed when "loop-file" is enabled'
    if isinstance(self.location, Path):
        # Local files are read in place, so no download directory is needed.
        return
    assert (
        self.download_path is not None
    ), '"download-path" property is required when "loop-file" is enabled'
    if self.download_path.exists():
        assert self.download_path.is_dir(), '"download-path" must be a directory'

def get_download_filepath(self, location: str):
    """Map a remote location to the local path used for its downloaded copy.

    Only the path component of the URL is used: its leading slashes are
    removed and the remainder is appended to "download-path". The result
    is returned as an absolute path.
    """
    url_path = urlparse(location).path
    relative_path = url_path.lstrip('/')
    target = self.download_path / relative_path
    return target.absolute()

def download_file_probe(
    self,
    pad: Gst.Pad,
    info: Gst.PadProbeInfo,
    file: BinaryIO,
):
    """Pad probe that copies the data passing through the pad into ``file``.

    Each buffer is written to ``file`` at the buffer's own offset. When a
    downstream EOS event arrives the file is flushed and closed and the
    probe asks to be removed.

    Returns:
        ``Gst.PadProbeReturn.OK`` for a buffer, ``REMOVE`` on EOS and
        ``PASS`` for everything else.
    """
    probe_type = info.type

    if probe_type & Gst.PadProbeType.BUFFER:
        buf: Gst.Buffer = info.get_buffer()
        map_info: Gst.MapInfo
        is_mapped, map_info = buf.map(Gst.MapFlags.READ)
        assert is_mapped, f'Failed to map buffer {buf}'
        try:
            self.logger.debug(
                'Writing %s bytes at offset %s to %r',
                map_info.size,
                buf.offset,
                file.name,
            )
            # Position the file at the buffer offset before writing.
            file.seek(buf.offset)
            file.write(map_info.data)
        finally:
            # Always release the mapping, even if the write failed.
            buf.unmap(map_info)
        return Gst.PadProbeReturn.OK

    if probe_type & Gst.PadProbeType.EVENT_DOWNSTREAM:
        if info.get_event().type == Gst.EventType.EOS:
            self.logger.info('Flushing %r', file.name)
            file.flush()
            file.close()
            return Gst.PadProbeReturn.REMOVE

    return Gst.PadProbeReturn.PASS
def download_report(self, blocks: int, block_size: int, total: Optional[int]):
    """Progress hook for ``urlretrieve``: log every whole megabyte downloaded.

    Progress is derived from the hook arguments rather than accumulated per
    call, so the initial call made with ``blocks == 0`` is not counted as a
    downloaded block and a new download starts again from zero.

    Args:
        blocks: Number of blocks transferred so far.
        block_size: Size of one block in bytes.
        total: Total size of the file in bytes; ``None`` or a non-positive
            value when the size is unknown.
    """
    last_downloaded_mb = self.downloaded_bytes / SIZE_MB
    downloaded_bytes = blocks * block_size
    if total is not None and total > 0:
        # The final block is usually partial: never report more than the file size.
        downloaded_bytes = min(downloaded_bytes, total)
    self.downloaded_bytes = downloaded_bytes
    downloaded_mb = downloaded_bytes / SIZE_MB
    # Log only when a whole-megabyte boundary has been crossed.
    if int(downloaded_mb) > int(last_downloaded_mb):
        self.logger.info('Downloaded %.2f MB.', downloaded_mb)

def list_files(self):
assert self.location.exists(), f'No such file or directory "{self.location}"'
Expand Down Expand Up @@ -441,17 +416,6 @@ def start_next_file(self, file_location: str):
self.remove(elem)
self._elements = []

if self.source.get_factory().get_name() != 'filesrc':
# To change "souphttpsrc" to "filesrc" after the first loop
self.logger.info('Remove element %r', self.source.get_name())
self.source.set_locked_state(True)
self.source.set_state(Gst.State.NULL)
self.remove(self.source)
self.source: Gst.Element = Gst.ElementFactory.make('filesrc')
self.logger.info('Add element %r', self.source.get_name())
self.add(self.source)
assert self.source.link(self.typefind)

self.logger.info('Set location %s', file_location)
self.source.set_property('location', file_location)
self.typefind.sync_state_with_parent()
Expand All @@ -460,11 +424,7 @@ def start_next_file(self, file_location: str):
return False

def pop_next_location(self):
    """Return the location of the next file to play.

    With "loop-file" enabled the first pending location is returned but
    kept in the queue, so every call yields the same location. Otherwise
    the first pending location is removed from the queue and returned.

    Raises:
        IndexError: if there are no pending locations.
    """
    if self.loop_file:
        return self.pending_locations[0]
    return self.pending_locations.pop(0)
Expand Down

0 comments on commit cfbe136

Please sign in to comment.