-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathselenium_sign.py
350 lines (314 loc) · 14 KB
/
selenium_sign.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
import os
import sys
import traceback
import time
import json
import datetime
import argparse
from selenium.webdriver.common.by import By
# from selenium.webdriver.support.wait import WebDriverWait
from init import firefox_profile
from init import myLogger
from init import config_init
from helper import module_importer
# Schema version written into log/result_data.json under the 'version' key.
# Readers in this file discard a stored result file whose version differs.
RESULT_VERSION = 1.0
def printList(sign_list: list, is_detail: bool):
    """Log one line per site in ``sign_list``.

    Args:
        sign_list: Site objects exposing ``indexUrl`` and, optionally, a
            ``result`` mapping.
        is_detail: When true, each line also carries the site's
            ``access_result_info`` (preferred) or ``sign_result_info``; a site
            with neither is reported as "no result" so it is never silently
            left out of the listing.
    """
    if not sign_list:
        logger.info("空")
        return

    for sign in sign_list:
        if not is_detail:
            logger.info(f"{sign.indexUrl}")
            continue

        # A missing or empty result is treated like a result without any info.
        result = getattr(sign, 'result', None) or {}
        detail = (
            result.get('access_result_info')
            or result.get('sign_result_info')
            or "no result"
        )
        logger.info(f"{sign.indexUrl}: {detail}")
def show_extra_info(sign_list: list):
    """Log the ``extra_info`` / ``new_message`` fields of each site's result.

    Sites whose ``result`` is empty, or whose result has neither field set,
    produce no line.

    Args:
        sign_list: Site objects exposing ``indexUrl`` and ``result``.
    """
    logger.info("start print extra info")
    if not sign_list:
        logger.info("空")
        return

    for site in sign_list:
        outcome = site.result
        if not outcome:
            continue
        has_something = outcome.get('extra_info') or outcome.get('new_message')
        if not has_something:
            continue
        logger.info(f"{site.indexUrl}; extra info: {outcome.get('extra_info')}; new message: {outcome.get('new_message')}")
def get_sign_list(site_name: str):
    """Import the site modules selected by ``site_name`` from the 'target' directory.

    Args:
        site_name: ``'all'`` to follow target/sign_site.json, anything else is
            passed to ``module_importer.import_modules`` as the ``sites``
            selection.

    Returns:
        Whatever ``module_importer.import_modules`` returns (the callers in
        this file treat it as a list of site objects).
    """
    modules_dir = 'target'
    # Loaded before branching, even though only the 'all' path reads it.
    site_config = module_importer.load_target_json(modules_dir, 'sign_site.json')

    if site_name != 'all':
        return module_importer.import_modules(all = False, dir = modules_dir, sites = site_name)

    # The JSON may restrict 'all' to an explicit module list.
    restricted = (
        'all' in site_config
        and site_config['all'] != True
        and "module_list" in site_config
    )
    if restricted:
        return module_importer.import_modules(all = False, dir = modules_dir, sites = site_config['module_list'])

    logger.info("import所有模块")
    return module_importer.import_modules(all = True, dir = modules_dir, sites = [])
def _modules_signed_today(records) -> set:
    """Return the module names whose stored record is a success dated today."""
    today = datetime.date.today()
    names = set()
    for record in records:
        # Entries may be None or otherwise malformed; skip instead of crashing.
        if not isinstance(record, dict) or record.get('sign_result') != True:
            continue
        try:
            signed_on = datetime.datetime.fromtimestamp(record['timestamp']).date()
        except (KeyError, TypeError, ValueError, OverflowError, OSError):
            continue
        # Compare full calendar dates: comparing only the day-of-month would
        # treat a success from the same day of an earlier month as today's.
        name = record.get('module_name')
        if signed_on == today and isinstance(name, str):
            names.add(name)
    return names


