From ce33c66397b93411b8b17a93e75df22e2ceabd76 Mon Sep 17 00:00:00 2001 From: Ben Cuan Date: Sun, 29 Dec 2024 01:03:23 -0800 Subject: [PATCH 1/6] add image downloading --- .gitignore | 1 + substack_scraper.py | 351 +++++++++++++++++++++++++------------------- 2 files changed, 203 insertions(+), 149 deletions(-) diff --git a/.gitignore b/.gitignore index 86effc14..26f4d36e 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ substack_html_pages/* # Ignore substack_md_files directory /substack_md_files/ +substack_images/ \ No newline at end of file diff --git a/substack_scraper.py b/substack_scraper.py index 7644d6db..6864913b 100644 --- a/substack_scraper.py +++ b/substack_scraper.py @@ -1,9 +1,14 @@ import argparse import json import os +import hashlib +import mimetypes +import re from abc import ABC, abstractmethod +from pathlib import Path from typing import List, Optional, Tuple from time import sleep +from urllib.parse import urlparse, unquote from bs4 import BeautifulSoup import html2text @@ -17,76 +22,139 @@ from webdriver_manager.microsoft import EdgeChromiumDriverManager from selenium.webdriver.edge.options import Options as EdgeOptions from selenium.webdriver.chrome.service import Service -from urllib.parse import urlparse from config import EMAIL, PASSWORD -USE_PREMIUM: bool = False # Set to True if you want to login to Substack and convert paid for posts -BASE_SUBSTACK_URL: str = "https://www.thefitzwilliam.com/" # Substack you want to convert to markdown -BASE_MD_DIR: str = "substack_md_files" # Name of the directory we'll save the .md essay files -BASE_HTML_DIR: str = "substack_html_pages" # Name of the directory we'll save the .html essay files -HTML_TEMPLATE: str = "author_template.html" # HTML template to use for the author page +USE_PREMIUM: bool = False +BASE_SUBSTACK_URL: str = "https://www.thefitzwilliam.com/" +BASE_MD_DIR: str = "substack_md_files" +BASE_HTML_DIR: str = "substack_html_pages" +BASE_IMAGE_DIR: str = "substack_images" +HTML_TEMPLATE: str = "author_template.html" JSON_DATA_DIR: str = "data" -NUM_POSTS_TO_SCRAPE: int = 3 # Set to 0 if you want all posts +NUM_POSTS_TO_SCRAPE: int = 3 +def count_images_in_markdown(md_content: str) -> int: + """Count number of Substack CDN image URLs in markdown content.""" + pattern = r'https://substackcdn\.com/image/fetch/[^\s\)]+\)' + matches = re.findall(pattern, md_content) + return len(matches) -def extract_main_part(url: str) -> str: - parts = urlparse(url).netloc.split('.') # Parse the URL to get the netloc, and split on '.' 
- return parts[1] if parts[0] == 'www' else parts[0] # Return the main part of the domain, while ignoring 'www' if - # present +def is_post_url(url: str) -> bool: + return "/p/" in url + +def get_publication_url(url: str) -> str: + parsed = urlparse(url) + return f"{parsed.scheme}://{parsed.netloc}/" +def extract_main_part(url: str) -> str: + parts = urlparse(url).netloc.split('.') + return parts[1] if parts[0] == 'www' else parts[0] + +def get_post_slug(url: str) -> str: + match = re.search(r'/p/([^/]+)', url) + return match.group(1) if match else 'unknown_post' + +def sanitize_filename(url: str) -> str: + """Create a safe filename from URL or content.""" + # Extract original filename from CDN URL + if "substackcdn.com" in url: + # Get the actual image URL after the CDN parameters + original_url = unquote(url.split("/https%3A%2F%2F")[1]) + filename = original_url.split("/")[-1] + else: + filename = url.split("/")[-1] + + # Remove invalid characters + filename = re.sub(r'[<>:"/\\|?*]', '', filename) + + # If filename is too long or empty, create hash-based name + if len(filename) > 100 or not filename: + hash_object = hashlib.md5(url.encode()) + ext = mimetypes.guess_extension(requests.head(url).headers.get('content-type', '')) or '.jpg' + filename = f"{hash_object.hexdigest()}{ext}" + + return filename + +def download_image(url: str, save_path: Path, pbar: Optional[tqdm] = None) -> Optional[str]: + """Download image from URL and save to path.""" + try: + response = requests.get(url, stream=True) + if response.status_code == 200: + save_path.parent.mkdir(parents=True, exist_ok=True) + with open(save_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + if pbar: + pbar.update(1) + return str(save_path) + except Exception as e: + if pbar: + pbar.write(f"Error downloading image {url}: {str(e)}") + return None + +def process_markdown_images(md_content: str, author: str, post_slug: str, pbar: Optional[tqdm] = None) -> str: + """Process markdown content to download images and update references.""" + image_dir = Path(BASE_IMAGE_DIR) / author / post_slug + + def replace_image(match): + url = match.group(0).strip('()') + filename = sanitize_filename(url) + save_path = image_dir / filename + + if not save_path.exists(): + download_image(url, save_path, pbar) + + rel_path = os.path.relpath(save_path, Path(BASE_MD_DIR) / author) + return f"({rel_path})" + + pattern = r'\(https://substackcdn\.com/image/fetch/[^\s\)]+\)' + return re.sub(pattern, replace_image, md_content) def generate_html_file(author_name: str) -> None: - """ - Generates a HTML file for the given author. 
- """ if not os.path.exists(BASE_HTML_DIR): os.makedirs(BASE_HTML_DIR) - # Read JSON data json_path = os.path.join(JSON_DATA_DIR, f'{author_name}.json') with open(json_path, 'r', encoding='utf-8') as file: essays_data = json.load(file) - # Convert JSON data to a JSON string for embedding embedded_json_data = json.dumps(essays_data, ensure_ascii=False, indent=4) with open(HTML_TEMPLATE, 'r', encoding='utf-8') as file: html_template = file.read() - # Insert the JSON string into the script tag in the HTML template html_with_data = html_template.replace('', author_name).replace( '', f'' ) html_with_author = html_with_data.replace('author_name', author_name) - # Write the modified HTML to a new file html_output_path = os.path.join(BASE_HTML_DIR, f'{author_name}.html') with open(html_output_path, 'w', encoding='utf-8') as file: file.write(html_with_author) - class BaseSubstackScraper(ABC): - def __init__(self, base_substack_url: str, md_save_dir: str, html_save_dir: str): - if not base_substack_url.endswith("/"): - base_substack_url += "/" - self.base_substack_url: str = base_substack_url - - self.writer_name: str = extract_main_part(base_substack_url) - md_save_dir: str = f"{md_save_dir}/{self.writer_name}" - - self.md_save_dir: str = md_save_dir - self.html_save_dir: str = f"{html_save_dir}/{self.writer_name}" - - if not os.path.exists(md_save_dir): - os.makedirs(md_save_dir) - print(f"Created md directory {md_save_dir}") - if not os.path.exists(self.html_save_dir): - os.makedirs(self.html_save_dir) - print(f"Created html directory {self.html_save_dir}") - - self.keywords: List[str] = ["about", "archive", "podcast"] - self.post_urls: List[str] = self.get_all_post_urls() + def __init__(self, url: str, md_save_dir: str, html_save_dir: str, download_images: bool = False): + self.is_single_post = is_post_url(url) + self.base_substack_url = get_publication_url(url) + self.writer_name = extract_main_part(self.base_substack_url) + self.post_slug = get_post_slug(url) if self.is_single_post else None + + self.md_save_dir = Path(md_save_dir) / self.writer_name + self.html_save_dir = Path(html_save_dir) / self.writer_name + self.image_dir = Path(BASE_IMAGE_DIR) / self.writer_name + self.download_images = download_images + + for directory in [self.md_save_dir, self.html_save_dir]: + directory.mkdir(parents=True, exist_ok=True) + print(f"Created directory {directory}") + + if self.is_single_post: + self.post_urls = [url] + else: + self.keywords = ["about", "archive", "podcast"] + self.post_urls = self.get_all_post_urls() def get_all_post_urls(self) -> List[str]: """ @@ -296,53 +364,65 @@ def save_essays_data_to_json(self, essays_data: list) -> None: json.dump(essays_data, f, ensure_ascii=False, indent=4) def scrape_posts(self, num_posts_to_scrape: int = 0) -> None: - """ - Iterates over all posts and saves them as markdown and html files - """ + """Iterates over posts and saves them as markdown and html files with progress bars.""" essays_data = [] count = 0 total = num_posts_to_scrape if num_posts_to_scrape != 0 else len(self.post_urls) - for url in tqdm(self.post_urls, total=total): - try: - md_filename = self.get_filename_from_url(url, filetype=".md") - html_filename = self.get_filename_from_url(url, filetype=".html") - md_filepath = os.path.join(self.md_save_dir, md_filename) - html_filepath = os.path.join(self.html_save_dir, html_filename) - - if not os.path.exists(md_filepath): - soup = self.get_url_soup(url) - if soup is None: - total += 1 - continue - title, subtitle, like_count, date, md = 
self.extract_post_data(soup) - self.save_to_file(md_filepath, md) - - # Convert markdown to HTML and save - html_content = self.md_to_html(md) - self.save_to_html_file(html_filepath, html_content) - - essays_data.append({ - "title": title, - "subtitle": subtitle, - "like_count": like_count, - "date": date, - "file_link": md_filepath, - "html_link": html_filepath - }) - else: - print(f"File already exists: {md_filepath}") - except Exception as e: - print(f"Error scraping post: {e}") - count += 1 - if num_posts_to_scrape != 0 and count == num_posts_to_scrape: - break + + with tqdm(total=total, desc="Scraping posts") as pbar: + for url in self.post_urls: + try: + post_slug = url.split('/')[-1] + md_filename = self.get_filename_from_url(url, filetype=".md") + html_filename = self.get_filename_from_url(url, filetype=".html") + md_filepath = os.path.join(self.md_save_dir, md_filename) + html_filepath = os.path.join(self.html_save_dir, html_filename) + + if not os.path.exists(md_filepath): + soup = self.get_url_soup(url) + if soup is None: + total += 1 + continue + + title, subtitle, like_count, date, md = self.extract_post_data(soup) + + # Count images before downloading + total_images = count_images_in_markdown(md) + post_slug = url.split("/p/")[-1].split("/")[0] + + with tqdm(total=total_images, desc=f"Downloading images for {post_slug}", leave=False) as img_pbar: + md = process_markdown_images(md, self.writer_name, post_slug, img_pbar) + + self.save_to_file(md_filepath, md) + html_content = self.md_to_html(md) + self.save_to_html_file(html_filepath, html_content) + + essays_data.append({ + "title": title, + "subtitle": subtitle, + "like_count": like_count, + "date": date, + "file_link": md_filepath, + "html_link": html_filepath + }) + else: + pbar.write(f"File already exists: {md_filepath}") + + except Exception as e: + pbar.write(f"Error scraping post: {e}") + + count += 1 + pbar.update(1) + if num_posts_to_scrape != 0 and count == num_posts_to_scrape: + break + self.save_essays_data_to_json(essays_data=essays_data) generate_html_file(author_name=self.writer_name) class SubstackScraper(BaseSubstackScraper): - def __init__(self, base_substack_url: str, md_save_dir: str, html_save_dir: str): - super().__init__(base_substack_url, md_save_dir, html_save_dir) + def __init__(self, base_substack_url: str, md_save_dir: str, html_save_dir: str, download_images: bool = False): + super().__init__(base_substack_url, md_save_dir, html_save_dir, download_images) def get_url_soup(self, url: str) -> Optional[BeautifulSoup]: """ @@ -368,9 +448,10 @@ def __init__( headless: bool = False, edge_path: str = '', edge_driver_path: str = '', - user_agent: str = '' + user_agent: str = '', + download_images: bool = False, ) -> None: - super().__init__(base_substack_url, md_save_dir, html_save_dir) + super().__init__(base_substack_url, md_save_dir, html_save_dir, download_images) options = EdgeOptions() if headless: @@ -438,102 +519,74 @@ def get_url_soup(self, url: str) -> BeautifulSoup: def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="Scrape a Substack site.") + parser = argparse.ArgumentParser(description="Scrape a Substack site or individual post.") parser.add_argument( - "-u", "--url", type=str, help="The base URL of the Substack site to scrape." + "-u", "--url", type=str, required=True, + help="URL of either a Substack publication or individual post" ) parser.add_argument( "-d", "--directory", type=str, help="The directory to save scraped posts." 
) parser.add_argument( - "-n", - "--number", - type=int, - default=0, - help="The number of posts to scrape. If 0 or not provided, all posts will be scraped.", + "-n", "--number", type=int, default=0, + help="The number of posts to scrape. If 0 or not provided, all posts will be scraped. Ignored for single posts." ) parser.add_argument( - "-p", - "--premium", - action="store_true", - help="Include -p in command to use the Premium Substack Scraper with selenium.", + "--images", action="store_true", + help="Download images and update markdown to use local paths" ) parser.add_argument( - "--headless", - action="store_true", - help="Include -h in command to run browser in headless mode when using the Premium Substack " - "Scraper.", + "-p", "--premium", action="store_true", + help="Use the Premium Substack Scraper with selenium." ) parser.add_argument( - "--edge-path", - type=str, - default="", - help='Optional: The path to the Edge browser executable (i.e. "path_to_msedge.exe").', + "--headless", action="store_true", + help="Run browser in headless mode when using the Premium Substack Scraper." ) parser.add_argument( - "--edge-driver-path", - type=str, - default="", - help='Optional: The path to the Edge WebDriver executable (i.e. "path_to_msedgedriver.exe").', + "--edge-path", type=str, default="", + help='Optional: The path to the Edge browser executable.' ) parser.add_argument( - "--user-agent", - type=str, - default="", - help="Optional: Specify a custom user agent for selenium browser automation. Useful for " - "passing captcha in headless mode", + "--edge-driver-path", type=str, default="", + help='Optional: The path to the Edge WebDriver executable.' ) parser.add_argument( - "--html-directory", - type=str, - help="The directory to save scraped posts as HTML files.", + "--user-agent", type=str, default="", + help="Optional: Specify a custom user agent for selenium browser automation." + ) + parser.add_argument( + "--html-directory", type=str, + help="The directory to save scraped posts as HTML files." 
) - return parser.parse_args() - def main(): args = parse_args() - + if args.directory is None: args.directory = BASE_MD_DIR - + if args.html_directory is None: args.html_directory = BASE_HTML_DIR - if args.url: - if args.premium: - scraper = PremiumSubstackScraper( - args.url, - headless=args.headless, - md_save_dir=args.directory, - html_save_dir=args.html_directory - ) - else: - scraper = SubstackScraper( - args.url, - md_save_dir=args.directory, - html_save_dir=args.html_directory - ) - scraper.scrape_posts(args.number) - - else: # Use the hardcoded values at the top of the file - if USE_PREMIUM: - scraper = PremiumSubstackScraper( - base_substack_url=BASE_SUBSTACK_URL, - md_save_dir=args.directory, - html_save_dir=args.html_directory, - edge_path=args.edge_path, - edge_driver_path=args.edge_driver_path - ) - else: - scraper = SubstackScraper( - base_substack_url=BASE_SUBSTACK_URL, - md_save_dir=args.directory, - html_save_dir=args.html_directory - ) - scraper.scrape_posts(num_posts_to_scrape=NUM_POSTS_TO_SCRAPE) - + if args.premium: + scraper = PremiumSubstackScraper( + args.url, + headless=args.headless, + md_save_dir=args.directory, + html_save_dir=args.html_directory, + download_images=args.images + ) + else: + scraper = SubstackScraper( + args.url, + md_save_dir=args.directory, + html_save_dir=args.html_directory, + download_images=args.images + ) + + scraper.scrape_posts(args.number) if __name__ == "__main__": - main() + main() \ No newline at end of file From 1ae3e4e8154b04e4dec7304716fc64d9fe090370 Mon Sep 17 00:00:00 2001 From: Ben Cuan Date: Sun, 29 Dec 2024 13:40:46 -0800 Subject: [PATCH 2/6] add tests --- requirements.txt | 1 + substack_scraper.py | 8 +- tests/test_substack_scraper.py | 181 +++++++++++++++++++++++++++++++++ 3 files changed, 188 insertions(+), 2 deletions(-) create mode 100644 tests/test_substack_scraper.py diff --git a/requirements.txt b/requirements.txt index c58926a7..3fe971b4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ selenium==4.16.0 tqdm==4.66.1 webdriver_manager==4.0.1 Markdown==3.6 +pytest==8.3.4 \ No newline at end of file diff --git a/substack_scraper.py b/substack_scraper.py index 6864913b..d4d81b0f 100644 --- a/substack_scraper.py +++ b/substack_scraper.py @@ -59,7 +59,7 @@ def sanitize_filename(url: str) -> str: # Extract original filename from CDN URL if "substackcdn.com" in url: # Get the actual image URL after the CDN parameters - original_url = unquote(url.split("/https%3A%2F%2F")[1]) + original_url = unquote(url.split("https://")[1]) filename = original_url.split("/")[-1] else: filename = url.split("/")[-1] @@ -80,17 +80,22 @@ def download_image(url: str, save_path: Path, pbar: Optional[tqdm] = None) -> Op try: response = requests.get(url, stream=True) if response.status_code == 200: + print('HIII') save_path.parent.mkdir(parents=True, exist_ok=True) with open(save_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) + print("HI12") if pbar: + print("PBAR") pbar.update(1) return str(save_path) except Exception as e: if pbar: pbar.write(f"Error downloading image {url}: {str(e)}") + else: + print(f"Error downloading image {url}: {str(e)}") return None def process_markdown_images(md_content: str, author: str, post_slug: str, pbar: Optional[tqdm] = None) -> str: @@ -101,7 +106,6 @@ def replace_image(match): url = match.group(0).strip('()') filename = sanitize_filename(url) save_path = image_dir / filename - if not save_path.exists(): download_image(url, save_path, pbar) diff 
--git a/tests/test_substack_scraper.py b/tests/test_substack_scraper.py new file mode 100644 index 00000000..dc658ca6 --- /dev/null +++ b/tests/test_substack_scraper.py @@ -0,0 +1,181 @@ +import os +import shutil +import pytest +from pathlib import Path +from unittest.mock import Mock, patch + +from substack_scraper import ( + BASE_IMAGE_DIR, + SubstackScraper, + count_images_in_markdown, + sanitize_filename, + process_markdown_images, +) + +@pytest.fixture +def mock_html_content(): + return """ + + +

+    <html>
+    <body>
+        <h1 class="post-title">Test Post</h1>
+        <h3 class="subtitle">Test Subtitle</h3>
+        <div class="available-content">
+            <div class="body markup">
+                Test content with image:
+                <img src="https://substackcdn.com/image/fetch/test-image.jpg"/>
+            </div>
+        </div>
+    </body>
+    </html>
+ + + """ + +@pytest.fixture +def mock_image_response(): + return b"fake-image-data" + +@pytest.fixture +def temp_dir(tmp_path): + """Create temporary directory structure for tests""" + md_dir = tmp_path / "substack_md_files" + html_dir = tmp_path / "substack_html_pages" + img_dir = tmp_path / "substack_images" + + md_dir.mkdir() + html_dir.mkdir() + img_dir.mkdir() + + return tmp_path + +def test_count_images_in_markdown(): + markdown_content = """ + Here's an image: + ![Test](https://substackcdn.com/image/fetch/test1.jpg) + And another: + ![Test2](https://substackcdn.com/image/fetch/test2.jpg) + And some text. + """ + assert count_images_in_markdown(markdown_content) == 2 + +def test_sanitize_filename(): + url = "https://substackcdn.com/image/fetch/w_720/test%2Fimage.jpg" + filename = sanitize_filename(url) + assert isinstance(filename, str) + assert filename.endswith(".jpg") + assert "/" not in filename + assert "\\" not in filename + +def test_process_markdown_images(temp_dir, monkeypatch): + markdown_content = """ + ![Test](https://substackcdn.com/image/fetch/test1.jpg) + ![Test2](https://substackcdn.com/image/fetch/test2.jpg) + """ + + # Delete testauthor folder if exists + test_author_dir = Path(BASE_IMAGE_DIR) / "testauthor" + if test_author_dir.exists(): + shutil.rmtree(test_author_dir) + + # Mock requests.get + mock_get = Mock() + mock_get.return_value.iter_content = lambda chunk_size: [] + mock_get.return_value.status_code = 200 + monkeypatch.setattr("requests.get", mock_get) + + # Mock tqdm + mock_tqdm = Mock() + mock_tqdm.update = Mock() + + processed_md = process_markdown_images( + markdown_content, + "testauthor", + "testpost", + mock_tqdm + ) + + assert "../substack_images/" in processed_md + assert mock_get.called + assert mock_tqdm.update.called + +def test_scraper_initialization(temp_dir): + scraper = SubstackScraper( + "https://test.substack.com", + str(temp_dir / "substack_md_files"), + str(temp_dir / "substack_html_pages") + ) + assert scraper.writer_name == "test" + assert os.path.exists(scraper.md_save_dir) + assert os.path.exists(scraper.html_save_dir) + +@patch("requests.get") +def test_scraper_single_post(mock_get, temp_dir, mock_html_content): + mock_get.return_value.ok = True + mock_get.return_value.content = mock_html_content.encode() + + scraper = SubstackScraper( + "https://test.substack.com", + str(temp_dir / "substack_md_files"), + str(temp_dir / "substack_html_pages") + ) + + url = "https://test.substack.com/p/test-post" + soup = scraper.get_url_soup(url) + title, subtitle, like_count, date, md = scraper.extract_post_data(soup) + + assert title == "Test Post" + assert subtitle == "Test Subtitle" + assert isinstance(md, str) + +def test_premium_content_handling(temp_dir, monkeypatch): + html_with_paywall = """ + + +

+    <h2 class="paywall-title">Premium Content</h2>
+ + + """ + + # Mock requests.get + mock_get = Mock() + mock_get.return_value.content = html_with_paywall.encode() + monkeypatch.setattr("requests.get", mock_get) + + scraper = SubstackScraper( + "https://test.substack.com", + str(temp_dir / "substack_md_files"), + str(temp_dir / "substack_html_pages") + ) + + result = scraper.get_url_soup("https://test.substack.com/p/premium-post") + assert result is None + +def test_image_download_error_handling(temp_dir, monkeypatch): + # Mock requests.get to simulate network error + def mock_get(*args, **kwargs): + raise Exception("Network error") + + monkeypatch.setattr("requests.get", mock_get) + + markdown_content = "![Test](https://substackcdn.com/image/fetch/test.jpg)" + mock_tqdm = Mock() + + # Should not raise exception but log error + processed_md = process_markdown_images( + markdown_content, + "testauthor", + "testpost", + mock_tqdm + ) + +def test_directory_structure(temp_dir): + scraper = SubstackScraper( + "https://test.substack.com", + str(temp_dir / "substack_md_files"), + str(temp_dir / "substack_html_pages") + ) + + assert Path(scraper.md_save_dir).exists() + assert Path(scraper.html_save_dir).exists() + assert "test" in str(scraper.md_save_dir) + assert "test" in str(scraper.html_save_dir) + +if __name__ == "__main__": + pytest.main(["-v"]) From b17babcf695ee7c92f690b3188349770d1b2beda Mon Sep 17 00:00:00 2001 From: Ben Cuan Date: Sun, 29 Dec 2024 13:55:56 -0800 Subject: [PATCH 3/6] only download images if --images is passed in --- substack_scraper.py | 16 +++++++--------- tests/test_substack_scraper.py | 27 +++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/substack_scraper.py b/substack_scraper.py index d4d81b0f..bfa1a102 100644 --- a/substack_scraper.py +++ b/substack_scraper.py @@ -80,15 +80,12 @@ def download_image(url: str, save_path: Path, pbar: Optional[tqdm] = None) -> Op try: response = requests.get(url, stream=True) if response.status_code == 200: - print('HIII') save_path.parent.mkdir(parents=True, exist_ok=True) with open(save_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) - print("HI12") if pbar: - print("PBAR") pbar.update(1) return str(save_path) except Exception as e: @@ -390,13 +387,14 @@ def scrape_posts(self, num_posts_to_scrape: int = 0) -> None: title, subtitle, like_count, date, md = self.extract_post_data(soup) - # Count images before downloading - total_images = count_images_in_markdown(md) - post_slug = url.split("/p/")[-1].split("/")[0] - - with tqdm(total=total_images, desc=f"Downloading images for {post_slug}", leave=False) as img_pbar: - md = process_markdown_images(md, self.writer_name, post_slug, img_pbar) + if self.download_images: + # Count images before downloading + total_images = count_images_in_markdown(md) + post_slug = url.split("/p/")[-1].split("/")[0] + with tqdm(total=total_images, desc=f"Downloading images for {post_slug}", leave=False) as img_pbar: + md = process_markdown_images(md, self.writer_name, post_slug, img_pbar) + self.save_to_file(md_filepath, md) html_content = self.md_to_html(md) self.save_to_html_file(html_filepath, html_content) diff --git a/tests/test_substack_scraper.py b/tests/test_substack_scraper.py index dc658ca6..7792dde8 100644 --- a/tests/test_substack_scraper.py +++ b/tests/test_substack_scraper.py @@ -177,5 +177,32 @@ def test_directory_structure(temp_dir): assert "test" in str(scraper.md_save_dir) assert "test" in str(scraper.html_save_dir) +def 
test_scraper_without_images(temp_dir): + """Test that images are not downloaded when --images flag is not set""" + + # Initialize scraper with images=False + scraper = SubstackScraper( + base_substack_url="https://on.substack.com", + md_save_dir=str(temp_dir / "substack_md_files"), + html_save_dir=str(temp_dir / "substack_html_pages"), + download_images=False + ) + + # Run scraper + scraper.scrape_posts(num_posts_to_scrape=1) + + # # Check that markdown files were created + md_files = list(Path(temp_dir / "substack_md_files" / "on").glob("*.md")) + assert len(md_files) > 0 + + # Check that no image directory was created + img_dir = temp_dir / "substack_images" / "on" + assert not img_dir.exists() + + # Verify markdown content still contains original image URLs + with open(md_files[0], 'r') as f: + content = f.read() + assert "https://substackcdn.com/image/fetch" in content + if __name__ == "__main__": pytest.main(["-v"]) From 905bcd267e5e09f00bcd0954f3ae23d597c5dfdd Mon Sep 17 00:00:00 2001 From: Ben Cuan Date: Sun, 29 Dec 2024 13:59:01 -0800 Subject: [PATCH 4/6] update readme --- README.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/README.md b/README.md index c81f0df4..f32206db 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,25 @@ To scrape a specific number of posts: python substack_scraper.py --url https://example.substack.com --directory /path/to/save/posts --number 5 ``` +To scrape a single post: + +```bash +python substack_scraper.py --url https://example.substack.com/p/example-post --directory /path/to/save/posts +``` + +To scrape images and download them to a `substack_images/` folder locally: + +```bash +python substack_scraper.py --url https://example.substack.com --directory /path/to/save/posts --images +``` + +### Testing +Run tests using pytest: + +``` +python -m pytest +``` + ### Online Version For a hassle-free experience without any local setup: From 7c39b9c00706e77922c4e569a51e5e4eed1214bd Mon Sep 17 00:00:00 2001 From: Ben Cuan Date: Sun, 29 Dec 2024 14:08:49 -0800 Subject: [PATCH 5/6] cleanup diff --- substack_scraper.py | 159 ++++++++++++++++++++++++++++---------------- 1 file changed, 103 insertions(+), 56 deletions(-) diff --git a/substack_scraper.py b/substack_scraper.py index bfa1a102..7df5e610 100644 --- a/substack_scraper.py +++ b/substack_scraper.py @@ -24,14 +24,14 @@ from selenium.webdriver.chrome.service import Service from config import EMAIL, PASSWORD -USE_PREMIUM: bool = False -BASE_SUBSTACK_URL: str = "https://www.thefitzwilliam.com/" -BASE_MD_DIR: str = "substack_md_files" -BASE_HTML_DIR: str = "substack_html_pages" -BASE_IMAGE_DIR: str = "substack_images" -HTML_TEMPLATE: str = "author_template.html" +USE_PREMIUM: bool = False # Set to True if you want to login to Substack and convert paid for posts +BASE_SUBSTACK_URL: str = "https://www.thefitzwilliam.com/" # Substack you want to convert to markdown +BASE_MD_DIR: str = "substack_md_files" # Name of the directory we'll save the .md essay files +BASE_HTML_DIR: str = "substack_html_pages" # Name of the directory we'll save the .html essay files +BASE_IMAGE_DIR: str = "substack_images" # Name of the directory we'll save images to if --images is passed in +HTML_TEMPLATE: str = "author_template.html" # HTML template to use for the author page JSON_DATA_DIR: str = "data" -NUM_POSTS_TO_SCRAPE: int = 3 +NUM_POSTS_TO_SCRAPE: int = 3 # Set to 0 if you want all posts def count_images_in_markdown(md_content: str) -> int: """Count number of Substack CDN image URLs in markdown 
content.""" @@ -47,8 +47,9 @@ def get_publication_url(url: str) -> str: return f"{parsed.scheme}://{parsed.netloc}/" def extract_main_part(url: str) -> str: - parts = urlparse(url).netloc.split('.') - return parts[1] if parts[0] == 'www' else parts[0] + parts = urlparse(url).netloc.split('.') # Parse the URL to get the netloc, and split on '.' + return parts[1] if parts[0] == 'www' else parts[0] # Return the main part of the domain, while ignoring 'www' if + # present def get_post_slug(url: str) -> str: match = re.search(r'/p/([^/]+)', url) @@ -113,49 +114,57 @@ def replace_image(match): return re.sub(pattern, replace_image, md_content) def generate_html_file(author_name: str) -> None: + """ + Generates a HTML file for the given author. + """ if not os.path.exists(BASE_HTML_DIR): os.makedirs(BASE_HTML_DIR) + # Read JSON data json_path = os.path.join(JSON_DATA_DIR, f'{author_name}.json') with open(json_path, 'r', encoding='utf-8') as file: essays_data = json.load(file) + # Convert JSON data to a JSON string for embedding embedded_json_data = json.dumps(essays_data, ensure_ascii=False, indent=4) with open(HTML_TEMPLATE, 'r', encoding='utf-8') as file: html_template = file.read() + # Insert the JSON string into the script tag in the HTML template html_with_data = html_template.replace('', author_name).replace( '', f'' ) html_with_author = html_with_data.replace('author_name', author_name) + # Write the modified HTML to a new file html_output_path = os.path.join(BASE_HTML_DIR, f'{author_name}.html') with open(html_output_path, 'w', encoding='utf-8') as file: file.write(html_with_author) + class BaseSubstackScraper(ABC): - def __init__(self, url: str, md_save_dir: str, html_save_dir: str, download_images: bool = False): - self.is_single_post = is_post_url(url) - self.base_substack_url = get_publication_url(url) - self.writer_name = extract_main_part(self.base_substack_url) - self.post_slug = get_post_slug(url) if self.is_single_post else None + def __init__(self, base_substack_url: str, md_save_dir: str, html_save_dir: str, download_images: bool = False): + self.is_single_post: bool = is_post_url(base_substack_url) + self.base_substack_url: str = get_publication_url(base_substack_url) + self.writer_name: str = extract_main_part(self.base_substack_url) + self.post_slug: Optional[str] = get_post_slug(base_substack_url) if self.is_single_post else None - self.md_save_dir = Path(md_save_dir) / self.writer_name - self.html_save_dir = Path(html_save_dir) / self.writer_name - self.image_dir = Path(BASE_IMAGE_DIR) / self.writer_name - self.download_images = download_images + self.md_save_dir: str = Path(md_save_dir) / self.writer_name + self.html_save_dir: str = Path(html_save_dir) / self.writer_name + self.image_dir: str = Path(BASE_IMAGE_DIR) / self.writer_name + self.download_images: bool = download_images for directory in [self.md_save_dir, self.html_save_dir]: directory.mkdir(parents=True, exist_ok=True) print(f"Created directory {directory}") if self.is_single_post: - self.post_urls = [url] + self.post_urls = [base_substack_url] else: - self.keywords = ["about", "archive", "podcast"] - self.post_urls = self.get_all_post_urls() + self.keywords: List[str] = ["about", "archive", "podcast"] + self.post_urls: List[str] = self.get_all_post_urls() def get_all_post_urls(self) -> List[str]: """ @@ -365,7 +374,9 @@ def save_essays_data_to_json(self, essays_data: list) -> None: json.dump(essays_data, f, ensure_ascii=False, indent=4) def scrape_posts(self, num_posts_to_scrape: int = 0) -> None: - """Iterates 
over posts and saves them as markdown and html files with progress bars.""" + """ + Iterates over all posts and saves them as markdown and html files + """ essays_data = [] count = 0 total = num_posts_to_scrape if num_posts_to_scrape != 0 else len(self.post_urls) @@ -523,43 +534,58 @@ def get_url_soup(self, url: str) -> BeautifulSoup: def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Scrape a Substack site or individual post.") parser.add_argument( - "-u", "--url", type=str, required=True, - help="URL of either a Substack publication or individual post" + "-u", "--url", type=str, help="The base URL of the Substack site to scrape." ) parser.add_argument( "-d", "--directory", type=str, help="The directory to save scraped posts." ) parser.add_argument( - "-n", "--number", type=int, default=0, - help="The number of posts to scrape. If 0 or not provided, all posts will be scraped. Ignored for single posts." + "-n", + "--number", + type=int, + default=0, + help="The number of posts to scrape. If 0 or not provided, all posts will be scraped.", ) parser.add_argument( - "--images", action="store_true", - help="Download images and update markdown to use local paths" + "-p", + "--premium", + action="store_true", + help="Include -p in command to use the Premium Substack Scraper with selenium.", ) parser.add_argument( - "-p", "--premium", action="store_true", - help="Use the Premium Substack Scraper with selenium." + "--headless", + action="store_true", + help="Include -h in command to run browser in headless mode when using the Premium Substack " + "Scraper.", ) parser.add_argument( - "--headless", action="store_true", - help="Run browser in headless mode when using the Premium Substack Scraper." + "--edge-path", + type=str, + default="", + help='Optional: The path to the Edge browser executable (i.e. "path_to_msedge.exe").', ) parser.add_argument( - "--edge-path", type=str, default="", - help='Optional: The path to the Edge browser executable.' + "--edge-driver-path", + type=str, + default="", + help='Optional: The path to the Edge WebDriver executable (i.e. "path_to_msedgedriver.exe").', ) parser.add_argument( - "--edge-driver-path", type=str, default="", - help='Optional: The path to the Edge WebDriver executable.' + "--user-agent", + type=str, + default="", + help="Optional: Specify a custom user agent for selenium browser automation. Useful for " + "passing captcha in headless mode", ) parser.add_argument( - "--user-agent", type=str, default="", - help="Optional: Specify a custom user agent for selenium browser automation." + "--html-directory", + type=str, + help="The directory to save scraped posts as HTML files.", ) parser.add_argument( - "--html-directory", type=str, - help="The directory to save scraped posts as HTML files." 
+ "--images", + action="store_true", + help="Download images and update markdown to use local paths" ) return parser.parse_args() @@ -572,23 +598,44 @@ def main(): if args.html_directory is None: args.html_directory = BASE_HTML_DIR - if args.premium: - scraper = PremiumSubstackScraper( - args.url, - headless=args.headless, - md_save_dir=args.directory, - html_save_dir=args.html_directory, - download_images=args.images - ) - else: - scraper = SubstackScraper( - args.url, - md_save_dir=args.directory, - html_save_dir=args.html_directory, - download_images=args.images - ) + if args.url: + if args.premium: + scraper = PremiumSubstackScraper( + args.url, + headless=args.headless, + md_save_dir=args.directory, + html_save_dir=args.html_directory, + download_images=args.images + ) + else: + scraper = SubstackScraper( + args.url, + md_save_dir=args.directory, + html_save_dir=args.html_directory, + download_images=args.images + ) + scraper.scrape_posts(args.number) + + else: # Use the hardcoded values at the top of the file + if USE_PREMIUM: + scraper = PremiumSubstackScraper( + base_substack_url=BASE_SUBSTACK_URL, + md_save_dir=args.directory, + html_save_dir=args.html_directory, + edge_path=args.edge_path, + edge_driver_path=args.edge_driver_path, + download_images=args.images + ) + else: + scraper = SubstackScraper( + base_substack_url=BASE_SUBSTACK_URL, + md_save_dir=args.directory, + html_save_dir=args.html_directory, + download_images=args.images + ) + scraper.scrape_posts(num_posts_to_scrape=NUM_POSTS_TO_SCRAPE) scraper.scrape_posts(args.number) if __name__ == "__main__": - main() \ No newline at end of file + main() From dd41cc5789c573a2a1c2b8a73ad5796ef19bd443 Mon Sep 17 00:00:00 2001 From: Ben Cuan Date: Sun, 29 Dec 2024 20:31:23 -0800 Subject: [PATCH 6/6] cleanup image links --- substack_scraper.py | 44 +++++++++++- tests/test_substack_scraper.py | 126 ++++++++++++++++++++++++++++++++- 2 files changed, 166 insertions(+), 4 deletions(-) diff --git a/substack_scraper.py b/substack_scraper.py index 7df5e610..35002868 100644 --- a/substack_scraper.py +++ b/substack_scraper.py @@ -33,10 +33,45 @@ JSON_DATA_DIR: str = "data" NUM_POSTS_TO_SCRAPE: int = 3 # Set to 0 if you want all posts +def clean_linked_images(md_content: str) -> str: + """ + Converts markdown linked images to simple image references. + + Args: + md_content: String containing markdown content + + Returns: + String with cleaned markdown where linked images are converted to simple image references + + Example: + >>> md = "[![alt text](/img/test.png)](/img/test.png)" + >>> clean_linked_images(md) + '![alt text](/img/test.png)' + """ + # Pattern matches: [![any text](/path/img.ext)](/path/img.ext) + pattern = r'\[!\[(.*?)\]\((.*?)\)\]\(.*?\)' + + # Replace with: ![text](/path/img.ext) + cleaned = re.sub(pattern, r'![\1](\2)', md_content) + + return cleaned + def count_images_in_markdown(md_content: str) -> int: - """Count number of Substack CDN image URLs in markdown content.""" - pattern = r'https://substackcdn\.com/image/fetch/[^\s\)]+\)' - matches = re.findall(pattern, md_content) + """ + Count number of image references in markdown content. 
+ + Args: + md_content: Markdown content to analyze + + Returns: + Number of unique images found + """ + # First clean linked images + cleaned_content = clean_linked_images(md_content) + + # Then count remaining image references + pattern = r'!\[.*?\]\((.*?)\)' + matches = re.findall(pattern, cleaned_content) return len(matches) def is_post_url(url: str) -> bool: @@ -100,6 +135,9 @@ def process_markdown_images(md_content: str, author: str, post_slug: str, pbar: """Process markdown content to download images and update references.""" image_dir = Path(BASE_IMAGE_DIR) / author / post_slug + # First clean up any linked images + md_content = clean_linked_images(md_content) + def replace_image(match): url = match.group(0).strip('()') filename = sanitize_filename(url) diff --git a/tests/test_substack_scraper.py b/tests/test_substack_scraper.py index 7792dde8..627918f9 100644 --- a/tests/test_substack_scraper.py +++ b/tests/test_substack_scraper.py @@ -7,6 +7,7 @@ from substack_scraper import ( BASE_IMAGE_DIR, SubstackScraper, + clean_linked_images, count_images_in_markdown, sanitize_filename, process_markdown_images, @@ -177,7 +178,130 @@ def test_directory_structure(temp_dir): assert "test" in str(scraper.md_save_dir) assert "test" in str(scraper.html_save_dir) -def test_scraper_without_images(temp_dir): +@pytest.mark.parametrize("test_case", [ + { + "name": "basic_cleaning", + "input": """ + Some text here + [![Image 1](/img/test/image1.png)](/img/test/image1.png) + More text + [![](/img/test/image2.jpg)](/img/test/image2.jpg) + Final text + """, + "expected": """ + Some text here + ![Image 1](/img/test/image1.png) + More text + ![](/img/test/image2.jpg) + Final text + """ + }, + { + "name": "mixed_content", + "input": """ + Regular link: [Link text](https://example.com) + Regular image: ![Alt text](/img/regular.jpg) + Linked image: [![Image](/img/linked/test.png)](/img/linked/test.png) + """, + "expected": """ + Regular link: [Link text](https://example.com) + Regular image: ![Alt text](/img/regular.jpg) + Linked image: ![Image](/img/linked/test.png) + """ + }, + { + "name": "substack_cdn", + "input": """ + [![](/img/test/image1.jpg)](https://substackcdn.com/image/fetch/test1.jpg) + [![Alt text](https://substackcdn.com/image/fetch/test2.jpg)](https://substackcdn.com/image/fetch/test2.jpg) + """, + "expected": """ + ![](/img/test/image1.jpg) + ![Alt text](https://substackcdn.com/image/fetch/test2.jpg) + """ + }, + { + "name": "no_changes_needed", + "input": """ + # Header + Regular text + ![Image](/img/test.jpg) + [Link](https://example.com) + """, + "expected": """ + # Header + Regular text + ![Image](/img/test.jpg) + [Link](https://example.com) + """ + }, + { + "name": "empty_content", + "input": "", + "expected": "" + }, + { + "name": "preserve_newlines", + "input": """ + Line 1 + + [![Image](/test.jpg)](/test.jpg) + + Line 2 + """, + "expected": """ + Line 1 + + ![Image](/test.jpg) + + Line 2 + """ + }, + { + "name": "special_characters", + "input": """ + [![Test & Demo](/img/test&demo.jpg)](/img/test&demo.jpg) + [![Spaces Test](/img/spaces%20test.jpg)](/img/spaces%20test.jpg) + """, + "expected": """ + ![Test & Demo](/img/test&demo.jpg) + ![Spaces Test](/img/spaces%20test.jpg) + """ + } +]) +def test_clean_linked_images(test_case): + """ + Parametrized test for cleaning linked images in markdown content. + Tests various scenarios including basic cleaning, mixed content, + CDN URLs, empty content, and special characters. 
+ """ + result = clean_linked_images(test_case["input"]) + assert result.strip() == test_case["expected"].strip() + +def test_clean_linked_images_integration(temp_dir, monkeypatch): + """Test integration with markdown processing pipeline.""" + # Initialize scraper with images=False + scraper = SubstackScraper( + base_substack_url="https://on.substack.com", + md_save_dir=str(temp_dir / "substack_md_files"), + html_save_dir=str(temp_dir / "substack_html_pages"), + download_images=True + ) + # Run scraper + scraper.scrape_posts(num_posts_to_scrape=1) + + # # Check that markdown files were created + md_files = list(Path(temp_dir / "substack_md_files" / "on").glob("*.md")) + assert len(md_files) > 0 + + # Verify markdown content still contains original image URLs + with open(md_files[0], 'r') as f: + content = f.read() + assert "[![" not in content + assert "](" in content + assert "![" in content + +def test_scraper_without_images_integration(temp_dir): """Test that images are not downloaded when --images flag is not set""" # Initialize scraper with images=False