Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 196 additions & 0 deletions metadata_fetcher/fetchers/youtube_fetcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import json
from .Fetcher import Fetcher
import requests
from requests.adapters import HTTPAdapter
from requests.adapters import Retry
from urllib.parse import urlencode
from .. import settings


class YoutubeFetcher(Fetcher):
    """
    Fetches video metadata from the YouTube Data API v3.

    Supports two harvest shapes, distinguished by the external id:
    a single video (11-character video id) or a playlist/channel-uploads
    list (longer playlist id), which is paged via `pageToken`.
    """

    BASE_URL = "https://www.googleapis.com/youtube/v3/"

    # Hard limit the YouTube Data API v3 places on `maxResults`.
    MAX_PER_PAGE = 50

    def __init__(self, params: dict):
        """
        Parameters:
            params: dict -- harvest parameters; either the initial
                `collection_id`/`harvest_data` payload or a saved state
                containing `next_url` (see `json()`)
        """
        super().__init__(params)

        # If `next_url` is a param, we know that this is not the fetch of the
        # first page, so restore the saved state verbatim and skip the
        # first-page initialization below.
        if "next_url" in params:
            for key, value in params.items():
                setattr(self, key, value)
            return

        self.collection_id = params.get("collection_id")
        self.external_id = params.get("harvest_data").get("harvest_extra_data")
        # 50 is the max per_page value accepted by the API; request full
        # pages so a harvest makes as few API calls as possible.
        self.per_page = self.MAX_PER_PAGE
        self.page_token = None
        self.next_page_token = None
        self.next_url = self.get_current_url()

    def get_current_url(self) -> str:
        """
        If it's a single video, fetch it, otherwise fetch a page of results
        from a playlist or user upload.

        Returns: str
        """
        if self.is_single():
            return self.get_videos_by_id_request([self.external_id])

        params = {
            "maxResults": self.per_page,
            "part": "contentDetails",
            "playlistId": self.external_id,
            # "" requests the first page when no token has been issued yet
            "pageToken": self.page_token or ""
        }

        return self.build_request_url("playlistItems", params)

    def is_single(self) -> bool:
        """
        Based on the external_id, determines if it's a single video in the
        simplest way possible: video ids are always 11 characters, while
        playlist ids are longer.

        Returns: bool
        """
        return len(self.external_id) == 11

    def get_videos_by_id_request(self, external_ids: list) -> str:
        """
        Builds a `videos` endpoint URL requesting full snippets for the
        given video ids.

        Parameters:
            external_ids: list -- YouTube video ids

        Returns: str
        """
        params = {
            "part": "snippet",
            "id": ",".join(external_ids)
        }

        return self.build_request_url("videos", params)

    def build_request_url(self, endpoint: str, params: dict) -> str:
        """
        Creates a YouTube API request URL from a dictionary of request
        parameters, appending the API key.

        Parameters:
            endpoint: str -- API endpoint path, e.g. "videos"
            params: dict -- query-string parameters (mutated: `key` added)

        Returns: str
        """
        params["key"] = settings.YOUTUBE_API_KEY

        return self.BASE_URL + endpoint + "?" + urlencode(params)

    def build_fetch_request(self) -> dict:
        """
        Generates arguments for `requests.get()`.

        Returns: dict
        """
        request = {"url": self.get_current_url()}

        print(
            f"[{self.collection_id}]: Fetching page {self.write_page} "
            f"at {request.get('url')}")

        return request

    def aggregate_vernacular_content(self, content: str) -> str:
        """
        If it's a single video, this response is our page content. Otherwise, we
        need to iterate the returned records and fetch them by id.

        Parameters:
            content: str -- raw JSON body of the current page

        Returns: str
        """
        if self.is_single():
            return content

        data = json.loads(content)
        external_ids = [item.get("contentDetails").get("videoId")
                        for item in data.get("items")]

        return self.get_video_metadata(external_ids).text

    def get_video_metadata(self, external_ids: list) -> requests.Response:
        """
        Performs a request for video info and returns the response. Attempts
        retries.

        Parameters:
            external_ids: list -- YouTube video ids

        Returns: requests.Response
        """
        print(
            f"[{self.collection_id}]: Fetching videos {external_ids}"
        )

        # Close the session (and its pooled connections) once the
        # request completes.
        with requests.Session() as session:
            retries = Retry(total=3, backoff_factor=2)
            session.mount("https://", HTTPAdapter(max_retries=retries))
            return session.get(url=self.get_videos_by_id_request(external_ids))

    def check_page(self, http_resp: requests.Response) -> int:
        """
        Parameters:
            http_resp: requests.Response

        Returns: int -- number of items on the fetched page
        """
        data = json.loads(http_resp.content)
        count = len(data.get("items"))

        print(
            f"[{self.collection_id}]: Fetched ids for page {self.write_page} "
            f"at {http_resp.url} with {count} hits"
        )

        return count

    def increment(self, http_resp: requests.Response):
        """
        Sets the `next_url` to fetch and increments the page number.

        Parameters:
            http_resp: requests.Response
        """
        super().increment(http_resp)

        data = json.loads(http_resp.content)
        self.next_page_token = data.get("nextPageToken", None)

        # No nextPageToken means the playlist is exhausted.
        self.next_url = self.get_current_url() if self.next_page_token else None

    def json(self) -> str:
        """
        Generates JSON for the next page of results; this state is fed back
        into `__init__` to resume the harvest.

        Returns: str
        """
        current_state = {
            "harvest_type": self.harvest_type,
            "external_id": self.external_id,
            "collection_id": self.collection_id,
            "next_url": self.next_url,
            "page_token": self.next_page_token,
            "write_page": self.write_page,
            "per_page": self.per_page
        }

        if not self.next_url:
            current_state.update({"finished": True})

        return json.dumps(current_state)
