-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtoDatabase.py
376 lines (340 loc) · 14.7 KB
/
toDatabase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
import logging
import sqlite3
import time
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime, date
from sqlite3 import register_adapter
from typing import Optional, List, Dict, Any
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from config import get_api_key, CHANNEL_IDS, DB_CONFIG, rotate_api_key
# Configure the root logger once, at import time: INFO level, timestamped lines.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
)

# Module-wide YouTube Data API v3 client, built with the key returned by
# get_api_key(). safe_execute() rebinds this global after rotating keys.
youtube = build(
    'youtube',
    'v3',
    developerKey=get_api_key(),
)
def adapt_date(d: date) -> str:
    """Return the ISO-8601 text form of *d* for storage in SQLite."""
    iso_text = d.isoformat()
    return iso_text


# Register adapt_date as the sqlite3 adapter for the ``date`` type.
register_adapter(date, adapt_date)
@dataclass
class ChannelDetails:
    """Basic facts about one channel, as built by get_channel_details()."""

    # Channel ID the record was requested for.
    channel_id: str
    # Channel title (``snippet.title`` of the API item).
    channel_name: str
    # Subscriber count (``statistics.subscriberCount``) converted to int.
    subscriber_count: int
def _with_api_key(uri: str, api_key: str) -> str:
    """Return *uri* with its ``key`` query parameter set to *api_key*.

    A URI that carries no ``key`` parameter is returned unchanged.
    """
    parts = urlsplit(uri)
    params = parse_qsl(parts.query, keep_blank_values=True)
    if all(name != "key" for name, _ in params):
        return uri
    swapped = [(name, api_key if name == "key" else value) for name, value in params]
    return urlunsplit(parts._replace(query=urlencode(swapped)))


def safe_execute(request) -> Dict[str, Any]:
    """
    Execute a YouTube API request, retrying once with a rotated key on quota errors.

    On an HTTP 403 whose body mentions "quotaExceeded", a new key is obtained
    from rotate_api_key(), the module-level ``youtube`` client is rebuilt with
    it, and the request is executed a second time.

    Args:
        request: An object exposing ``execute()`` (and, for the retry, a
            string ``uri`` attribute), as returned by the client's
            ``...list(...)`` calls.

    Returns:
        Whatever ``request.execute()`` returns (the decoded API response).

    Raises:
        HttpError: For any non-quota API error, or when the retry fails too.
    """
    global youtube
    try:
        return request.execute()
    except HttpError as e:
        # The error body may be bytes, text or missing; never fail while
        # inspecting it, or the original HttpError would be masked.
        raw = e.content
        if isinstance(raw, bytes):
            error_content = raw.decode('utf-8', errors='replace')
        elif isinstance(raw, str):
            error_content = raw
        else:
            error_content = ""
        if e.resp.status == 403 and "quotaExceeded" in error_content:
            logging.info("Quota exceeded. Rotating API key...")
            new_api_key = rotate_api_key()
            youtube = build('youtube', 'v3', developerKey=new_api_key)
            # The request was prepared with the exhausted key in its URI, so
            # rebuilding the client alone does not help this retry: patch the
            # key into the pending request as well.
            uri = getattr(request, "uri", None)
            if new_api_key and isinstance(uri, str):
                request.uri = _with_api_key(uri, new_api_key)
            return request.execute()
        logging.error("YouTube API error: %s", e)
        raise
def get_channel_details(channel_id: str) -> Optional[ChannelDetails]:
    """
    Fetch a channel's title and subscriber count from the YouTube API.

    Args:
        channel_id: ID of the channel to look up.

    Returns:
        A ChannelDetails built from the first returned item, or None when the
        API returns no item or any error occurs (the error is logged).
    """
    try:
        request = youtube.channels().list(
            part="snippet,statistics",
            id=channel_id
        )
        response = safe_execute(request)
        items = response.get('items', [])
        if not items:
            logging.warning("No channel found with id: %s", channel_id)
            return None
        channel = items[0]
        # The API may omit subscriberCount (e.g. when the channel hides it);
        # fall back to 0 instead of discarding the whole channel.
        statistics = channel.get('statistics') or {}
        subscriber_count = int(statistics.get('subscriberCount', 0))
        return ChannelDetails(
            channel_id=channel_id,
            channel_name=channel['snippet']['title'],
            subscriber_count=subscriber_count
        )
    except Exception as e:
        logging.error("Error fetching channel details for id %s: %s", channel_id, e)
        return None
