From ac7111e89ff951eacaade80eb1849bd5349c0886 Mon Sep 17 00:00:00 2001 From: wzdnzd Date: Wed, 13 Nov 2024 08:35:27 +0800 Subject: [PATCH] enhanced backend address extraction --- subscribe/crawl.py | 124 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 95 insertions(+), 29 deletions(-) diff --git a/subscribe/crawl.py b/subscribe/crawl.py index 851c7b14c..a7d0e24ff 100644 --- a/subscribe/crawl.py +++ b/subscribe/crawl.py @@ -1599,39 +1599,100 @@ def run_crawl(url: str, separator: str, address_regex: str, coupon_regex: str) - return result - def extract_real_url(domain: str, retry: int = 2) -> str: - count, retry = 0, max(retry, 1) - url = urllib.parse.urljoin(domain, "env.js") + def extract_backend_url(domain: str, retry: int = 2) -> str: + # TODO: exploring a more generalized approach to backend addresses + def request_once(suffix: str) -> tuple[bool, str]: + count, suffix = 0, utils.trim(suffix) + url = urllib.parse.urljoin(domain, suffix) - while count < retry: - count += 1 + while count < retry: + count += 1 - try: - request = urllib.request.Request(url=url, headers=utils.DEFAULT_HTTP_HEADERS, method="GET") - response = urllib.request.urlopen(request, timeout=6, context=utils.CTX) + try: + request = urllib.request.Request(url=url, headers=utils.DEFAULT_HTTP_HEADERS, method="GET") + response = urllib.request.urlopen(request, timeout=6, context=utils.CTX) + + word = "" if not suffix else (suffix if suffix.startswith("/") else "/" + suffix) + if word and not utils.trim(response.geturl()).endswith(word): + return True, "" + + content = response.read() + try: + content = str(content, encoding="utf8") + except: + content = gzip.decompress(content).decode("utf8") + + return False, content + except urllib.error.HTTPError as e: + if e.code == 404: + return False, "" + except urllib.error.URLError as e: + if isinstance(e.reason, (socket.gaierror, ssl.SSLError, socket.timeout)): + return True, "" + except Exception as e: + pass + + return False, "" + + def attempt_env() -> str: + status, content = request_once(suffix="/env.js") + if status: + return terminated - # do not redirect - # opener = urllib.request.build_opener(utils.NoRedirect) - # response = opener.open(request, timeout=6) + if not content: + return "" - if not utils.trim(response.geturl()).endswith("/env.js"): - return "" + if groups := re.findall(r"\bhost\b:(?:\s+)?[\"\'](https?://[^\s\r\t]+)[\"\']", content, flags=re.I): + return utils.trim(groups[0]) - content = response.read() - try: - content = str(content, encoding="utf8") - except: - content = gzip.decompress(content).decode("utf8") - - groups = re.findall(r"window.routerBase(?:\s+)?=(?:\s+)?['\"](https?://.*)['\"]", content, flags=re.I) - return groups[0].rstrip("/") if groups and groups[0] else domain - except urllib.error.HTTPError: - return domain - except urllib.error.URLError as e: - if isinstance(e.reason, (socket.gaierror, ssl.SSLError, socket.timeout)): - return "" - except Exception as e: - pass + groups = re.findall(r"window.routerBase(?:\s+)?=(?:\s+)?['\"](https?://.*)['\"]", content, flags=re.I) + return groups[0].rstrip("/") if groups and groups[0] else "" + + def attempt_zero() -> str: + status, content = request_once(suffix="/config.json") + if status: + return terminated + + if not content: + return "" + + try: + data = json.loads(content) + # for https://github.com/amyouran/v2board-Zero-Theme + link = utils.trim(data.get("api_base", "")) + if not link: + # for https://github.com/DyAxy/V2B-Theme-Nest + link = utils.trim(data.get("apiUrl", "")) + + return utils.extract_domain(url=link, include_protocal=True) + except: + return "" + + def attempt_buddy() -> str: + # for https://github.com/vlesstop/v2board-theme-buddy + status, content = request_once(suffix="/config.js") + if status: + return terminated + + group = re.findall(r"\bhost\b:(?:\s+)?[\"\'](https?://[^\s\r\t]+)[\"\']", content, flags=re.I) + return "" if not group else utils.trim(group[0]) + + def attempt_aurora() -> str: + # for https://github.com/krsunm/Aurora + status, content = request_once(suffix="") + if status: + return terminated + + group = re.findall(r"\bserverUrl\b:(?:\s+)?[\"\'](https?://[^\s\r\t]+)[\"\']", content, flags=re.I) + return "" if not group else utils.trim(group[0]) + + terminated, retry = "terminated", max(retry, 1) + for func in [attempt_env, attempt_zero, attempt_buddy, attempt_aurora]: + backend = func() + if terminated == backend: + return "" + if backend: + return backend return domain @@ -1668,7 +1729,12 @@ def extract_real_url(domain: str, retry: int = 2) -> str: # extract real routing base url logger.info(f"[AirPortCollector] fetched {len(domains)} airport, start extracting real routing addresses") - sites = utils.multi_thread_run(func=extract_real_url, tasks=domains, num_threads=num_thread, show_progress=display) + sites = utils.multi_thread_run( + func=extract_backend_url, + tasks=domains, + num_threads=num_thread, + show_progress=display, + ) tasks = [[site, rigid, chuck] for site in sites if site] records = {sites[i]: candidates.get(domains[i], "") for i in range(len(sites)) if sites[i]}