def get_and_remove_ignore_list(sign_list: list, force) -> list:
    """Move sites that already signed in successfully today out of ``sign_list``.

    Reads log/result_data.json (unless ``force`` is truthy) and, for every
    site whose ``module_name`` has a successful record dated today, removes
    the site from ``sign_list`` in place.

    Args:
        sign_list: Site objects exposing ``module_name``; mutated in place.
        force: When truthy the stored results are not consulted, so nothing
            is ignored.

    Returns:
        The list of sites that were removed from ``sign_list``.
    """
    ignore_list = []
    data = []
    if not force:
        try:
            with open("log/result_data.json", "r", encoding='utf-8') as f:
                result_data = json.load(f)
            if isinstance(result_data, dict) and result_data.get('version') == RESULT_VERSION:
                data = result_data['result']
            else:
                logger.info(f"reult版本已过时,丢弃")
        except (OSError, ValueError, KeyError) as e:
            # Best effort: a missing or unreadable result file just means
            # nothing can be skipped.
            logger.info(f"no usable previous result: {e}")
            data = []
        if not isinstance(data, list):
            data = []

    done_today = _modules_signed_today(data)
    # Iterate over a copy because matching sites are removed from sign_list.
    for sign in sign_list[:]:
        logger.info(f"检查{sign.module_name}")
        if sign.module_name in done_today:
            logger.info(f"{sign.module_name}今天已经签到成功了,无需再次签到")
            ignore_list.append(sign)
            sign_list.remove(sign)
    return ignore_list
def _has_method(obj, name: str) -> bool:
    """Return True when ``obj`` has a callable attribute called ``name``."""
    return callable(getattr(obj, name, None))


def _run_sign_flow(sign, driver) -> bool:
    """Drive one site through access -> sign -> verify; True only on verified success."""
    if not sign.get_driver():
        sign.set_driver(driver)
    if _has_method(sign, 'accessIndex'):
        sign.accessIndex()
    # A site that fails its access check is not signed at all.
    if _has_method(sign, 'valid_access') and not sign.valid_access():
        return False
    if _has_method(sign, 'sign'):
        sign.sign()
    if _has_method(sign, 'msgCheck'):
        sign.msgCheck()
    # Without a validSign hook the outcome cannot be confirmed, so it counts
    # as a failure.
    # Debug aid (disabled upstream): a failed validSign is the place to save
    # a screenshot / page source under log/.
    return _has_method(sign, 'validSign') and bool(sign.validSign())


def do_sign(sign_list: list, driver) -> list:
    """Attempt to sign in to every site in ``sign_list``.

    Each site is processed once and lands in exactly one of the two result
    lists, even if an exception is raised part-way through (for example while
    collecting report info after the outcome was already known).

    Args:
        sign_list: Site objects; every processed site is removed from this
            list in place.
        driver: Web driver handed to sites that do not have one yet.

    Returns:
        ``[succeed_list, fail_list]``.
    """
    succeed_list = []
    fail_list = []
    # Iterate over a copy because processed sites are removed from sign_list.
    for sign in sign_list[:]:
        logger.info(f"开始{sign.indexUrl}, module name = {sign.module_name}")
        succeeded = False
        try:
            succeeded = _run_sign_flow(sign, driver)
            if _has_method(sign, 'collect_info'):
                logger.info(sign.collect_info())
        except Exception as e:
            # An error before verification leaves succeeded False; an error
            # while reporting keeps the already verified outcome.
            logger.error(f"something error: {e}")
            logger.warning(traceback.format_exc())
        (succeed_list if succeeded else fail_list).append(sign)
        sign.exit()
        sign_list.remove(sign)
    return [succeed_list, fail_list]
def get_logger():
    """Create the 'sign' logger and share it with the helper modules.

    Ensures the configured ``log_path`` directory exists, builds the logger
    through ``myLogger`` and registers it with ``firefox_profile`` and
    ``module_importer`` via their ``setLogger`` functions.

    Returns:
        The logger object produced by ``myLogger.myLogger(...).getLogger()``.
    """
    config_data = config_init.get_config_for_sign()
    log_path = config_data['log_path']
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(log_path, exist_ok=True)
    sign_log_path = os.path.join(log_path, 'sign')
    sign_logger = myLogger.myLogger('sign', sign_log_path, False).getLogger()
    firefox_profile.setLogger(sign_logger)
    module_importer.setLogger(sign_logger)
    return sign_logger
def get_web_driver():
    """Create the web driver for the browser named in the sign configuration.

    Only ``'firefox'`` is supported. Any other configured browser is logged
    as unsupported and terminates the process with exit status -1, instead of
    falling through to an unbound ``driver`` variable.

    Returns:
        The driver returned by
        ``firefox_profile.create_firefox_with_user_profile``.

    Raises:
        SystemExit: If the configured browser is not supported.
    """
    config_data = config_init.get_config_for_sign()
    log_path = config_data['log_path']
    os.makedirs(log_path, exist_ok=True)

    browser = config_data['browser']
    if browser != 'firefox':
        logger.error(f"当前不支持")
        sys.exit(-1)

    geckodriver_log_path = os.path.join(log_path, 'geckodriver.log')
    return firefox_profile.create_firefox_with_user_profile(geckodriver_log_path)