def insert_channel_details(channel: ChannelDetails) -> None:
    """
    Insert one channel row into the ``channels`` table of the SQLite database.

    Opens its own connection to DB_CONFIG. sqlite3 errors are logged and
    swallowed, so the function never raises for database failures.

    Args:
        channel: The channel record to store.
    """
    try:
        # closing() releases the connection; sqlite3's own context manager
        # only commits (or rolls back on error) and leaves it open.
        with closing(sqlite3.connect(DB_CONFIG)) as conn:
            with conn:
                conn.execute(
                    "INSERT INTO channels (channel_id, channel_name, subscriber_count) VALUES (?, ?, ?)",
                    (channel.channel_id, channel.channel_name, channel.subscriber_count)
                )
            logging.info("Inserted channel details for %s", channel.channel_id)
    except sqlite3.Error as e:
        logging.error("Database error inserting channel details: %s", e)
def get_video_details(video_id: str, channel_id: str) -> Optional[Dict[str, Any]]:
    """
    Look up one video through the YouTube API and flatten it into a dict.

    The result carries the video's title, counters, publish time and the
    collection date, plus a 'commentsEnabled' flag that is True when the
    statistics include 'commentCount'. Returns None when the API returns no
    item or when anything goes wrong (the problem is logged).
    """
    try:
        response = safe_execute(
            youtube.videos().list(part="snippet,statistics", id=video_id)
        )
        if not response.get('items'):
            logging.warning("No video details found for video id: %s", video_id)
            return None

        video = response['items'][0]
        snippet = video['snippet']
        published_at = datetime.strptime(snippet['publishedAt'], '%Y-%m-%dT%H:%M:%SZ')

        stats = video['statistics']
        # The presence of 'commentCount' is used as the "comments enabled" signal.
        comments_enabled = 'commentCount' in stats

        details: Dict[str, Any] = {
            'videoId': video_id,
            'channelId': channel_id,
            'videoTitle': snippet['title'],
            'videoAudio': None,
            'videoTranscript': None,
            'viewCount': int(stats.get('viewCount', 0)),
            'likeCount': int(stats.get('likeCount', 0)),
        }
        if comments_enabled:
            details['commentCount'] = int(stats['commentCount'])
        else:
            details['commentCount'] = 0
        details['publishedAt'] = published_at.strftime('%Y-%m-%d %H:%M:%S')
        details['collectedDate'] = datetime.now().date()
        details['commentsEnabled'] = comments_enabled
        return details
    except Exception as e:
        logging.error("Error fetching video details for video id %s: %s", video_id, e)
        return None
def _comment_record(comment: Dict[str, Any], video_id: str,
                    parent_comment_id: Optional[str],
                    collected_date: date) -> Dict[str, Any]:
    """Flatten one API comment resource (top-level or reply) into a row dict."""
    snippet = comment["snippet"]
    published = datetime.strptime(snippet["publishedAt"], '%Y-%m-%dT%H:%M:%SZ')
    # authorChannelId can be absent from a comment; record None rather than
    # raising and losing every comment of the video.
    # NOTE(review): assumes Comments.userId accepts NULL - confirm against the schema.
    author = snippet.get("authorChannelId") or {}
    return {
        "commentId": comment["id"],
        "videoId": video_id,
        "parentCommentId": parent_comment_id,
        "userId": author.get("value"),
        "userName": snippet["authorDisplayName"],
        "content": snippet["textDisplay"],
        "likeCount": snippet["likeCount"],
        "publishedAt": published.strftime('%Y-%m-%d %H:%M:%S'),
        "collectedDate": collected_date,
    }


