diff --git a/adapters/gst/gst_plugins/python/media_files_src_bin.py b/adapters/gst/gst_plugins/python/media_files_src_bin.py
index 9a6d8107..2240b2f1 100644
--- a/adapters/gst/gst_plugins/python/media_files_src_bin.py
+++ b/adapters/gst/gst_plugins/python/media_files_src_bin.py
@@ -1,10 +1,12 @@
+import os
 import re
 import subprocess
 from enum import Enum
 from fractions import Fraction
 from pathlib import Path
-from typing import BinaryIO, List, Optional, Union
+from typing import List, Optional, Union
 from urllib.parse import urlparse
+from urllib.request import urlretrieve
 
 from savant.gstreamer import GLib, GObject, Gst
 from savant.gstreamer.codecs import Codec, CODEC_BY_CAPS_NAME
@@ -23,6 +25,8 @@ class FileType(Enum):
     FileType.PICTURE: re.compile(r'image/(jpeg|png)'),
 }
 
+SIZE_MB = 2**20
+
 
 class MediaFilesSrcBin(LoggerMixin, Gst.Bin):
     GST_PLUGIN_NAME = 'media_files_src_bin'
@@ -102,6 +106,8 @@ def __init__(self, *args, **kwargs):
         self.pending_locations: List[str] = []
         self.source: Gst.Element = None
 
+        self.downloaded_bytes = 0
+
         self.typefind: Gst.Element = Gst.ElementFactory.make('typefind')
         self.typefind.connect('have-type', self.on_typefind_have_type)
         self.add(self.typefind)
@@ -136,29 +142,27 @@ def do_set_state(self, state: Gst.State):
             if isinstance(self.location, Path):
                 self.pending_locations = self.list_files()
                 self.source: Gst.Element = Gst.ElementFactory.make('filesrc')
-            elif self.download_path:
+            elif self.loop_file:
                 download_filepath = self.get_download_filepath(self.location)
-                if download_filepath.exists():
-                    self.logger.info(
-                        '%r already downloaded to %r', self.location, download_filepath
-                    )
-                    self.pending_locations = [str(download_filepath)]
-                    self.source: Gst.Element = Gst.ElementFactory.make('filesrc')
-                else:
+                if not download_filepath.exists():
                     self.logger.info(
                         'Downloading %r to %r', self.location, download_filepath
                     )
                     download_filepath.parent.mkdir(parents=True, exist_ok=True)
