proxy.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
import webbrowser
from urlparse import urljoin, urlsplit

import requests
from bs4 import BeautifulSoup
from flask import Flask, Response
from flask_script import Command, Manager, Option, Server
from w3lib.html import replace_entities


def is_visible(element):
    """Return True if a text node would actually be rendered on the page,
    i.e. it is not inside <style>/<script>/<head>/<title> and is not an
    HTML comment."""
    if element.parent.name in ['style', 'script', '[document]', 'head',
                               'title']:
        return False
    elif element.__class__.__name__ == 'Comment':
        return False
    return True


class CustomDummyCache(object):
    """
    Simplest possible in-memory cache: a dict keyed by the URL itself.

    There is another interesting approach, though: you can ask the cache
    object directly for a URL's content, and if it is not cached yet, it is
    the cache itself that makes the request to fetch it. I find that a bit
    backwards from an inversion-of-control and encapsulation point of view,
    however. Also, you would usually compute e.g. an MD5 hash of each URL
    and use it as the cache key, and then Redis and an expiry mechanism
    make all of it super fun and buggy.
    """
    def __init__(self):
        self.storage = {}

    def is_cached(self, url):
        return url in self.storage

    def get(self, url):
        print('using cache!')
        return self.storage.get(url)

    def store(self, url, content_type, data):
        self.storage[url] = {'content_type': content_type, 'data': data}
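

# A minimal sketch of the alternative the docstring above describes, where
# the cache itself fetches a URL on a miss and an MD5 hash of the URL is
# used as the key. The proxy below does not use it, and the name
# SelfFetchingCache is illustrative, not part of the original design.
import hashlib


class SelfFetchingCache(object):
    def __init__(self):
        self.storage = {}

    def _key(self, url):
        # hash the URL so every key has the same fixed, safe shape
        return hashlib.md5(url.encode('utf-8')).hexdigest()

    def get(self, url):
        key = self._key(url)
        if key not in self.storage:
            # cache miss: the cache makes the HTTP request itself
            resp = requests.get(url)
            self.storage[key] = {'content_type': resp.headers.get('Content-Type'),
                                 'data': resp.content}
        return self.storage[key]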


class CustomServer(Server):
    """Flask-Script Server that stores the proxy settings on the app and
    opens the proxied site in the default browser on start-up."""
    def __init__(self, host, port, site, use_cache, with_reloader):
        self.host, self.port, self.site, self.use_cache, self.with_reloader = \
            host, port, site, use_cache, with_reloader
        super(CustomServer, self).__init__(self.host, self.port,
                                           use_reloader=with_reloader)

    def __call__(self, app):
        server_args = {'processes': 1, 'threaded': False, 'use_debugger': True,
                       'use_reloader': self.with_reloader, 'host': self.host,
                       'passthrough_errors': False, 'port': self.port}
        webbrowser.open('http://%s:%s/' % (self.host, self.port))
        app.host, app.port, app.site, app.use_cache = self.host, self.port, \
            self.site, self.use_cache
        if self.use_cache:
            app.cache = CustomDummyCache()
        return Server.__call__(self, app, **server_args)


class ArgumentsParser(Command):
    """Command-line options for the 'runserver' command."""
    option_list = (
        Option('--host', '-h', dest='host', default='127.0.0.1'),
        Option('--port', '-p', dest='port', default=5000, type=int),
        Option('--site', '-s', dest='site', default='habrahabr.ru'),
        Option('--cache', '-c', dest='use_cache', default=False,
               action='store_true'),
        Option('--reloader', '-r', dest='with_reloader', default=False,
               action='store_true'),
    )

    def run(self, host, port, site, use_cache, with_reloader):
        # assume http:// when the site is given without a scheme
        if not urlsplit(site).scheme:
            site = 'http://' + site
        CustomServer(host, port, site, use_cache, with_reloader)(app)


what_to_add = u"\u2122"  # the trademark sign appended to six-letter words

app = Flask(__name__)
manager = Manager(app)


@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
def index(path):
    url = urljoin(app.site, path)
    # exactly six letters (word characters minus digits), bounded by
    # whitespace or the string ends; the lookahead keeps the trailing
    # boundary unconsumed so adjacent six-letter words both match
    regexp = re.compile(r'(^|\s)([^\W\d]{6})(?=$|\s)', re.UNICODE)
    if app.use_cache and app.cache.is_cached(url):
        cached = app.cache.get(url)
        return Response(cached['data'], mimetype=cached['content_type'])
    resp = requests.get(url)
    # pass non-HTML responses (images, CSS, ...) through untouched
    if resp.headers.get('Content-Type') and \
            'text/html' not in resp.headers.get('Content-Type'):
        if app.use_cache:
            app.cache.store(url, resp.headers.get('Content-Type'),
                            resp.content)
        return Response(resp.content,
                        mimetype=resp.headers.get('Content-Type'))
    soup = BeautifulSoup(resp.text, 'html.parser')
    # append the trademark sign to every visible six-letter word
    strings = soup.findAll(string=regexp)
    visible_strings = filter(is_visible, strings)
    for string in visible_strings:
        new_string = re.sub(regexp, r'\g<1>\g<2>%s' % what_to_add, string)
        string.replace_with(new_string)
    # rewrite links pointing at the proxied site so navigation stays on
    # the proxy; scheme-less hrefs are resolved against the proxy itself
    site_domain = urlsplit(app.site).netloc
    proxy_domain = 'http://%s:%s/' % (app.host, app.port)
    for link in soup.find_all('a'):
        if not link.get('href'):
            continue
        if not urlsplit(link['href']).scheme:
            link['href'] = urljoin(proxy_domain, link['href'])
        elif site_domain == urlsplit(link['href']).netloc:
            url_parts = urlsplit(link['href'])
            uri = url_parts.path + \
                ('?' + url_parts.query if url_parts.query else '')
            link['href'] = urljoin(proxy_domain, uri)
    content = replace_entities(str(soup))
    if app.use_cache:
        app.cache.store(url, 'text/html', content)
    return content


manager.add_command('runserver', ArgumentsParser())

if __name__ == '__main__':
    manager.run()
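
# Example invocation (the defaults above make every flag optional):
#   python proxy.py runserver -s habrahabr.ru -p 5000 -c -r
# proxies http://habrahabr.ru at http://127.0.0.1:5000/ with the in-memory
# cache and the auto-reloader enabled.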