video downloader: do not continue if dataset consistently does not contain videos at same domain; wait if the same domain is being hit repeatedly; ensure video metadata only counted once
dale-wahl committed Dec 20, 2024
1 parent 817b4ee commit d4c43a7
Showing 1 changed file with 34 additions and 10 deletions.
44 changes: 34 additions & 10 deletions processors/visualisation/download_videos.py
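Before the diff itself, the heuristic this commit introduces can be summarised in isolation. The following is only an illustrative sketch, not the processor's code: the sliding window of 5 domains, the threshold of 10 consecutive non-video URLs, and the names last_domains and not_a_video are taken from the changes below, while the helper function itself is hypothetical.

    from urllib.parse import urlparse

    def looks_like_dead_end(last_domains, not_a_video, url, window=5, threshold=10):
        # Abort when at least `threshold` URLs in a row turned out not to be
        # videos and every entry in the recent-domain window matches the
        # current URL's domain, i.e. the dataset keeps pointing at the same
        # non-video site.
        domain = urlparse(url).netloc
        return not_a_video >= threshold and last_domains.count(domain) == window

    # The window itself is maintained per processed URL, as in the diff below:
    # last_domains = last_domains[-4:] + [urlparse(url).netloc]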
@@ -12,6 +12,7 @@
import requests
import yt_dlp
from ural import urls_from_text
from urllib.parse import urlparse
from yt_dlp import DownloadError
from yt_dlp.utils import ExistingVideoReached

@@ -316,6 +317,8 @@ def dmi_match_filter(vid_info, *, incomplete):
failed_downloads = 0
copied_videos = 0
consecutive_errors = 0
not_a_video = 0
last_domains = []
self.total_possible_videos = min(len(urls), amount) if amount != 0 else len(urls)
yt_dlp_archive_map = {}
for url in urls:
@@ -325,9 +328,10 @@ def dmi_match_filter(vid_info, *, incomplete):
if previous_vid_metadata.get('success', False):
# Use previous downloaded video
try:
self.dataset.log(f"Copying previously downloaded video for url: {url}")
num_copied = self.copy_previous_video(previous_vid_metadata, results_path, vid_lib.previous_downloaders)
urls[url] = previous_vid_metadata
self.dataset.log(f"Copied previously downloaded video to current dataset for url: {url}")
self.dataset.update_status(f"Copied previously downloaded video to current dataset.")
copied_videos += num_copied
continue
except FailedToCopy as e:
@@ -340,11 +344,13 @@ def dmi_match_filter(vid_info, *, incomplete):

urls[url]["success"] = False
urls[url]["retry"] = True
last_domains = last_domains[-4:] + [urlparse(url).netloc]

# Stop processing if worker has been asked to stop or max downloads reached
if self.downloaded_videos >= amount and amount != 0:
urls[url]["error"] = "Max video download limit already reached."
continue

if self.interrupted:
raise ProcessorInterruptedException("Interrupted while downloading videos.")

@@ -356,7 +362,17 @@ def dmi_match_filter(vid_info, *, incomplete):
else:
message = "Downloaded %i videos. Errors %i consecutive times; check logs to ensure " \
"video URLs are working links and you are not being blocked." % (self.downloaded_videos, consecutive_errors)
self.dataset.update_status(message, is_final=True)
self.dataset.update_status(message, is_final=True)
if self.downloaded_videos == 0:
self.dataset.finish(0)
return
else:
# Finish processor with already downloaded videos
break
if not_a_video >= 10 and last_domains.count(urlparse(url).netloc) == 5:
# This processor can be used to extract all links from text body and attempt to download any with videos
# If the same domain is encountered 5 times in a row and no links are to videos, we are assuming the user has poorly chosen their columns and no videos will be found
self.dataset.update_status(f"Too many consecutive non-video URLs encountered; {'try again with Non-direct videos option selected' if self.config.get('video-downloader.allow-indirect') else 'try extracting URLs and filtering dataset first'}.", is_final=True)
if self.downloaded_videos == 0:
self.dataset.finish(0)
return
@@ -407,7 +423,7 @@ def dmi_match_filter(vid_info, *, incomplete):
:100] + '_%(autonumber)s.%(ext)s'
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
# Count and use self.yt_dlp_monitor() to ensure we don't download videos forever...
self.url_files = []
self.url_files = {}
self.last_dl_status = {}
self.last_post_process_status = {}
self.dataset.update_status("Downloading %i/%i via yt-dlp: %s" % (self.downloaded_videos + 1, self.total_possible_videos, url))
@@ -424,7 +440,7 @@ def dmi_match_filter(vid_info, *, incomplete):
with yt_dlp.YoutubeDL({"socket_timeout": 30}) as ydl2:
info2 = ydl2.extract_info(url, download=False)
if info2:
self.url_files.append(yt_dlp_archive_map[info2.get('extractor') + info2.get('id')])
self.url_files[info2.get('_filename', {})] = yt_dlp_archive_map[info2.get('extractor') + info2.get('id')]
self.dataset.log("Already downloaded video associated with: %s" % url)
else:
message = f"Video identified, but unable to identify which video from {url}"
@@ -468,9 +484,9 @@ def dmi_match_filter(vid_info, *, incomplete):

# Add file data collected by YT-DLP
urls[url]["downloader"] = "yt_dlp"
urls[url]['files'] = self.url_files
urls[url]['files'] = list(self.url_files.values())
# Add to archive mapping in case needed
for file in self.url_files:
for file in self.url_files.values():
yt_dlp_archive_map[
file.get('metadata').get('extractor') + file.get('metadata').get('id')] = file

@@ -480,13 +496,18 @@ def dmi_match_filter(vid_info, *, incomplete):

else:
# No YT-DLP; move on
self.dataset.log(f"Unknown Request Error: {str(e)}")
self.dataset.log(f"NotVideoLinkError: {str(e)}")
not_a_video += 1
urls[url]["error"] = str(e)
if last_domains.count(urlparse(url).netloc) >= 2:
# Same domain encountered at least twice; let's wait before getting blocked
time.sleep(5 * not_a_video)
continue

urls[url]["success"] = success
if success:
consecutive_errors = 0
not_a_video = 0

# Update status
self.downloaded_videos += len(self.videos_downloaded_from_url)
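The sleep added in this hunk grows with the number of consecutive non-video URLs, so repeated hits on one domain back off progressively before the processor risks being blocked. A minimal standalone version of that throttle (the 5-second step and the two-occurrence trigger come from the diff; the function wrapper is only for illustration):

    import time
    from urllib.parse import urlparse

    def throttle_repeated_domain(last_domains, not_a_video, url, step=5):
        # If this URL's domain already appears at least twice in the recent
        # window, wait before the next request; the delay scales with how many
        # consecutive URLs turned out not to be videos.
        if last_domains.count(urlparse(url).netloc) >= 2:
            time.sleep(step * not_a_video)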
@@ -495,6 +516,7 @@ def dmi_match_filter(vid_info, *, incomplete):
(f"; {failed_downloads} URLs failed." if failed_downloads > 0 else ""))
self.dataset.update_progress(self.downloaded_videos / self.total_possible_videos)

self.dataset.update_status("Updating and saving metadata")
# Save some metadata to be able to connect the videos to their source
metadata = {
url: {
@@ -510,7 +532,8 @@ def dmi_match_filter(vid_info, *, incomplete):
self.dataset.update_status(f"Downloaded {self.downloaded_videos} videos" +
(f"; videos copied from {copied_videos} previous downloads" if copied_videos > 0 else "") +
(f"; {failed_downloads} URLs failed." if failed_downloads > 0 else ""), is_final=True)
self.write_archive_and_finish(results_path, len([x for x in urls.values() if x.get('success')]))
self.dataset.update_status("Writing downloaded videos to zip archive")
self.write_archive_and_finish(results_path, self.downloaded_videos+copied_videos)

def yt_dlp_monitor(self, d):
"""
@@ -534,11 +557,11 @@ def yt_dlp_post_monitor(self, d):
self.last_post_process_status = d
if d['status'] == 'finished': # "downloading", "error", or "finished"
self.videos_downloaded_from_url.add(d.get('info_dict',{}).get('_filename', {}))
self.url_files.append({
self.url_files[d.get('info_dict',{}).get('_filename', {})] = {
"filename": Path(d.get('info_dict').get('_filename')).name,
"metadata": d.get('info_dict'),
"success": True
})
}

# Make sure we can stop downloads
if self.interrupted:
@@ -714,6 +737,7 @@ def copy_previous_video(self, previous_vid_metadata, staging_area, previous_down
if file.get("filename") not in archive_contents:
raise FailedToCopy(f"Previously downloaded video {file.get('filename')} not found")

self.dataset.log(f"Copying previously downloaded video {file.get('filename')} to new staging area")
archive_file.extract(file.get("filename"), staging_area)
num_copied += 1
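A closing note on the "counted once" part of the commit message: url_files is now a dict keyed by yt-dlp's output filename rather than a list, so the same output file can only contribute one metadata record however many times it is reported. A rough sketch of that idea (the record layout mirrors the diff; the helper function is hypothetical):

    from pathlib import Path

    def record_downloaded_file(url_files, info_dict):
        # Keyed by the output filename: recording the same video twice simply
        # overwrites the earlier entry instead of appending a duplicate.
        filename = info_dict["_filename"]
        url_files[filename] = {
            "filename": Path(filename).name,
            "metadata": info_dict,
            "success": True,
        }
        return url_files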