def get_video_comments(video_id: str) -> List[Dict[str, Any]]:
    """
    Fetch a video's comment threads (and the replies embedded in them).

    Pages through commentThreads.list 100 threads at a time. Each top-level
    comment becomes a dict with ``parentCommentId`` None; each embedded reply
    becomes a dict whose ``parentCommentId`` is the top-level comment's ID.

    NOTE(review): only the replies embedded in the thread response are
    collected; the API may embed just a subset of a thread's replies - confirm
    whether a follow-up comments.list call is wanted.

    Args:
        video_id: ID of the video whose comments are requested.

    Returns:
        The list of comment dicts. An empty list is returned when comments
        are disabled, or on any error (in which case comments gathered from
        earlier pages are discarded too).
    """
    comments: List[Dict[str, Any]] = []
    next_page_token = None
    current_date = datetime.now().date()
    try:
        while True:
            try:
                request = youtube.commentThreads().list(
                    part="snippet,replies",
                    videoId=video_id,
                    maxResults=100,
                    pageToken=next_page_token
                )
                response = safe_execute(request)
            except HttpError as e:
                # Decode the error body once; it may be bytes, text or empty.
                raw = e.content
                if isinstance(raw, bytes):
                    error_message = raw.decode("utf-8", errors="replace")
                else:
                    error_message = raw or ""
                if e.resp.status == 403 and "commentsDisabled" in error_message:
                    logging.info("Comments are disabled for video %s. Skipping.", video_id)
                else:
                    logging.error("Error fetching comments for video %s. Skipping.", video_id)
                return []
            for item in response.get("items", []):
                top_comment = item["snippet"]["topLevelComment"]
                comments.append(_comment_record(top_comment, video_id, None, current_date))
                # Process replies if any
                for reply in item.get("replies", {}).get("comments", []):
                    comments.append(
                        _comment_record(reply, video_id, top_comment["id"], current_date)
                    )
            next_page_token = response.get("nextPageToken")
            if not next_page_token:
                break
    except Exception:
        # logging.exception keeps the same message but attaches the traceback,
        # so the cause of the skip is not lost.
        logging.exception("Unexpected error fetching comments for video %s. Skipping.", video_id)
        return []
    return comments
def get_channel_videos(channel_id: str) -> List[str]:
    """
    Collect the video IDs that search.list returns for a channel.

    Pages through the search results 50 at a time, then looks up the channel
    title purely to log a readable summary line.

    NOTE(review): completeness depends on what search.list returns for the
    channel; if every upload is required, confirm whether the channel's
    uploads playlist should be used instead.

    Args:
        channel_id: ID of the channel to enumerate.

    Returns:
        The video IDs gathered so far. On error the problem is logged and the
        (possibly partial) list is still returned.
    """
    video_ids: List[str] = []
    next_page_token = None
    try:
        while True:
            request = youtube.search().list(
                part="snippet",
                channelId=channel_id,
                maxResults=50,
                pageToken=next_page_token,
                type="video"
            )
            response = safe_execute(request)
            video_ids.extend(
                item['id']['videoId']
                for item in response.get('items', [])
                if item['id'].get('kind') == "youtube#video"
            )
            next_page_token = response.get('nextPageToken')
            if not next_page_token:
                break
        # Retrieve and log channel name along with video count. Routed through
        # safe_execute like every other call so quota rotation applies here too.
        channel_response = safe_execute(
            youtube.channels().list(
                part="snippet",
                id=channel_id
            )
        )
        if channel_response.get("items"):
            channel_name = channel_response["items"][0]["snippet"]["title"]
            logging.info("Found %d videos for channel '%s' (ID: %s)", len(video_ids), channel_name, channel_id)
        else:
            logging.info("Found %d videos for channel (ID: %s)", len(video_ids), channel_id)
    except Exception as e:
        logging.error("Error fetching channel videos for channel %s: %s", channel_id, e)
    return video_ids
