-
Notifications
You must be signed in to change notification settings - Fork 0
/
twitter.py
129 lines (105 loc) · 4.82 KB
/
twitter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import logging
import cgi
import re
import webapp2
import common
from oauthclient import TwitterClient, escape
from google.appengine.api import app_identity, memcache, urlfetch
# Twitter OAuth Consumer Key.
CONSUMER_KEY = 'XXXXXXXXXXXXXXXXXXXXX'
CONSUMER_SECRET = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
# bit.ly API Key.
BITLY_USERNAME = 'XXXXX'
BITLY_APIKEY = 'R_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
# User-map's hostname.
#USER_MAP_HOSTNAME = app_identity.get_default_version_hostname()
USER_MAP_HOSTNAME = 'imakoko-iphone.appspot.com'
TWITTER_POST_URL = 'https://api.twitter.com/1.1/statuses/update.json'
oauth_client = TwitterClient(CONSUMER_KEY, CONSUMER_SECRET)
shorten_url_pattern = re.compile(re.escape('http://' + USER_MAP_HOSTNAME + '/m/') + '[\w!%\'()*.~-]*(#\w+)?')
def shorten_url(matchobj):
url = matchobj.group()
result = urlfetch.fetch('http://api.bitly.com/v3/shorten?login=' + BITLY_USERNAME + '&apiKey=' + BITLY_APIKEY + '&format=txt&domain=j.mp&longUrl=' + escape(url))
if result.status_code != 200:
logging.error('Cannot shorten URL. (%d %s)\n%s', result.status_code, result.content, url)
raise Exception
return result.content.rstrip()
class TwitterOAuthPage(common.BasePage):
def get(self):
account = self.get_account()
if account:
sid_str = self.get_sid_str()
else:
sid_str = self.create_temporary_sid()
callback_url = self.request.relative_url('callback')
reqtoken_result = oauth_client.obtain_request_token(callback_url)
memcache.set('TWITTER_OAUTH_' + sid_str, reqtoken_result, 3600)
self.redirect(reqtoken_result['authorization_url'])
class TwitterCallbackPage(common.BasePage):
def get(self):
sid_str = self.get_sid_str()
reqtoken_result = memcache.get('TWITTER_OAUTH_' + sid_str)
if not reqtoken_result:
logging.warning('No reqtoken_result.')
self.show_error_page()
return
memcache.delete('TWITTER_OAUTH_' + sid_str)
account = self.get_account()
if not account:
account = common.Account()
if reqtoken_result['oauth_token'] == self.request.get('oauth_token'):
result = oauth_client.obtain_access_token(reqtoken_result['oauth_token'], reqtoken_result['oauth_token_secret'], self.request.get('oauth_verifier'))
account.twitter_user = result['screen_name']
account.twitter_token = result['oauth_token']
account.twitter_secret = result['oauth_token_secret']
self.put_with_new_sid(account)
logging.info('(%d) Join: twitter="%s"', account.key().id(), account.twitter_user)
self.redirect('/settings.html')
elif reqtoken_result['oauth_token'] == self.request.get('denied'):
if account.twitter_user:
logging.info('(%d) Leave: twitter="%s"', account.key().id(), account.twitter_user)
account.twitter_user = None
account.twitter_token = None
account.twitter_secret = None
account.put()
self.redirect('/settings.html')
else:
logging.error('Unexpected request.')
self.show_error_page()
class TwitProxyPage(common.BasePage):
def post(self):
account = self.get_account()
if not account or not account.twitter_user:
logging.debug('Not Logged in.')
self.error(400)
return
if account.session_token != self.request.headers.get('X-Imakoko-Token'):
logging.warning('(%d) session_token mismatch.', account.key().id())
self.error(500)
return
logging.debug('(%d) Twit: %s', account.key().id(), account.twitter_user)
data = self.request.body
try:
parsed = cgi.parse_qs(data)
parsed['status'][0] = shorten_url_pattern.sub(shorten_url, parsed['status'][0], 1)
data = '&'.join(escape(k) + '=' + escape(parsed[k][0]) for k in parsed)
except:
pass
headers = oauth_client.build_oauth_header('POST', TWITTER_POST_URL, data, account.twitter_token, account.twitter_secret)
result = urlfetch.fetch(TWITTER_POST_URL, data, urlfetch.POST, headers)
status_code = result.status_code
if status_code == 401:
logging.info('(%d) Leave: twitter="%s"', account.key().id(), account.twitter_user)
account.twitter_user = None
account.twitter_token = None
account.twitter_secret = None
account.put()
self.error(400)
return
self.response.status_int = status_code
app = webapp2.WSGIApplication(
[('/twitter/oauth', TwitterOAuthPage),
('/twitter/callback', TwitterCallbackPage),
('/twitter/twit', TwitProxyPage)])