-
Notifications
You must be signed in to change notification settings - Fork 183
/
Copy pathstackoverflow.py
136 lines (119 loc) Β· 5.99 KB
/
stackoverflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import logging
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional
import requests
from data_source.api.base_data_source import BaseDataSource, ConfigField, HTMLInputType, BaseDataSourceConfig
from data_source.api.basic_document import DocumentType, BasicDocument
from queues.index_queue import IndexQueue
from data_source.api.utils import rate_limit
logger = logging.getLogger(__name__)
@dataclass
class StackOverflowPost:
    """One item returned by the Stack Overflow for Teams API (question, answer, or article).

    Field names mirror the API's JSON keys; the ``owner_*`` fields are the
    nested ``owner`` object flattened by the caller before construction.
    Date fields are unix timestamps in seconds.
    """
    link: str
    score: int
    last_activity_date: int  # unix timestamp (seconds)
    creation_date: int  # unix timestamp (seconds)
    post_id: Optional[int] = None
    post_type: Optional[str] = None
    body_markdown: Optional[str] = None  # present thanks to the custom API filter
    owner_account_id: Optional[int] = None
    owner_reputation: Optional[int] = None
    owner_user_id: Optional[int] = None
    owner_user_type: Optional[str] = None
    owner_profile_image: Optional[str] = None
    owner_display_name: Optional[str] = None
    owner_link: Optional[str] = None
    title: Optional[str] = None  # answers have none; caller substitutes the link
    # unix timestamp (seconds) — fed to datetime.fromtimestamp(), so int, not str
    last_edit_date: Optional[int] = None
    tags: Optional[List[str]] = None
    view_count: Optional[int] = None
    article_id: Optional[int] = None
    article_type: Optional[str] = None
class StackOverflowConfig(BaseDataSourceConfig):
    """User-supplied connection settings for a Stack Overflow for Teams data source."""
    api_key: str  # personal access token, sent as the X-API-Access-Token header
    team_name: str  # team slug appended to API requests as &team=...
@rate_limit(allowed_per_second=15)
def rate_limited_get(url, headers):
    """GET *url* with *headers*, throttled client-side and retried on HTTP 429.

    https://api.stackoverflowteams.com/2.3/docs/throttle
    https://api.stackexchange.com/docs/throttle
    Every application is subject to an IP based concurrent request throttle.
    If a single IP is making more than 30 requests a second, new requests will be dropped.
    The exact ban period is subject to change, but will be on the order of 30 seconds to a few minutes typically.
    Note that exactly what response an application gets (in terms of HTTP code, text, and so on)
    is undefined when subject to this ban; we consider > 30 request/sec per IP to be very abusive and thus cut the requests off very harshly.

    Returns the first ``requests.Response`` whose status is not 429.
    """
    # Loop instead of the previous self-recursion: repeated 429s no longer
    # grow the call stack (each recursive retry also re-entered the
    # rate_limit decorator, which is unnecessary after a 5-minute sleep).
    while True:
        resp = requests.get(url, headers=headers)
        if resp.status_code != 429:
            return resp
        logger.warning('Rate limited, sleeping for 5 minutes')
        time.sleep(300)
class StackOverflowDataSource(BaseDataSource):
    """Indexes posts from a Stack Overflow for Teams instance into the index queue."""

    @staticmethod
    def get_config_fields() -> List[ConfigField]:
        """Describe the config form fields shown to the user."""
        return [
            ConfigField(label="PAT API Key", name="api_key", type=HTMLInputType.TEXT),
            ConfigField(label="Team Name", name="team_name", type=HTMLInputType.TEXT),
        ]

    @staticmethod
    async def validate_config(config: Dict) -> None:
        """Raise (via raise_for_status) if the key/team cannot list questions."""
        so_config = StackOverflowConfig(**config)
        url = f'https://api.stackoverflowteams.com/2.3/questions?&team={so_config.team_name}'
        response = rate_limited_get(url, headers={'X-API-Access-Token': so_config.api_key})
        response.raise_for_status()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        so_config = StackOverflowConfig(**self._raw_config)
        self._api_key = so_config.api_key
        self._team_name = so_config.team_name

    def _fetch_posts(self, *, api_key: str, team_name: str, page: int, doc_type: str) -> None:
        """Fetch one page of *doc_type*, feed new items to the index queue,
        and enqueue a task for the next page while the API reports more.

        Stops early (returns) at the first item older than the last index run.
        """
        team_fragment = f'&team={team_name}'
        # this is a filter for "body markdown" inclusion, all filters are unique and static
        # i am not entirely sure if this is per account, or usable by everyone
        filter_fragment = '&filter=!nOedRLbqzB'
        page_fragment = f'&page={page}'
        # The API expects a unix timestamp in whole seconds. int() truncation
        # replaces the previous ':.10n' format trick, which only produced the
        # right digits for 10-digit timestamps and is locale-dependent.
        from_date_fragment = f'&fromdate={int(self._last_index_time.timestamp())}'
        url = f'https://api.stackoverflowteams.com/2.3/{doc_type}?{team_fragment}{filter_fragment}{page_fragment}{from_date_fragment}'
        response = rate_limited_get(url, headers={'X-API-Access-Token': api_key})
        response.raise_for_status()
        response = response.json()
        has_more = response['has_more']
        items = response['items']
        logger.info(f'Fetched {len(items)} {doc_type} from Stack Overflow')
        for item_dict in items:
            owner_fields = {}
            if 'owner' in item_dict:
                # flatten the nested owner object into the post's owner_* fields
                owner_fields = {f"owner_{k}": v for k, v in item_dict.pop('owner').items()}
            if 'title' not in item_dict:
                # answers carry no title; fall back to the link so every document has one
                item_dict['title'] = item_dict['link']
            post = StackOverflowPost(**item_dict, **owner_fields)
            last_modified = datetime.fromtimestamp(post.last_edit_date or post.last_activity_date)
            if last_modified < self._last_index_time:
                # NOTE(review): early return assumes items are ordered newest-activity
                # first, so everything after this point is already indexed — confirm
                return
            logger.info(f'Feeding {doc_type} {post.title}')
            post_document = BasicDocument(title=post.title, content=post.body_markdown, author=post.owner_display_name,
                                          timestamp=datetime.fromtimestamp(post.creation_date), id=post.post_id,
                                          data_source_id=self._data_source_id, location=post.link,
                                          url=post.link, author_image_url=post.owner_profile_image,
                                          type=DocumentType.MESSAGE)
            IndexQueue.get_instance().put_single(doc=post_document)
        if has_more:
            # paginate by scheduling the next page as its own task
            self.add_task_to_queue(self._fetch_posts, api_key=self._api_key, team_name=self._team_name, page=page + 1, doc_type=doc_type)

    def _feed_new_documents(self) -> None:
        """Kick off indexing by enqueueing the first page of posts."""
        self.add_task_to_queue(self._fetch_posts, api_key=self._api_key, team_name=self._team_name, page=1, doc_type='posts')
        # TODO: figure out how to get articles
        # self.add_task_to_queue(self._fetch_posts, api_key=self._api_key, team_name=self._team_name, page=1, doc_type='articles')
# def test():
# import os
# config = {"api_key": os.environ['SO_API_KEY'], "team_name": os.environ['SO_TEAM_NAME']}
# so = StackOverflowDataSource(config=config, data_source_id=1)
# so._feed_new_documents()
#
#
# if __name__ == '__main__':
# test()