1 change: 1 addition & 0 deletions metadata_fetcher/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

# Per-source API credentials, read from the environment at import time;
# each is None when the corresponding variable is unset.
NUXEO_TOKEN = os.environ.get('NUXEO')
FLICKR_API_KEY = os.environ.get('FLICKR_API_KEY')
YOUTUBE_API_KEY = os.environ.get('YOUTUBE_API_KEY')

# Destination for fetched pages; defaults to the local filesystem.
DATA_DEST_URL = os.environ.get("FETCHER_DATA_DEST", "file:///tmp")
DATA_DEST = {
Expand Down
8 changes: 6 additions & 2 deletions metadata_mapper/mappers/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ def to_UCLDC(self) -> dict[str, Any]:
# Mapped value may be a function or lambda
self.mapped_data = {k: v() if isinstance(v, Callable) else v for (k, v)
in mapped_data.items()}

return self.mapped_data

def UCLDC_map(self) -> dict:
Expand Down Expand Up @@ -494,10 +493,15 @@ def move_date_values(self, prop, dest="temporal"):

remove = []
for value in src_values:
if not isinstance(value, str):
if isinstance(value, dict):
if "name" not in value:
continue
value = value.get("name")
elif not isinstance(value, str):
continue
cleaned_value = re.sub(r"[\(\)\.\?]", "", value)
cleaned_value = cleaned_value.strip()

for pattern in constants.move_date_value_reg_search:
matches = re.compile(pattern, re.I).findall(cleaned_value)
if (len(matches) == 1 and
Expand Down
57 changes: 57 additions & 0 deletions metadata_mapper/mappers/youtube/youtube_mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import json
from typing import Optional
from ..mapper import Vernacular, Record


class YoutubeRecord(Record):
    """Maps a YouTube API video resource to UCLDC fields."""

    def UCLDC_map(self):
        """Returns the dict of UCLDC field names to mapped values/callables."""
        return {
            "calisphere-id": self.legacy_couch_db_id.split('--')[1],
            "isShownAt": self.map_is_shown_at,
            "isShownBy": self.map_is_shown_by,
            "description": self.map_description,
            "subject": self.map_subject,
            "title": self.map_title
        }

    def map_is_shown_at(self) -> str:
        """Returns the public watch-page URL for this video."""
        video_id = self.source_metadata.get("id")
        return f"https://www.youtube.com/watch?v={video_id}"

    def map_is_shown_by(self) -> Optional[str]:
        """
        Returns the URL of the best available thumbnail, preferring the
        highest-resolution variant, or None if there are no thumbnails.
        """
        # Default to {} so a snippet without a `thumbnails` key doesn't
        # raise AttributeError on the `.get` calls below.
        thumbnails = self.source_metadata.get(
            "snippet", {}).get("thumbnails", {})
        for thumb_label in ["standard", "high", "medium", "default"]:
            thumb_url = thumbnails.get(thumb_label, {}).get("url")
            if thumb_url is not None:
                return thumb_url
        return None

    def map_description(self) -> list:
        """Returns the video description as a single-item list."""
        return [self.source_metadata.get("snippet", {}).get("description")]

    def map_subject(self) -> list:
        """Returns the video's tags, each wrapped as a `{"name": ...}` dict."""
        return [{"name": v} for v in
                self.source_metadata.get("snippet", {}).get("tags", [])]

    def map_title(self) -> list:
        """Returns the video title as a single-item list."""
        return [self.source_metadata.get("snippet", {}).get("title")]


class YoutubeVernacular(Vernacular):
    """Parses raw YouTube API JSON pages into YoutubeRecord instances."""

    record_cls = YoutubeRecord

    def parse(self, api_response: str) -> list:
        """
        Parameters:
            api_response: str -- raw JSON body from the videos endpoint

        Returns: list -- records built by `self.get_records`
        """
        def modify_record(record: dict) -> dict:
            # Stamp each item with a collection-scoped identifier.
            record.update({"calisphere-id": f"{self.collection_id}--"
                                            f"{record.get('id')}"})
            return record

        # Default to [] so a response without `items` (e.g. an API error
        # body) yields no records instead of raising TypeError.
        records = [modify_record(record) for record
                   in json.loads(api_response).get("items", [])]

        return self.get_records(records)