def resign(fail_list, driver) -> list:
    """Run another sign-in attempt over the sites in ``fail_list``.

    Args:
        fail_list: Sites to retry; passed straight to ``do_sign``.
        driver: Web driver forwarded to ``do_sign``.

    Returns:
        ``[succeeded, still_failed]`` as produced by ``do_sign``.
    """
    logger.info(f"失败{len(fail_list)}。尝试再次签到失败网站")
    recovered, still_failing = do_sign(fail_list, driver)
    return [recovered, still_failing]
def _load_reusable_results() -> list:
    """Read log/result_data.json and return the records worth carrying over.

    A record is kept when it is a dict, its ``timestamp`` falls on today's
    local calendar date and its ``sign_result`` is not False. Problems with
    the file as a whole are logged and yield whatever was gathered so far;
    a single malformed record is skipped without discarding the rest.
    """
    kept = []
    try:
        with open("log/result_data.json", "r", encoding='utf-8') as f:
            result_data = json.load(f)
        if isinstance(result_data, dict) and result_data.get('version') == RESULT_VERSION:
            stored = result_data['result']
        else:
            logger.info(f"reult版本已过时,丢弃")
            stored = []

        today = datetime.date.today()
        for record in stored:
            if not isinstance(record, dict):
                continue
            try:
                recorded_on = datetime.datetime.fromtimestamp(record['timestamp']).date()
            except (KeyError, TypeError, ValueError, OverflowError, OSError):
                continue
            # Full date comparison; day-of-month alone would match the same
            # day of an earlier month.
            if recorded_on != today:
                continue  # stale record
            if record.get('sign_result') == False:
                continue  # failed attempts are not carried over
            kept.append(record)
    except Exception as e:
        logger.warning(f"打开结果记录异常:{e}")
        logger.warning(f"错误堆栈信息:")
        logger.warning(traceback.format_exc())
    return kept


def rewrite_result(sign_list: list):
    """Merge this run's outcomes into log/result_data.json.

    Today's successful records already in the file are kept. For every site
    in ``sign_list`` the record with the same ``module_name`` is updated from
    ``sign.result``, or a new record is appended. A site with no usable
    ``result`` gets a minimal record (module name, site name, timestamp)
    rather than crashing the update or storing ``null``.

    Args:
        sign_list: Site objects exposing ``module_name``, ``site_name`` and,
            optionally, ``result``.
    """
    records = _load_reusable_results()
    new_data = {'version': RESULT_VERSION, 'result': records}

    logger.info(f"尝试写入{len(sign_list)}个打卡数据 ")
    for sign in sign_list:
        result = getattr(sign, 'result', None)
        existing = next(
            (
                item for item in records
                if isinstance(item, dict) and item.get("module_name") == sign.module_name
            ),
            None,
        )
        if existing is not None:
            # dict.update(None) raises TypeError, so only merge a real result.
            if result is not None:
                existing.update(result)
            continue

        logger.info(f"没找到旧数据或旧数据已过时,尝试追加{sign.site_name}新数据")
        if result is not None:
            records.append(result)
        else:
            t = time.time()
            records.append({
                "module_name": sign.module_name,
                "site_name": sign.site_name,
                "timestamp": int(t),
                "timestring": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t)),
            })

    logger.info(f"决定写入{len(new_data['result'])}个打卡数据 ")
    os.makedirs("log", exist_ok=True)
    with open("log/result_data.json", "w", encoding='utf-8') as f:
        json.dump(new_data, f, ensure_ascii=False, indent=4)