-                    download_file = open(download_filepath, 'wb')
-                    self.pending_locations = [self.location]
-                    if self.loop_file:
-                        self.pending_locations.append(str(download_filepath))
-                    self.source: Gst.Element = Gst.ElementFactory.make('souphttpsrc')
-                    self.source.get_static_pad('src').add_probe(
-                        Gst.PadProbeType.BUFFER | Gst.PadProbeType.EVENT_DOWNSTREAM,
-                        self.download_file_probe,
-                        download_file,
+                    tmp_filepath = (
+                        download_filepath.parent / f'.tmp-{download_filepath.name}'
                     )
+                    urlretrieve(self.location, tmp_filepath, self.download_report)
+                    os.rename(tmp_filepath, download_filepath)
+                    self.logger.info(
+                        '%r downloaded to %r', self.location, download_filepath
+                    )
+                else:
+                    self.logger.info(
+                        '%r already downloaded to %r', self.location, download_filepath
+                    )
+                self.pending_locations = [str(download_filepath)]
+                self.source: Gst.Element = Gst.ElementFactory.make('filesrc')
             else:
                 self.pending_locations = [self.location]
                 self.source: Gst.Element = Gst.ElementFactory.make('souphttpsrc')
@@ -175,58 +179,42 @@ def do_set_state(self, state: Gst.State):
     def validate_properties(self):
         assert self.location is not None, '"location" property is required'
         assert self.file_type is not None, '"file-type" property is required'
-        if self.download_path is not None:
-            if self.download_path.exists():
-                assert (
-                    self.download_path.is_dir()
-                ), '"download-path" must be a directory'
-        if self.loop_file:
-            assert (
-                self.file_type == FileType.VIDEO
-            ), f'Only "file-type={FileType.VIDEO.value}" is allowed when "loop-file" is enabled'
-            if not isinstance(self.location, Path):
-                assert (
-                    self.download_path is not None
-                ), '"download-path" property is required when "loop-file" is enabled'
+        if not self.loop_file:
+            return
+        assert (
+            self.file_type == FileType.VIDEO
+        ), f'Only "file-type={FileType.VIDEO.value}" is allowed when "loop-file" is enabled'
+        if isinstance(self.location, Path):
+            # Local files are looped in place, nothing is downloaded.
+            return
+        # Remote files are downloaded to "download-path" before looping.
+        assert (
+            self.download_path is not None
+        ), '"download-path" property is required when "loop-file" is enabled'
+        if self.download_path.exists():
+            assert self.download_path.is_dir(), '"download-path" must be a directory'
 
     def get_download_filepath(self, location: str):
         parsed_location = urlparse(location)
         return (self.download_path / parsed_location.path.lstrip('/')).absolute()
 
-    def download_file_probe(
-        self,
-        pad: Gst.Pad,
-        info: Gst.PadProbeInfo,
-        file: BinaryIO,
-    ):
-        if info.type & Gst.PadProbeType.BUFFER:
-            buffer: Gst.Buffer = info.get_buffer()
-            mapinfo: Gst.MapInfo
-            mapped, mapinfo = buffer.map(Gst.MapFlags.READ)
-            assert mapped, f'Failed to map buffer {buffer}'
-            try:
-                self.logger.debug(
-                    'Writing %s bytes at offset %s to %r',
-                    mapinfo.size,
-                    buffer.offset,
-                    file.name,
-                )
-                file.seek(buffer.offset)
-                file.write(mapinfo.data)
-            finally:
-                buffer.unmap(mapinfo)
-
-            return Gst.PadProbeReturn.OK
-
-        if info.type & Gst.PadProbeType.EVENT_DOWNSTREAM:
-            event: Gst.Event = info.get_event()
-            if event.type == Gst.EventType.EOS:
-                self.logger.info('Flushing %r', file.name)
-                file.flush()
-                file.close()
-                return Gst.PadProbeReturn.REMOVE
-
-        return Gst.PadProbeReturn.PASS
+    def download_report(self, blocks: int, block_size: int, total: Optional[int]):
+        """Report hook for urlretrieve: log progress once per downloaded megabyte.
+
+        :param blocks: Number of blocks transferred so far (0 on the first call).
+        :param block_size: Block size in bytes.
+        :param total: Total file size in bytes; -1 when the server does not report it.
+        """
+        last_downloaded_mb = self.downloaded_bytes / SIZE_MB
+        # Derive the counter from the block number: the hook is first called
+        # with blocks=0, so accumulating block_size would overcount.
+        self.downloaded_bytes = blocks * block_size
+        if total is not None and total > 0:
+            # The last block is usually partial.
+            self.downloaded_bytes = min(self.downloaded_bytes, total)
+        downloaded_mb = self.downloaded_bytes / SIZE_MB
+        if int(downloaded_mb) > int(last_downloaded_mb):
+            self.logger.info('Downloaded %.2f MB.', downloaded_mb)
 
     def list_files(self):
         assert self.location.exists(), f'No such file or directory "{self.location}"'
@@ -441,17 +429,6 @@ def start_next_file(self, file_location: str):
             self.remove(elem)
         self._elements = []
 
-        if self.source.get_factory().get_name() != 'filesrc':
-            # To change "souphttpsrc" to "filesrc" after the first loop
-            self.logger.info('Remove element %r', self.source.get_name())
-            self.source.set_locked_state(True)
-            self.source.set_state(Gst.State.NULL)
-            self.remove(self.source)
-            self.source: Gst.Element = Gst.ElementFactory.make('filesrc')
-            self.logger.info('Add element %r', self.source.get_name())
-            self.add(self.source)
-            assert self.source.link(self.typefind)
-
         self.logger.info('Set location %s', file_location)
         self.source.set_property('location', file_location)
         self.typefind.sync_state_with_parent()
@@ -460,11 +437,7 @@ def start_next_file(self, file_location: str):
         return False
 
     def pop_next_location(self):
-        if self.loop_file and len(self.pending_locations) == 1:
-            # When "self.loop_file=True" and "self.location" is HTTP URL
-            # "self.pending_locations" contains 2 locations: the URL and
-            # the locations of downloaded file.
-            # The URL is used only for the first loop.
+        if self.loop_file:
             return self.pending_locations[0]
         else:
             return self.pending_locations.pop(0)