V1.0 Implement site-wide basic information
HanZhuoii committed Nov 15, 2020
1 parent c679356 commit 5328502
Showing 7 changed files with 173 additions and 58 deletions.
4 changes: 4 additions & 0 deletions config.py
@@ -1,10 +1,14 @@
# - global -
DB_NAME = "知乎"

USE_REDIS = True  # if set to False, the *_ADD_*_ID options below have no effect
REDIS_HOST = "192.168.100.101"
REDIS_PORT = 6379
REDIS_PASSWORD = None

MONGO_CONNECTION = "mongodb://localhost:27017/"
MONGO_DOC_LIMIT = 5000  # each document stores at most 5000 data entries

PROXIES_LIVE_TIME = 60
REQUEST_RETRY_TIMES = 5
SOCKET_DEFAULT_TIMEOUT = 20  # covers the whole request lifecycle
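As a quick sanity check on these settings, a minimal sketch along these lines could verify that the configured Redis and MongoDB endpoints are reachable before a crawl starts (the `check_backends` helper is hypothetical, not part of this commit):

```python
import pymongo
import redis

import config


def check_backends() -> None:
    # Hypothetical smoke test for the endpoints configured above.
    if config.USE_REDIS:
        r = redis.Redis(host=config.REDIS_HOST, port=config.REDIS_PORT,
                        password=config.REDIS_PASSWORD)
        r.ping()  # raises redis.exceptions.ConnectionError if Redis is down
    client = pymongo.MongoClient(config.MONGO_CONNECTION, serverSelectionTimeoutMS=5000)
    client.server_info()  # raises ServerSelectionTimeoutError if MongoDB is down
```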
53 changes: 34 additions & 19 deletions frame/SpiderFrame.py
@@ -6,19 +6,29 @@
v0.3 Added Redis support: UrlManager can run on Redis so large jobs resume after interruption, and DataSaver buffers via Redis so slow disk I/O no longer limits crawl speed
"""
import threading
import random

import pandas as pd
import requests
import redis
import socket
import logging
import time

import config
from os import path

redis = redis.Redis(host=config.REDIS_HOST, port=config.REDIS_PORT, password=config.REDIS_PASSWORD)


class exception(Exception):
class exception:
class RequestRetryError(Exception):
def __init__(self):
super().__init__()

def __str__(self):
return "Too many failed requests were issued"

class UserNotExist(Exception):
def __init__(self):
super().__init__()
@@ -84,12 +94,12 @@ def __init__(self):
super().__init__()
# thread-running flag
self.__thread__flag = True
self.get_proxies_api = "http://api.xdaili.cn/xdaili-api/greatRecharge/getGreatIp?spiderId" \
"=192b9425f13c47ffbbe4a663c974608b&orderno=YZ2020219595449Wzor&returnType=2&count=1 "
self.get_proxies_api = "http://webapi.http.zhimacangku.com/getip?num=1&type=2&pro=0&city=0&yys=0&port=11&pack=125417&ts=1&ys=0&cs=0&lb=1&sb=0&pb=45&mr=2&regions=110000,130000,140000,310000,320000,330000,340000,350000,360000,370000,410000,420000,430000,440000,500000,510000,610000"
self.Proxies = {
"http": "",
"https": ""
}
self.live_time = config.PROXIES_LIVE_TIME

# stop the thread
def __exit__(self):
@@ -101,23 +111,24 @@ def get_proxies(self):
i = 0
for i in range(config.REQUEST_RETRY_TIMES):
res = requests.get(self.get_proxies_api)
j = eval(res.text)
if j['ERRORCODE'] == '0':
self.Proxies['http'] = "http://" + j['RESULT'][0]['ip'] + ":" + j['RESULT'][0]['port']
self.Proxies['https'] = "http://" + j['RESULT'][0]['ip'] + ":" + j['RESULT'][0]['port']
j = eval(res.text.replace("true", "True").replace("false", "False").replace("null", "'null'"))
if j['code'] == 0:
self.Proxies['http'] = "http://" + j['data'][0]['ip'] + ":" + str(j['data'][0]['port'])
self.Proxies['https'] = "https://" + j['data'][0]['ip'] + ":" + str(j['data'][0]['port'])
self.live_time = int(time.mktime(time.strptime(j["data"][0]["expire_time"], "%Y-%m-%d %H:%M:%S"))) - time.time()
logger.info("Successfully get proxies")
return
logger.warning("Failed, " + str(i + 1) + " times get proxies...")
time.sleep(5)
time.sleep(random.randrange(7, 13))
if i == config.REQUEST_RETRY_TIMES - 1:
logger.critical("Get proxies failed, exit program...")

# Track proxy age; fetch a new proxy once it expires
def run(self) -> None:
start_time = time.time()
self.get_proxies()
while self.__thread__flag:
# set proxy lifetime to 60s
if start_time - time.time() > config.PROXIES_LIVE_TIME:
if time.time() - start_time > self.live_time:
logger.warning("proxies failure, get new one")
# reset the proxy age timer
start_time = time.time()
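One caveat on the new parsing: patching `true`/`false`/`null` and then `eval`-ing the body executes whatever text the proxy vendor returns. A safer sketch using `json.loads`, assuming the response is plain JSON with the `code` / `data[0].ip` / `port` / `expire_time` fields this commit already reads:

```python
import json
import time


def parse_proxy_response(text: str):
    # Hypothetical replacement for the eval() call above.
    j = json.loads(text)  # parses true/false/null natively; never executes code
    if j["code"] != 0:
        raise ValueError("proxy API returned code {0}".format(j["code"]))
    ip, port = j["data"][0]["ip"], j["data"][0]["port"]
    proxies = {
        "http": "http://{0}:{1}".format(ip, port),
        "https": "https://{0}:{1}".format(ip, port),
    }
    expires = time.mktime(time.strptime(j["data"][0]["expire_time"], "%Y-%m-%d %H:%M:%S"))
    return proxies, expires - time.time()  # remaining lifetime in seconds
```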
@@ -204,7 +215,6 @@ def __init__(self):
def download(self, url: str, params=None) -> str:
if url == "":
raise exception.UrlEmptyException
res = ''  # not really needed; just silences an IDE warning
if params is None:
params = {}
for i in range(config.REQUEST_RETRY_TIMES):
@@ -213,20 +223,25 @@ def download(self, url: str, params=None) -> str:
timeout=3)
if res.status_code == 200:
return res.text
# non-200 status: switch proxy and raise
self.proxies.get_proxies()
res.raise_for_status()
# log the exception
except requests.exceptions.HTTPError:
logger.error(u"HTTPError; Code {0}[{1}]".format(str(res.status_code), url))
logger.warning(
"HTTPError with url:<{0}> retrying.....{1},{2}".format(url[:25] + " ... " + url[-15:], i + 1,
config.REQUEST_RETRY_TIMES))
except requests.exceptions.Timeout:
logger.error(url + "; Timeout")
logger.warning(
"Timeout with url:<{0}> retrying.....{1},{2}".format(url[:25] + " ... " + url[-15:], i + 1,
config.REQUEST_RETRY_TIMES))
except requests.exceptions.ProxyError:
self.proxies.get_proxies()
logger.error("Cannot connect to proxy.', timeout('timed out')", exc_info=True)
time.sleep(10)
except Exception:
logger.error("Undefined Error [{0}]".format(url))
# request failed: switch proxy and retry
self.proxies.get_proxies()
logger.warning("downloading error , retrying.....{0},3".format(i + 1))
logger.error("Undefined Error [{0}]".format(url), exc_info=True)
self.proxies.get_proxies()
logger.critical("requests.exceptions.RetryError [{0}]".format(url), exc_info=True)
time.sleep(10)
raise requests.exceptions.RetryError

def img_download(self, dir_path: str, url: str) -> None:
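After this change the contract of `download` is: return the body on HTTP 200, switch proxies and retry on any failure, and raise once all `REQUEST_RETRY_TIMES` attempts are spent. A minimal caller sketch mirroring how utils/comment.py consumes it (`HtmlDownloader` as the class name is an assumption; the class definition sits outside this hunk):

```python
from frame import SpiderFrame

downloader = SpiderFrame.HtmlDownloader()  # assumed class name, not shown in this diff


def fetch(url: str):
    try:
        return downloader.download(url)
    except SpiderFrame.exception.RequestRetryError:
        # every retry failed; the caller decides whether to requeue url or drop it
        return None
```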
51 changes: 35 additions & 16 deletions main.py
@@ -40,7 +40,7 @@ def run(self):
except Exception as e:
# earlier error details were already logged
logger.critical("Unexpected Exit TopicSpider: {0}, Message: {1}".format(_id, e), exc_info=True)
send_mail("TopicSpider发生意料之外的错误,已退出线程")
# send_mail("TopicSpider发生意料之外的错误,已退出线程")
finally:
logger.warning("TopicSpider finished with exit code 0")
topic.html_downloader.proxies.__exit__()
@@ -50,14 +50,20 @@ class QuestionSpider(Thread):
def __init__(self):
logger.info("QuestionSpider init...")
super().__init__()

self.flag = True
# url_manager behavior is built in; just pass IDs through id_manager
self.id_manager = SpiderFrame.UrlManager(db_set_name=config.QUESTION_ID_SET, use_redis=config.USE_REDIS)

def __exit__(self):
logger.warning("强制终止线程: QuestionSpider")
self.flag = False

def run(self):
logger.info("QuestionSpider thread start...")
_id = ''
try:
while True:
while self.flag:
if self.id_manager.list_not_null():
_id = self.id_manager.get()
question.spider(_id)
@@ -66,7 +72,7 @@ def run(self):
except Exception as e:
# earlier error details were already logged
logger.critical("Unexpected Exit QuestionSpider: {0}, Message: {1}".format(_id, e), exc_info=True)
send_mail("QuestionSpider发生意料之外的错误,已退出线程")
# send_mail("QuestionSpider发生意料之外的错误,已退出线程")
finally:
logger.warning("QuestionSpider finished with exit code 0")
question.html_downloader.proxies.__exit__()
@@ -76,9 +82,15 @@ class CommentSpider(Thread):
def __init__(self):
logger.info("CommentSpider init...")
super().__init__()

self.flag = True
# url_manager behavior is built in; just pass IDs through id_manager
self.id_manager = SpiderFrame.UrlManager(db_set_name=config.ANSWER_ID_SET, use_redis=config.USE_REDIS)

def __exit__(self):
logger.warning("强制终止线程: CommentSpider")
self.flag = False

def run(self):
_id = ''
try:
@@ -92,7 +104,7 @@ def run(self):
except Exception as e:
# earlier error details were already logged
logger.critical("Unexpected Exit CommentSpider: {0}, Message: {1}".format(_id, e), exc_info=True)
send_mail("CommentSpider发生意料之外的错误,已退出线程")
# send_mail("CommentSpider发生意料之外的错误,已退出线程")
finally:
logger.warning("CommentSpider finished with exit code 0")
comment.html_downloader.proxies.__exit__()
@@ -102,9 +114,15 @@ class UserSpider(Thread):
def __init__(self):
logger.info("UserSpider init...")
super().__init__()

self.flag = True
# url_manager behavior is built in; just pass IDs through id_manager
self.id_manager = SpiderFrame.UrlManager(db_set_name=config.USER_ID_SET, use_redis=config.USE_REDIS)

def __exit__(self):
logger.warning("强制终止线程: UserSpider")
self.flag = False

def run(self):
logger.info("UserSpider thread start...")
_id = ''
@@ -118,34 +136,35 @@ def run(self):
except Exception as e:
# earlier error details were already logged
logger.critical("Unexpected Exit UserSpider: {0}, Message: {1}".format(_id, e), exc_info=True)
send_mail("UserSpider发生意料之外的错误,已退出线程")
# send_mail("UserSpider发生意料之外的错误,已退出线程")
finally:
logger.warning("UserSpider finished with exit code 0")
user.html_downloader.proxies.__exit__()



if __name__ == '__main__':
TS = TopicSpider()
QS = QuestionSpider()
CS = CommentSpider()
US = UserSpider()

TS.start()
logger.info("Next thread will be start after 10s")
sleep(10)
logger.info("Next thread will be start after 7.5s")
sleep(7.5)
QS.start()
logger.info("Next thread will be start after 10s")
sleep(10)
logger.info("Next thread will be start after 7.5s")
sleep(7.5)
CS.start()
logger.info("Next thread will be start after 10s")
sleep(10)
logger.info("Next thread will be start after 7.5s")
sleep(7.5)
US.start()

logger.warning("爬虫进程启动完成,启动监控进程")
# watching
TS_i = QS_i = CS_i = US_i = 1
while True:
TS_i = QS_i = CS_i = US_i = 1
if not TS.is_alive() and (redis.llen(config.TOPIC_ID_SET) or redis.llen(config.TOPIC_SET)):
if TS_i != 3 and not TS.is_alive() and (redis.llen("list_"+config.TOPIC_ID_SET) or redis.llen("list_"+config.TOPIC_SET)):
for i in range(1, 4):
if TS.is_alive():
continue
Expand All @@ -156,7 +175,7 @@ def run(self):
if i == 3 and not TS.is_alive():
logger.error("Active thread TS failed")
send_mail("TS is exit and try to activate it failed")
if not QS.is_alive() and (redis.llen(config.QUESTION_ID_SET) or redis.llen(config.QUESTION_SET)):
if QS_i != 3 and not QS.is_alive() and (redis.llen("list_"+config.QUESTION_ID_SET) or redis.llen("list_"+config.QUESTION_SET)):
for i in range(1, 4):
if QS.is_alive():
QS_i = 1
Expand All @@ -168,7 +187,7 @@ def run(self):
if i == 3 and not QS.is_alive():
logger.error("----- Active thread QS failed -----")
send_mail("QS is exit and try to activate it failed")
if not CS.is_alive() and (redis.llen(config.ANSWER_ID_SET) or redis.llen(config.COMMENT_SET)):
if CS_i != 3 and not CS.is_alive() and (redis.llen("list_"+config.ANSWER_ID_SET) or redis.llen("list_"+config.COMMENT_SET)):
for i in range(1, 4):
if CS.is_alive():
CS_i = 1
Expand All @@ -180,7 +199,7 @@ def run(self):
if i == 3 and not CS.is_alive():
logger.error("----- Active thread CS failed -----")
send_mail("CS is exit and try to activate it failed")
if not US.is_alive() and redis.llen(config.USER_ID_SET):
if US_i != 3 and not US.is_alive() and redis.llen("list_"+config.USER_ID_SET):
for i in range(1, 4):
if US.is_alive():
US_i = 1
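The watchdog repeats the same restart block once per spider; a distilled sketch of one iteration is below. It is a sketch only: `make_spider` is a hypothetical factory, needed because a finished `threading.Thread` cannot be started again, and `redis`, `logger`, `sleep`, and `send_mail` are the module-level names main.py already uses.

```python
def ensure_alive(spider, queue_keys, make_spider, name):
    # Restart a dead spider while any of its Redis work queues still holds items.
    if spider.is_alive() or not any(redis.llen("list_" + k) for k in queue_keys):
        return spider
    for attempt in range(1, 4):
        spider = make_spider()  # threads are single-use; build a fresh instance
        spider.start()
        sleep(5)
        if spider.is_alive():
            logger.info("{0} restarted on attempt {1}".format(name, attempt))
            return spider
    logger.error("Active thread {0} failed".format(name))
    send_mail("{0} is exit and try to activate it failed".format(name))
    return spider
```

Called as, e.g., `US = ensure_alive(US, [config.USER_ID_SET], UserSpider, "US")` inside the `while True:` loop, one helper would replace the four near-identical blocks.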
4 changes: 2 additions & 2 deletions tools/KeyWordsSearch.py
@@ -89,14 +89,14 @@ def search(keyword):
if not data_saver.mg_data_db.find_one({"KeyWord": KWD}):
data_saver.mongo_insert({
"KeyWord": KWD,
"result": []
"data": []
})

while True:
res = json.loads(res)
logger.info("Saving Data To MongoDB")
for data in res['data']:
data_saver.mg_data_db.update_one({"KeyWord": KWD}, {'$addToSet': {"result": data}})
data_saver.mg_data_db.update_one({"KeyWord": KWD}, {'$addToSet': {"data": data}})
html_parser.parse(data)
if res['paging']['is_end']:
logger.info("Paging is end, exit")
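Renaming the array field from `result` to `data` matches the field name the other collections in this commit use (see utils/comment.py below). The storage pattern is one document per keyword with `$addToSet` appends, so re-running a search does not duplicate hits. A standalone sketch of that pattern (collection and sample values are illustrative):

```python
import pymongo

col = pymongo.MongoClient("mongodb://localhost:27017/")["知乎"]["KeyWordsSearch"]

KWD = "example keyword"
if not col.find_one({"KeyWord": KWD}):
    # first search for this keyword: create its document
    col.insert_one({"KeyWord": KWD, "data": []})

hit = {"id": 123, "type": "answer"}  # one item from the search API's data list
# $addToSet appends the hit only if an identical element is not already present
col.update_one({"KeyWord": KWD}, {"$addToSet": {"data": hit}})
```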
32 changes: 28 additions & 4 deletions utils/comment.py
@@ -21,6 +21,7 @@

def spider(answer_id: str) -> None:
# incremental crawl of comments
offset = config.MONGO_DOC_LIMIT
logger.info("Get comments for answer id: {0}".format(answer_id))
url = "https://www.zhihu.com/api/v4/answers/{}/root_comments?limit=10&offset=0&order=normal&status=open" \
.format(answer_id)
@@ -33,26 +34,49 @@ def spider(answer_id: str) -> None:
data_saver.mongo_insert({
"AnswerID": answer_id,
"common_counts": res['paging']['totals'],
"result": []
"limit": config.MONGO_DOC_LIMIT,
"offset": offset,
"end_url": "",
"data": []
})

try:
while url_manager.list_not_null():
sleep(.3)
res = html_downloader.download(url_manager.get())
url = url_manager.get()
try:
res = html_downloader.download(url)
except SpiderFrame.exception.RequestRetryError as e:
logger.error(e, exc_info=True)
url_manager.add_url(url)
sleep(1)
continue
res = json_lds(res)
for data in res['data']:
if data_saver.mg_data_db.find_one({"result.id": data["id"]}):
if len(data_saver.mg_data_db.find_one({"AnswerID": answer_id, "offset": offset})["data"]) >= config.MONGO_DOC_LIMIT:
logger.warning("MongoDB document out of limit, Create new document and update offset")
offset += config.MONGO_DOC_LIMIT
data_saver.mongo_insert({
"AnswerID": answer_id,
"common_counts": res['paging']['totals'],
"limit": config.MONGO_DOC_LIMIT,
"offset": offset,
"end_url": "",
"data": []
})
if data_saver.mg_data_db.find_one({"data.id": data["id"]}):
# already stored; skip
continue
data_saver.mg_data_db.update_one({"AnswerID": answer_id}, {'$addToSet': {"result": data}})
data_saver.mg_data_db.update_one({"AnswerID": answer_id, "offset": offset}, {'$addToSet': {"data": data}})
try:
if data["author"]["url_token"] is not "":
url_manager.add_id(id_set=config.USER_ID_SET, _id=data["author"]["member"]["url_token"])
except (KeyError, TypeError):  # the author block may lack url_token / member fields
pass

if res['paging']['is_end']:
logger.info("Paging is end, exit")
data_saver.mg_data_db.update_one({"AnswerID": answer_id, "offset": offset}, {"$set": {"end_url": url}})
break
url = res['paging']['next']
url_manager.add_url(url)
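The new `offset`/`limit` bookkeeping exists because a MongoDB document is hard-capped at 16 MB, so an unbounded `$addToSet` array would eventually make updates fail; capping each document at `MONGO_DOC_LIMIT` entries and rolling over to a sibling document avoids that. A distilled sketch of the rollover step (the helper name is hypothetical; `col` is a pymongo collection like `data_saver.mg_data_db`):

```python
import config


def bucket_with_room(col, answer_id: str, offset: int, totals: int) -> int:
    # Return the offset of a bucket document that still has room,
    # creating the next bucket once the current one reaches the limit.
    doc = col.find_one({"AnswerID": answer_id, "offset": offset})
    if doc is not None and len(doc["data"]) >= config.MONGO_DOC_LIMIT:
        offset += config.MONGO_DOC_LIMIT  # roll over to a fresh sibling document
        col.insert_one({
            "AnswerID": answer_id,
            "common_counts": totals,
            "limit": config.MONGO_DOC_LIMIT,
            "offset": offset,
            "end_url": "",
            "data": [],
        })
    return offset
```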