def not_retry(sign):
    """Tell whether a failed site should be excluded from retry attempts.

    The decision is based on marker substrings in the site's
    ``access_result_info`` and ``sign_result_info`` result fields. A title
    anomaly that mentions 502 or 504 is considered transient and stays
    retryable.

    Args:
        sign: Site object exposing ``module_name`` and, optionally, a
            ``result`` mapping.

    Returns:
        True when the site must not be retried, False otherwise (including
        when the site has no result at all).
    """
    # result is optional on site objects; a missing one means "retry".
    result = getattr(sign, 'result', None)
    if not result:
        return False

    access_info = result.get('access_result_info')
    if access_info:
        if "未登录" in access_info:
            logger.info(f"{sign.module_name} access 未登录, not retry")
            return True
        if "标题异常" in access_info:
            if "502" in access_info or "504" in access_info:
                return False
            # TODO: later match the full "502: Bad gateway" /
            # "504: Gateway time-out" texts for the retryable case.
            logger.info(f"{sign.module_name} access 标题异常, not retry")
            return True

    sign_info = result.get('sign_result_info')
    if sign_info:
        if "标题异常" in sign_info:
            if "502" in sign_info or "504" in sign_info:
                return False
            logger.info(f"{sign.module_name} sign 标题异常, not retry")
            return True
        if "8点不到,无法签到" in sign_info:
            logger.info(f"{sign.module_name} 8点不到, not retry")
            return True
        if "未签到。活跃度不够" in sign_info:
            logger.info(f"{sign.module_name} 活跃度不够, not retry")
            return True
    return False
def main(force: bool, site_name: str):
    """Run one complete sign-in pass.

    Loads the selected sites, drops those already signed today (unless
    ``force``), signs the rest, retries retryable failures up to two more
    times, logs the summaries and stores the outcomes. The web driver is
    always closed, even when a step raises.

    Args:
        force: Ignore the stored record of today's successful sign-ins.
        site_name: Site selection forwarded to ``get_sign_list``.

    Raises:
        SystemExit: If no web driver could be created.
    """
    if not logger:
        return

    sign_list = get_sign_list(site_name)
    target_size = len(sign_list)
    logger.info(f"有{target_size}个站需要签到")
    ignore_list = get_and_remove_ignore_list(sign_list, force)
    logger.info(f"有{len(ignore_list)}个站忽略签到")
    if len(ignore_list) == target_size:
        # The pause keeps a nothing-to-do run from finishing too quickly.
        logger.info(f"没有站需要签到,等待30秒结束")
        time.sleep(30)
        return

    driver = get_web_driver()
    if not driver:
        logger.error(f"初始化失败")
        sys.exit(-1)

    try:
        succeed_list, fail_list = do_sign(sign_list, driver)
        logger.info(f"签到成功{len(succeed_list)}个站,失败{len(fail_list)}个站")

        # Split the failures into those worth retrying and those that are not.
        temp_pass = []
        remaining = []
        for item in fail_list:
            (temp_pass if not_retry(item) else remaining).append(item)

        retried_ok = []
        for attempt in (1, 2):
            if not remaining:
                break
            time.sleep(5)
            recovered, remaining = resign(remaining, driver)
            logger.info(f"重新签到{attempt}, 成功{len(recovered)}/失败{len(remaining)}")
            retried_ok.extend(recovered)

        logger.info("不重试签到 列表:")
        printList(temp_pass, True)
        logger.info("重试依然签到失败 列表:")
        printList(remaining, True)

        processed = succeed_list + retried_ok + remaining
        show_extra_info(processed)
        rewrite_result(processed)
    finally:
        driver.quit()
if __name__ == "__main__":
    # Module-level logger read by every function in this file.
    logger = get_logger()

    parser = argparse.ArgumentParser(description='哈哈哈哈')
    # Command-line options.
    parser.add_argument('-f', '--force', action='store_true', help='强制重新运行,忽略已运行记录')
    parser.add_argument('-o', '--once', action='store_true', help='立即运行一次')
    parser.add_argument('site_name', nargs='?', default='all', help='指定站点名,默认all')
    args = parser.parse_args()

    # Echo the parsed options.
    logger.info(f'参数 force: {args.force}')
    logger.info(f'参数 once: {args.once}')
    logger.info(f'参数 site_name: {args.site_name}')

    if args.once:
        main(args.force, args.site_name)
    else:
        # Scheduler mode: poll the clock and run once a day at 04:00.
        logger.info(f'开始等待')
        while True:
            now = datetime.datetime.now()
            if now.hour == 4 and now.minute == 0:
                logger.info(f'现在是{now.day}日{now.hour}时{now.minute}分')
                main(args.force, args.site_name)
                logger.info(f'开始等待')
                # Make sure the 04:00 minute is over so a run that finished
                # quickly is not triggered a second time.
                time.sleep(60)
            else:
                # 50s polling never skips over a whole minute.
                time.sleep(50)