def save_to_database(conn: sqlite3.Connection, cursor: sqlite3.Cursor,
                     channel_data: Dict[str, Any], video_data: Dict[str, Any],
                     comments: List[Dict[str, Any]]) -> bool:
    """
    Upsert one video and its comments into the SQLite database.

    The video row and all comment rows are written in a single transaction:
    either everything is committed, or the transaction is rolled back.

    Args:
        conn: Open connection, used to commit or roll back.
        cursor: Cursor on ``conn`` used for the statements.
        channel_data: Not used by this function; kept for interface compatibility.
        video_data: Video dict with the keys bound below; ``collectedDate``
            must provide ``isoformat()``.
        comments: Comment/reply dicts with the keys bound below.

    Returns:
        True when the data was committed, False when an error was logged and
        the transaction rolled back.
    """
    video_sql = """
        INSERT INTO Videos (
            videoId, channelId, videoTitle, videoAudio, videoTranscript,
            viewCount, likeCount, commentCount, publishedAt, collectedDate
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(videoId) DO UPDATE SET
            videoTitle = excluded.videoTitle,
            viewCount = excluded.viewCount,
            likeCount = excluded.likeCount,
            commentCount = excluded.commentCount,
            collectedDate = excluded.collectedDate
    """
    comment_sql = """
        INSERT INTO Comments (
            commentId, videoId, parentCommentId, userId,
            userName, content, likeCount, publishedAt, collectedDate
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(commentId) DO UPDATE SET
            content = excluded.content,
            likeCount = excluded.likeCount,
            collectedDate = excluded.collectedDate
    """
    try:
        # Update Videos table
        cursor.execute(video_sql, (
            video_data['videoId'],
            video_data['channelId'],
            video_data['videoTitle'],
            video_data['videoAudio'],
            video_data['videoTranscript'],
            video_data['viewCount'],
            video_data['likeCount'],
            video_data['commentCount'],
            video_data['publishedAt'],
            video_data['collectedDate'].isoformat(),
        ))
        # Insert Comments and Replies in one batch; the generator is consumed
        # inside the try, so a malformed comment still triggers the rollback.
        cursor.executemany(comment_sql, (
            (
                comment['commentId'],
                comment['videoId'],
                comment['parentCommentId'],
                comment['userId'],
                comment['userName'],
                comment['content'],
                comment['likeCount'],
                comment['publishedAt'],
                comment['collectedDate'].isoformat(),
            )
            for comment in comments
        ))
        conn.commit()
        return True
    except Exception as e:
        logging.error("Database error while saving video %s: %s", video_data.get('videoId'), e)
        conn.rollback()
        return False
def video_exists_in_database(cursor: sqlite3.Cursor, video_id: str) -> bool:
    """
    Report whether the Videos table already holds a row for *video_id*.

    Any error during the lookup is logged and reported as "not present".
    """
    try:
        row = cursor.execute(
            "SELECT COUNT(*) FROM Videos WHERE videoId = ?", (video_id,)
        ).fetchone()
        matches = row[0]
        return matches > 0
    except Exception as e:
        logging.error("Error checking existence of video %s: %s", video_id, e)
        return False
def main():
    """
    Collect videos and comments for every channel in CHANNEL_IDS into SQLite.

    For each channel: fetch its details (channels without details are
    skipped), list its videos, and for every video not yet stored fetch its
    details and comments and save them. Sleeps one second after each saved
    or failed video.
    """
    # closing() releases the connection and cursor; sqlite3's own context
    # manager (the inner ``with conn``) only ends the transaction.
    with closing(sqlite3.connect(DB_CONFIG)) as conn:
        with conn, closing(conn.cursor()) as cursor:
            for channel_id in CHANNEL_IDS:
                details = get_channel_details(channel_id)
                if not details:
                    logging.warning("Skipping channel %s due to missing details.", channel_id)
                    continue
                video_ids = get_channel_videos(channel_id)
                total_videos = len(video_ids)
                channel_data = {
                    'channelId': details.channel_id,
                    'channelName': details.channel_name,
                    'dayCollected': datetime.now().date(),
                    'numberOfSubscribers': details.subscriber_count,
                    'numberOfVideos': total_videos
                }
                # Pre-bind the loop index so a channel without videos does not
                # hit an unbound (or stale) ``i`` in the summary below.
                i = 0
                for i, video_id in enumerate(video_ids, start=1):
                    # Check the database first: skipping a known video must
                    # not cost an API request.
                    if video_exists_in_database(cursor, video_id):
                        logging.info("Video %s already exists in the database. Skipping...", video_id)
                        continue
                    video_data = get_video_details(video_id, channel_id)
                    if not video_data:
                        continue
                    # Check if comments are enabled for the video.
                    if not video_data.get('commentsEnabled'):
                        logging.info("Comments are disabled for video %s. Skipping fetching comments.", video_id)
                        comments = []
                    else:
                        comments = get_video_comments(video_id)
                    if save_to_database(conn, cursor, channel_data, video_data, comments):
                        logging.info("(%d/%d) Saved data for video %s (%d comments/replies)",
                                     i, total_videos, video_id, len(comments))
                    else:
                        logging.error("Failed to save data for video %s", video_id)
                    time.sleep(1)  # Respect API quota
                remaining_videos = total_videos - i
                logging.info("Processing complete for channel %s. %d videos remaining (if any further processing is needed).",
                             channel_id, remaining_videos)


if __name__ == "__main__":
    main()