diff --git a/modules/web_search.py b/modules/web_search.py
old mode 100644
new mode 100755
index 597af4b289..7ebe76df0b
--- a/modules/web_search.py
+++ b/modules/web_search.py
@@ -5,7 +5,7 @@
 import urllib.request
 from concurrent.futures import as_completed
 from datetime import datetime
-from urllib.parse import quote_plus
+from urllib.parse import quote_plus, urlparse, parse_qs, unquote
 
 import requests
 
@@ -20,35 +20,201 @@ def get_current_timestamp():
 
 def download_web_page(url, timeout=10):
     """
-    Download a web page and convert its HTML content to structured Markdown text.
+    Download a web page and convert its HTML content to Markdown text,
+    handling Brotli/gzip and non-HTML content robustly.
     """
-    import html2text
+    logger.info(f"Downloading {url}")
 
+    # --- soft deps
     try:
-        headers = {
-            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
-        }
-        response = requests.get(url, headers=headers, timeout=timeout)
-        response.raise_for_status()  # Raise an exception for bad status codes
-
-        # Initialize the HTML to Markdown converter
-        h = html2text.HTML2Text()
-        h.body_width = 0
-        h.ignore_images = True
-        h.ignore_links = True
-
-        # Convert the HTML to Markdown
-        markdown_text = h.handle(response.text)
-
-        return markdown_text
+        import html2text
+    except Exception:
+        logger.exception("html2text import failed")
+        html2text = None
+
+    try:
+        from readability import Document
+    except Exception:
+        Document = None
+
+    try:
+        import brotli as _brotli
+        have_brotli = True
+    except Exception:
+        _brotli = None
+        have_brotli = False
+
+    import gzip, zlib, re, html as _html
+
+    headers = {
+        "User-Agent": (
+            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
+            "AppleWebKit/537.36 (KHTML, like Gecko) "
+            "Chrome/124.0.0.0 Safari/537.36"
+        ),
+        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
+        "Accept-Language": "en-US,en;q=0.9",
+        # IMPORTANT: only advertise br if brotli is installed
+        "Accept-Encoding": "gzip, deflate" + (", br" if have_brotli else ""),
+        "Connection": "keep-alive",
+        "Upgrade-Insecure-Requests": "1",
+    }
+
+    try:
+        resp = requests.get(url, headers=headers, timeout=timeout)
+        resp.raise_for_status()
+
+        # --- bail out early if it's not HTML
+        ctype = resp.headers.get("Content-Type", "").lower()
+        if not any(t in ctype for t in ("text/html", "application/xhtml+xml")):
+            logger.warning("Non-HTML content-type %r at %s", ctype, url)
+            return ""
+
+        # --- get raw bytes then decompress if server didn't/requests couldn't
+        raw = resp.content  # bytes
+        enc_hdr = resp.headers.get("Content-Encoding", "").lower()
+
+        # If requests didn't decode (it normally does gzip/deflate), handle manually.
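+        # (urllib3 also decodes br transparently when brotli/brotlicffi is
+        # importable; a failed decompress below therefore usually means the
+        # body was already decoded, which is why each branch ignores errors.)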
+ if "br" in enc_hdr and have_brotli: + try: + raw = _brotli.decompress(raw) + except Exception: + # it may already be decoded; ignore + pass + elif "gzip" in enc_hdr: + try: + raw = gzip.decompress(raw) + except Exception: + pass + elif "deflate" in enc_hdr: + try: + raw = zlib.decompress(raw, -zlib.MAX_WBITS) + except Exception: + pass + + # --- decode text with a robust charset guess + # use HTTP charset if present + charset = None + if "charset=" in ctype: + charset = ctype.split("charset=")[-1].split(";")[0].strip() + if not charset: + # requests’ detector + charset = resp.apparent_encoding or "utf-8" + try: + html_text = raw.decode(charset, errors="replace") + except Exception: + html_text = raw.decode("utf-8", errors="replace") + + # anti-bot shells (avoid empty output surprises) + if re.search(r"(cf-chl|Just a moment|enable JavaScript)", html_text, re.I): + logger.warning("Possible anti-bot/challenge page at %s", url) + + # --- extract readable text (readability -> html2text -> fallback) + md_readability = "" + if Document is not None: + try: + doc = Document(html_text) + title = (doc.short_title() or "").strip() + main_html = doc.summary(html_partial=True) + main_text = re.sub(r"<[^>]+>", " ", main_html, flags=re.S) + main_text = re.sub(r"\s+", " ", main_text).strip() + if title: + md_readability = f"# {title}\n\n{main_text}".strip() + else: + md_readability = main_text + except Exception: + logger.exception("readability failed on %s", url) + + md_html2text = "" + if html2text is not None: + try: + h = html2text.HTML2Text() + h.body_width = 0 + h.ignore_images = True + h.ignore_links = True + h.single_line_break = True + md_html2text = (h.handle(html_text) or "").strip() + except Exception: + logger.exception("html2text failed on %s", url) + + def _clean(s): + s = re.sub(r"<[^>]+>", " ", s, flags=re.S) + return _html.unescape(re.sub(r"\s+", " ", s)).strip() + + # fallback: meta/title/headers/paragraphs + noscript + parts = [] + t = re.search(r"
]*>(.*?)
", html_text, re.I | re.S)[:8] if _clean(p)] + for n in re.findall(r"", html_text, re.I | re.S): + c = _clean(n) + if c: + parts.append(c) + md_fallback = "\n\n".join([p for p in parts if p]).strip() + + best = max([md_readability, md_html2text, md_fallback], key=lambda s: len(s or "")) + if not best.strip(): + logger.warning("Empty content extracted from %s", url) + return best + except requests.exceptions.RequestException as e: logger.error(f"Error downloading {url}: {e}") return "" - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") + except Exception: + logger.exception("Unexpected error while downloading %s", url) return "" + +def _extract_results_from_duckduckgo(response_text, num_pages): + # 1) Grab the title anchors (they carry the real clickable href) + # We capture both the inner text (title) and href. + anchor_pattern = re.compile( + r']*class="[^"]*result__a[^"]*"[^>]*href="([^"]+)"[^>]*>(.*?)', + re.DOTALL | re.IGNORECASE + ) + matches = anchor_pattern.findall(response_text) + + results = [] + for href, title_html in matches: + # 2) Resolve DuckDuckGo redirect: ?uddg=