Commit

Fix BUG
HanZhuoii committed Nov 19, 2020
1 parent 5d6b26f commit 7811bd1
Showing 2 changed files with 40 additions and 30 deletions.
66 changes: 37 additions & 29 deletions frame/SpiderFrame.py
@@ -6,6 +6,7 @@
 v0.3 Added Redis support: UrlManager uses Redis so large projects can resume crawling after an interruption, and DataSaver uses Redis so slow disk I/O no longer limits crawl speed
 """
 
+from ping3 import ping
 from redis import Redis
 from threading import Thread
 from pandas import DataFrame
@@ -101,9 +102,15 @@ class Proxies(Thread):
 
     def __init__(self):
         super().__init__()
-        self.__main_thread__ = False  # whether this is the main proxies thread
-        self.__thread__flag = True  # thread running flag
-        self.__proxies__ = ''
+        try:
+            self.ProxiesThread = int(redis.get("ProxiesThread").decode("utf-8"))+1
+        except:
+            self.ProxiesThread = 0
+        finally:
+            redis.set("ProxiesThread", self.ProxiesThread)
+        self.main_thread = False  # whether this is the main proxies thread
+        self.thread_flag = True  # thread running flag
+        self.temp = ''
         if not config.USE_PROXIES:
             self.live_time = 1905603107
         else:
@@ -117,7 +124,7 @@ def __init__(self):
     # stop the thread
     def __exit__(self):
         logger.info("Exit Proxies with code 0")
-        self.__thread__flag = False
+        self.thread_flag = False
 
     # if the proxy has expired, tell the thread to fetch a new one proactively
     @staticmethod
@@ -132,13 +139,18 @@ def get_proxies(self):
             res = requests.get(self.get_proxies_api)
             j = eval(res.text.replace("true", "True").replace("false", "False").replace("null", "'null'"))
             if j['code'] == 0:
-                redis.set("Proxies_{0}".format(config.THREAD_ID), j['data'][0]['ip'] + ":" + str(j['data'][0]['port']))
-                redis.set("ProxiesUpdated_{0}".format(config.THREAD_ID), time.time())
-                self.live_time = int(
-                    time.mktime(time.strptime(j["data"][0]["expire_time"], "%Y-%m-%d %H:%M:%S"))) - time.time()
-                logger.info("Successfully get proxies")
-                return
-            if j['code'] == 121:
+                _ping = int(ping(j['data'][0]['ip'])*1000)
+                if _ping < 120:
+                    logger.info("_ping is {0}ms".format(_ping))
+                    redis.set("Proxies_{0}".format(config.THREAD_ID), j['data'][0]['ip'] + ":" + str(j['data'][0]['port']))
+                    redis.set("ProxiesUpdated_{0}".format(config.THREAD_ID), time.time())
+                    self.live_time = int(
+                        time.mktime(time.strptime(j["data"][0]["expire_time"], "%Y-%m-%d %H:%M:%S"))) - time.time()
+                    logger.warning("Successfully get proxies: {0}".format(j['data'][0]['ip']+":"+str(j['data'][0]['port'])))
+                    return
+                else:
+                    logger.warning("_ping is {0}ms, response time too long".format(_ping))
+            elif j['code'] == 121:
                 raise exception.ProxiesPoolNull
             logger.warning("Failed, " + str(i + 1) + " times get proxies...")
             time.sleep(randrange(0, 2))
@@ -147,10 +159,11 @@ def get_proxies(self):
 
     def update_self_proxies(self):
         temp = redis.get("Proxies_{0}".format(config.THREAD_ID)).decode("utf-8")
-        if self.__proxies__ != temp:
+        if self.temp != temp:
+            logger.warning("Thread {0}: Update self proxies {1} to {2}".format(self.ProxiesThread, self.temp, temp))
             self.Proxies['http'] = "http://" + temp
             self.Proxies['https'] = "http://" + temp
-            self.__proxies__ = temp
+            self.temp = temp
 
     # Monitor the proxy lifetime and refresh the proxy when it expires. Only one proxy-monitoring thread may run at a time; the others only update themselves by reading the proxy that already exists
     def run(self) -> None:
@@ -159,31 +172,31 @@ def run(self) -> None:
             time.sleep(5)
 
         start_time = time.time()
-        logger.info("------------ Run as following proxies thread ------------")
-        while self.__thread__flag:
+        logger.warning("------------ Proxies thread {0} run as following ------------".format(self.ProxiesThread))
+        while self.thread_flag:
 
             if redis.get("ProxiesThreadCode_{0}".format(config.THREAD_ID)) is None:
                 redis.set("ProxiesThreadCode_{0}".format(config.THREAD_ID), "2")  # claim the main proxies role
-                self.__main_thread__ = True  # mark this thread as the main one
-                logger.info("------------ Switch to main proxies thread ------------")
+                self.main_thread = True  # mark this thread as the main one
+                logger.warning("------------ Proxies thread {0} switch to main ------------".format(self.ProxiesThread))
 
-            if self.__main_thread__ and (time.time() - start_time > self.live_time or redis.get("ProxiesThreadCode_{0}".format(config.THREAD_ID)).decode("utf-8") == "2"):
-                logger.warning("Proxies failure, get new one")
+            if self.main_thread and (time.time() - start_time > self.live_time or redis.get("ProxiesThreadCode_{0}".format(config.THREAD_ID)).decode("utf-8") == "2"):
+                logger.warning("Thread: {0}, Proxies failure, get new one".format(self.ProxiesThread))
                 # reset the proxy usage timer
                 start_time = time.time()
                 self.get_proxies()
                 self.update_self_proxies()
                 redis.set("ProxiesThreadCode_{0}".format(config.THREAD_ID), "1")
-            elif not self.__main_thread__:
+            elif not self.main_thread:
                 self.update_self_proxies()
 
             time.sleep(1)
 
-        if self.__main_thread__:
+        if self.main_thread:
             redis.delete("ProxiesThreadCode_{0}".format(config.THREAD_ID))
-            logger.warning("--------- Main proxies thread ---------")
+            logger.warning("--------- Thread {0}: Main proxies thread exit ---------".format(self.ProxiesThread))
         else:
-            logger.warning("--------- Following proxies thread ---------")
+            logger.warning("--------- Thread {0}: Following proxies thread exit ---------".format(self.ProxiesThread))
 
 
 class UrlManager(object):
@@ -289,7 +302,7 @@ def download(self, url: str, params=None) -> str:
         except requests.exceptions.ProxyError:
 
             self.proxies.need_update()
-            logger.error("Cannot connect to proxy.', timeout('timed out')")
+            logger.error("Cannot connect to proxy: {0}', timeout".format(self.proxies.Proxies))
         except Exception:
             logger.error("Undefined Error [{0}]".format(url), exc_info=True)
             time.sleep(5)
@@ -390,11 +403,6 @@ def __init__(self, db_name='', set_name='', use_auto_increase_index=False, use_r
         :func run: call run to sync data from Redis into Mongo
         """
 
-        logger.info(
-            "Init DataSaver, db_name={0}, set_name={1}, use_auto_increase_index={2}, use_redis={3}".format(db_name,
-                set_name,
-                use_auto_increase_index,
-                use_redis))
         super().__init__()
         import pymongo
         mg_client = pymongo.MongoClient(config.MONGO_CONNECTION)
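
The new get_proxies path above only accepts a proxy returned by the API (code 0) after checking its round-trip time with ping3, rejecting anything at or above 120 ms. A minimal standalone sketch of that latency gate, assuming ping3 is installed; check_latency is a hypothetical helper for illustration, not part of SpiderFrame.py:

from ping3 import ping


def check_latency(ip: str, max_ms: int = 120) -> bool:
    # ping() returns the round-trip time in seconds, or None on timeout;
    # reject those cases instead of letting int(None * 1000) raise
    rtt = ping(ip, timeout=2)
    if not rtt:
        return False
    return rtt * 1000 < max_ms


if __name__ == "__main__":
    print(check_latency("8.8.8.8"))

Unlike the sketch, the committed code calls int(ping(...) * 1000) directly, so a timed-out ping (ping() returning None) would raise a TypeError at that expression.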
4 changes: 3 additions & 1 deletion requirements.txt
@@ -2,4 +2,6 @@ redis==3.5.3
 pymongo==3.11.0
 requests==2.22.0
 pandas==1.0.1
-beautifulsoup4==4.9.3
+beautifulsoup4~=4.8.2
+
+ping3~=2.6.6
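
The run() loop in SpiderFrame.py above coordinates multiple Proxies threads through Redis: ProxiesThreadCode_{THREAD_ID} elects and signals the main thread ("2" requests a refresh, "1" marks the shared proxy as valid, an absent key means no main thread yet), while follower threads only read the shared ip:port from Proxies_{THREAD_ID}. A simplified single-iteration sketch of that protocol, assuming a local Redis server; the key names follow the diff, everything else (tick, the placeholder address) is illustrative:

from redis import Redis

redis = Redis()       # assumes a Redis server on localhost:6379
THREAD_ID = "demo"    # stand-in for config.THREAD_ID


def tick(is_main: bool, proxy_expired: bool) -> bool:
    """One pass of the coordination loop; returns the (possibly updated) main-thread flag."""
    code_key = "ProxiesThreadCode_{0}".format(THREAD_ID)
    proxy_key = "Proxies_{0}".format(THREAD_ID)

    if redis.get(code_key) is None:
        redis.set(code_key, "2")              # claim the main role and request a refresh
        is_main = True

    if is_main and (proxy_expired or redis.get(code_key) == b"2"):
        redis.set(proxy_key, "1.2.3.4:8080")  # placeholder for get_proxies()
        redis.set(code_key, "1")              # tell followers the shared proxy is valid
    elif not is_main:
        print(redis.get(proxy_key))           # followers only read the shared proxy
    return is_main


if __name__ == "__main__":
    flag = tick(False, False)  # the first caller becomes the main thread
    tick(flag, True)           # and refreshes the shared proxy once it expires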
