From 8ed6b613f8c104615d6aa3d32db695a6fd0dff79 Mon Sep 17 00:00:00 2001 From: Nikhil Kandpal Date: Mon, 15 Apr 2024 09:56:06 -0400 Subject: [PATCH 01/10] initial commit for usgpo --- usgpo/download_files.py | 75 ++++++++++++++++++++ usgpo/get_links.py | 151 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 226 insertions(+) create mode 100644 usgpo/download_files.py create mode 100644 usgpo/get_links.py diff --git a/usgpo/download_files.py b/usgpo/download_files.py new file mode 100644 index 0000000..b2d0dc4 --- /dev/null +++ b/usgpo/download_files.py @@ -0,0 +1,75 @@ +import argparse +import requests +import os +from concurrent.futures import ThreadPoolExecutor, as_completed +import logging +import queue +import time + +import jsonlines +from tqdm.auto import tqdm + + +logging.basicConfig(level=logging.INFO, format="download-files: [%(asctime)s] [%(funcName)s] %(levelname)s - %(message)s") + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--api-key", required=True, help="GovInfo API key") + parser.add_argument("--links-file", required=True, help="Path to links file (jsonl)") + parser.add_argument("--output-dir", required=True, help="Path to output directory") + parser.add_argument("--workers", type=int, default=10, help="Number of threads") + args = parser.parse_args() + return args + + +def api_query(endpoint, headers, params): + response = requests.get(endpoint, headers=headers, params=params) + if response.status_code == 429: + # Sleep for an hour if we've hit the rate-limit + logging.info("Sleeping for one hour to avoid rate-limit") + time.sleep(60*60) + response = requests.get(endpoint, headers=headers, params=params) + return response + + +def download_file(api_key, file): + download_metadata = file["links"] + link = download_metadata.get("txtLink") + if link is not None: + response = api_query(link, headers=None, params={"api_key": api_key}) + text = response.text + return text + return None + + +def construct_record(api_key, file): + text = download_file(api_key, file) + if text is None: + return None + return { + "title": file["title"], + "date": file["date"], + "author": file["author"], + "publisher": file["publisher"], + "category": file["category"], + "text": text + } + + +def main(args): + records = [] + with jsonlines.open(args.links_file, mode="r") as reader: + with jsonlines.open(os.path.join(args.output_dir, "records.jsonl"), "w") as writer: + with ThreadPoolExecutor(max_workers=args.workers) as executor: + futures = [executor.submit(construct_record, args.api_key, file) for file in reader] + for future in tqdm(as_completed(futures)): + record = future.result() + if record is not None: + writer.write(record) + + + +if __name__ == "__main__": + args = parse_args() + main(args) diff --git a/usgpo/get_links.py b/usgpo/get_links.py new file mode 100644 index 0000000..e5e7499 --- /dev/null +++ b/usgpo/get_links.py @@ -0,0 +1,151 @@ +import argparse +import requests +import json +import logging +import os +from concurrent.futures import ThreadPoolExecutor, as_completed +import queue +import time + +from tqdm.auto import tqdm +import jsonlines + + +logging.basicConfig(level=logging.INFO, format="get-links: [%(asctime)s] [%(funcName)s] %(levelname)s - %(message)s") + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--api-key", required=True, help="GovInfo API key") + parser.add_argument("--start-date", required=True, help="Start date in ISO8601 format (yyyy-MM-dd'T'HH:mm:ss'Z')") + 
parser.add_argument("--output-dir", required=True, help="Path to output directory") + parser.add_argument("--workers", type=int, default=10, help="Number of threads") + parser.add_argument("--collections", nargs="+", default=["BILLS", + "BUDGET", + "CDIR", + "CFR", + "CPD", + "CRI", + "CZIC", + "GAOREPORTS", + "GOVPUB", + "GPO", + "HJOURNAL", + "HOB", + "PAI", + "PLAW", + "USCODE"]) + args = parser.parse_args() + return args + + +def api_query(endpoint, headers, params): + response = requests.get(endpoint, headers=headers, params=params) + if response.status_code == 429: + # Sleep for an hour if we've hit the rate-limit + logging.info("Sleeping for one hour to avoid rate-limit") + time.sleep(60*60) + response = requests.get(endpoint, headers=headers, params=params) + return response + + +def get_collections(api_key): + response = api_query("https://api.govinfo.gov/collections", headers={"accept": "application/json"}, params={"api_key": args.api_key}) + if response.status_code == 200: + output = response.json() + for record in output["collections"]: + yield record["collectionCode"] + else: + logging.error(f"get_collections received status code {response.status_code}") + + +def get_packages(api_key, collections, start_date, package_queue): + url = f"https://api.govinfo.gov/published/{start_date}" + offset_mark = "*" + pbar = tqdm(desc="Producer") + while url is not None: + response = api_query(url, + headers={"accept": "application/json"}, + params={"api_key": args.api_key, "offsetMark": offset_mark, "pageSize": 1000, "collection": ",".join(collections)}) + if response.status_code == 200: + output = response.json() + + for record in output["packages"]: + package_queue.put(record) + pbar.update(1) + + url = output["nextPage"] + offset_mark = None + # Prevent too many API requests in a short period of time + time.sleep(5) + else: + logging.error(f"get_packages received status code {response.status_code} for query {url}") + break + + package_queue.put(None) + + +def get_file_links(api_key, package): + package_id = package["packageId"] + response = api_query(f"https://api.govinfo.gov/packages/{package_id}/summary", headers={"accept": "application/json"}, params={"api_key": args.api_key}) + if response.status_code == 200: + output = response.json() + return output.get("download") + return None + + +def get_package_metadata(api_key, package_queue, metadata_queue): + pbar = tqdm(desc="Consumer") + while True: + package = package_queue.get() + if package is None: + package_queue.put(None) + metadata_queue.put(None) + break + + record = { + "title": package.get("title"), + "package_id": package.get("packageId"), + "date": package.get("dateIssued"), + "category": package.get("category"), + "author": package.get("governmentAuthor1"), + "publisher": package.get("publisher"), + "links": get_file_links(api_key, package) + } + metadata_queue.put(record) + pbar.update(1) + + +def write_metadata(output_dir, metadata_queue): + with jsonlines.open(os.path.join(output_dir, "links.jsonl"), mode="w") as writer: + pbar = tqdm(desc="Writer") + while True: + metadata = metadata_queue.get() + if metadata is None: + metadata_queue.task_done() + break + + writer.write(metadata) + pbar.update(1) + + +def main(args): + os.makedirs(args.output_dir, exist_ok=True) + + package_queue = queue.Queue() + metadata_queue = queue.Queue() + + with ThreadPoolExecutor(max_workers=args.workers+2) as executor: + executor.submit(get_packages, args.api_key, args.collections, args.start_date, package_queue) + + for _ in 
range(args.workers): + executor.submit(get_package_metadata, args.api_key, package_queue, metadata_queue) + + executor.submit(write_metadata, args.output_dir, metadata_queue) + + metadata_queue.join() + + +if __name__ == "__main__": + args = parse_args() + main(args) From 509fe0de73b80077594779c882ae00595394176f Mon Sep 17 00:00:00 2001 From: Nikhil Kandpal Date: Mon, 13 May 2024 10:36:02 -0400 Subject: [PATCH 02/10] Write out to Dolma --- usgpo/download-files.py | 119 ++++++++++++++++++++++++ usgpo/download_files.py | 75 ---------------- usgpo/{get_links.py => get-links.py} | 129 +++++++++++++++++---------- 3 files changed, 200 insertions(+), 123 deletions(-) create mode 100644 usgpo/download-files.py delete mode 100644 usgpo/download_files.py rename usgpo/{get_links.py => get-links.py} (52%) diff --git a/usgpo/download-files.py b/usgpo/download-files.py new file mode 100644 index 0000000..e622548 --- /dev/null +++ b/usgpo/download-files.py @@ -0,0 +1,119 @@ +import argparse +import html +import logging +import os +import queue +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +import datetime + +import jsonlines +import requests +from bs4 import BeautifulSoup +from tqdm.auto import tqdm + +from licensed_pile.write import to_dolma +from licensed_pile.licenses import PermissiveLicenses + +logging.basicConfig( + level=logging.INFO, + format="download-files: [%(asctime)s] [%(funcName)s] %(levelname)s - %(message)s", +) + + +SOURCE_NAME = "usgpo" + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--api-key", required=True, help="GovInfo API key") + parser.add_argument( + "--links-file", required=True, help="Path to links file (jsonl)" + ) + parser.add_argument( + "--output-dir", + default=f"data/{SOURCE_NAME}/v0", + help="Path to output directory", + ) + parser.add_argument( + "--filename", + default="usgpo.jsonl.gz", + help="The base filename for the USGPO Dolma dataset", + ) + parser.add_argument( + "--shard-size", type=int, default=1, help="Size, in GB, for each shard" + ) + parser.add_argument("--workers", type=int, default=10, help="Number of threads") + args = parser.parse_args() + return args + + +def api_query(endpoint, headers, params): + response = requests.get(endpoint, headers=headers, params=params) + if response.status_code == 429: + # Sleep for an hour if we've hit the rate-limit + logging.info("Sleeping for one hour to avoid rate-limit") + time.sleep(60 * 60) + response = requests.get(endpoint, headers=headers, params=params) + return response + + +def download_file(api_key, file_url): + response = api_query(file_url, headers=None, params={"api_key": api_key}) + text = response.text + return text + + +def parse_html(text): + # Most documents are primarily pre-formatted text inside of the a
+    # <pre> tag
+    # If so, just take the contents of that tag instead of the whole document
+    soup = BeautifulSoup(text, "html.parser")
+    pre_tag = soup.find("pre")
+    if pre_tag:
+        parsed_text = pre_tag.get_text()
+    else:
+        parsed_text = text
+    return html.unescape(parsed_text)
+
+
+def construct_record(api_key, file):
+    file_url = file["links"].get("txtLink")
+    if file_url is None:
+        return None
+    raw_html = download_file(api_key, file_url)
+    parsed_text = parse_html(raw_html)
+
+    return {
+        "id": file["package_id"],
+        "title": file["title"],
+        "date": file["date"],
+        "author": file["author"],
+        "publisher": file["publisher"],
+        "category": file["category"],
+        "html": raw_html,
+        "text": parsed_text,
+        "source": SOURCE_NAME,
+        "added": datetime.datetime.utcnow().isoformat(),
+        "metadata": {"license": str(PermissiveLicenses.PD), "url": file_url},
+    }
+
+
+def generate_records(args):
+    with jsonlines.open(args.links_file, mode="r") as reader:
+        with ThreadPoolExecutor(max_workers=args.workers) as executor:
+            futures = [
+                executor.submit(construct_record, args.api_key, file) for file in reader
+            ]
+            for future in as_completed(futures):
+                record = future.result()
+                if record is not None:
+                    yield record
+
+
+def main(args):
+    to_dolma(generate_records(args), args.output_dir, args.filename, args.shard_size)
+
+
+if __name__ == "__main__":
+    args = parse_args()
+    main(args)
diff --git a/usgpo/download_files.py b/usgpo/download_files.py
deleted file mode 100644
index b2d0dc4..0000000
--- a/usgpo/download_files.py
+++ /dev/null
@@ -1,75 +0,0 @@
-import argparse
-import requests
-import os
-from concurrent.futures import ThreadPoolExecutor, as_completed
-import logging
-import queue
-import time
-
-import jsonlines
-from tqdm.auto import tqdm
-
-
-logging.basicConfig(level=logging.INFO, format="download-files: [%(asctime)s] [%(funcName)s] %(levelname)s - %(message)s")
-
-
-def parse_args():
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--api-key", required=True, help="GovInfo API key")
-    parser.add_argument("--links-file", required=True, help="Path to links file (jsonl)")
-    parser.add_argument("--output-dir", required=True, help="Path to output directory")
-    parser.add_argument("--workers", type=int, default=10, help="Number of threads")
-    args = parser.parse_args()
-    return args
-
-
-def api_query(endpoint, headers, params):
-    response = requests.get(endpoint, headers=headers, params=params)
-    if response.status_code == 429:
-        # Sleep for an hour if we've hit the rate-limit
-        logging.info("Sleeping for one hour to avoid rate-limit")
-        time.sleep(60*60)
-        response = requests.get(endpoint, headers=headers, params=params)
-    return response
-
-
-def download_file(api_key, file):
-    download_metadata = file["links"]
-    link = download_metadata.get("txtLink")
-    if link is not None:
-        response = api_query(link, headers=None, params={"api_key": api_key})
-        text = response.text
-        return text
-    return None
-
-
-def construct_record(api_key, file):
-    text = download_file(api_key, file)
-    if text is None:
-        return None
-    return {
-            "title": file["title"],
-            "date": file["date"],
-            "author": file["author"],
-            "publisher": file["publisher"],
-            "category": file["category"],
-            "text": text
-            }
-
-
-def main(args):
-    records = []
-    with jsonlines.open(args.links_file, mode="r") as reader:
-        with jsonlines.open(os.path.join(args.output_dir, "records.jsonl"), "w") as writer:
-            with ThreadPoolExecutor(max_workers=args.workers) as executor:
-                futures = [executor.submit(construct_record, args.api_key, file) for file in reader]
-                for future in tqdm(as_completed(futures)):
-                    record = future.result()
-                    if record is not None:
-                        writer.write(record)
-                        
-
-
-if __name__ == "__main__":
-    args = parse_args()
-    main(args)
diff --git a/usgpo/get_links.py b/usgpo/get-links.py
similarity index 52%
rename from usgpo/get_links.py
rename to usgpo/get-links.py
index e5e7499..2d9f0b3 100644
--- a/usgpo/get_links.py
+++ b/usgpo/get-links.py
@@ -1,40 +1,52 @@
 import argparse
-import requests
 import json
 import logging
 import os
-from concurrent.futures import ThreadPoolExecutor, as_completed
 import queue
 import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
-from tqdm.auto import tqdm
 import jsonlines
+import requests
+from tqdm.auto import tqdm
 
-
-logging.basicConfig(level=logging.INFO, format="get-links: [%(asctime)s] [%(funcName)s] %(levelname)s - %(message)s")
+logging.basicConfig(
+    level=logging.INFO,
+    format="get-links: [%(asctime)s] [%(funcName)s] %(levelname)s - %(message)s",
+)
 
 
 def parse_args():
     parser = argparse.ArgumentParser()
     parser.add_argument("--api-key", required=True, help="GovInfo API key")
-    parser.add_argument("--start-date", required=True, help="Start date in ISO8601 format (yyyy-MM-dd'T'HH:mm:ss'Z')")
+    parser.add_argument(
+        "--start-date",
+        required=True,
+        help="Start date in ISO8601 format (yyyy-MM-dd'T'HH:mm:ss'Z')",
+    )
     parser.add_argument("--output-dir", required=True, help="Path to output directory")
-    parser.add_argument("--workers", type=int, default=10, help="Number of threads")
-    parser.add_argument("--collections", nargs="+", default=["BILLS",
-                                                             "BUDGET",
-                                                             "CDIR",
-                                                             "CFR",
-                                                             "CPD",
-                                                             "CRI",
-                                                             "CZIC",
-                                                             "GAOREPORTS",
-                                                             "GOVPUB",
-                                                             "GPO",
-                                                             "HJOURNAL",
-                                                             "HOB",
-                                                             "PAI",
-                                                             "PLAW",
-                                                             "USCODE"])
+    parser.add_argument("--workers", type=int, default=20, help="Number of threads")
+    parser.add_argument(
+        "--collections",
+        nargs="+",
+        default=[
+            "BILLS",
+            "BUDGET",
+            "CDIR",
+            "CFR",
+            "CPD",
+            "CRI",
+            "CZIC",
+            "GAOREPORTS",
+            "GOVPUB",
+            "GPO",
+            "HJOURNAL",
+            "HOB",
+            "PAI",
+            "PLAW",
+            "USCODE",
+        ],
+    )
     args = parser.parse_args()
     return args
 
@@ -44,13 +56,17 @@ def api_query(endpoint, headers, params):
     if response.status_code == 429:
         # Sleep for an hour if we've hit the rate-limit
         logging.info("Sleeping for one hour to avoid rate-limit")
-        time.sleep(60*60)
+        time.sleep(60 * 60)
         response = requests.get(endpoint, headers=headers, params=params)
     return response
 
 
 def get_collections(api_key):
-    response = api_query("https://api.govinfo.gov/collections", headers={"accept": "application/json"}, params={"api_key": args.api_key})
+    response = api_query(
+        "https://api.govinfo.gov/collections",
+        headers={"accept": "application/json"},
+        params={"api_key": args.api_key},
+    )
     if response.status_code == 200:
         output = response.json()
         for record in output["collections"]:
@@ -64,9 +80,16 @@ def get_packages(api_key, collections, start_date, package_queue):
     offset_mark = "*"
     pbar = tqdm(desc="Producer")
     while url is not None:
-        response = api_query(url, 
-                             headers={"accept": "application/json"}, 
-                             params={"api_key": args.api_key, "offsetMark": offset_mark, "pageSize": 1000, "collection": ",".join(collections)})
+        response = api_query(
+            url,
+            headers={"accept": "application/json"},
+            params={
+                "api_key": args.api_key,
+                "offsetMark": offset_mark,
+                "pageSize": 1000,
+                "collection": ",".join(collections),
+            },
+        )
         if response.status_code == 200:
             output = response.json()
 
@@ -74,20 +97,26 @@ def get_packages(api_key, collections, start_date, package_queue):
                 package_queue.put(record)
                 pbar.update(1)
 
-            url = output["nextPage"] 
+            url = output["nextPage"]
             offset_mark = None
             # Prevent too many API requests in a short period of time
             time.sleep(5)
         else:
-            logging.error(f"get_packages received status code {response.status_code} for query {url}")
+            logging.error(
+                f"get_packages received status code {response.status_code} for query {url}"
+            )
             break
-    
+
     package_queue.put(None)
 
 
 def get_file_links(api_key, package):
     package_id = package["packageId"]
-    response = api_query(f"https://api.govinfo.gov/packages/{package_id}/summary", headers={"accept": "application/json"}, params={"api_key": args.api_key})
+    response = api_query(
+        f"https://api.govinfo.gov/packages/{package_id}/summary",
+        headers={"accept": "application/json"},
+        params={"api_key": args.api_key},
+    )
     if response.status_code == 200:
         output = response.json()
         return output.get("download")
@@ -104,17 +133,17 @@ def get_package_metadata(api_key, package_queue, metadata_queue):
             break
 
         record = {
-                "title": package.get("title"),
-                "package_id": package.get("packageId"),
-                "date": package.get("dateIssued"),
-                "category": package.get("category"),
-                "author": package.get("governmentAuthor1"),
-                "publisher": package.get("publisher"),
-                "links": get_file_links(api_key, package) 
-                }
+            "title": package.get("title"),
+            "package_id": package.get("packageId"),
+            "date": package.get("dateIssued"),
+            "category": package.get("category"),
+            "author": package.get("governmentAuthor1"),
+            "publisher": package.get("publisher"),
+            "links": get_file_links(api_key, package),
+        }
         metadata_queue.put(record)
         pbar.update(1)
-        
+
 
 def write_metadata(output_dir, metadata_queue):
     with jsonlines.open(os.path.join(output_dir, "links.jsonl"), mode="w") as writer:
@@ -124,27 +153,31 @@ def write_metadata(output_dir, metadata_queue):
             if metadata is None:
                 metadata_queue.task_done()
                 break
-            
+
             writer.write(metadata)
             pbar.update(1)
 
 
 def main(args):
     os.makedirs(args.output_dir, exist_ok=True)
-    
+
     package_queue = queue.Queue()
     metadata_queue = queue.Queue()
 
-    with ThreadPoolExecutor(max_workers=args.workers+2) as executor:
-        executor.submit(get_packages, args.api_key, args.collections, args.start_date, package_queue)
-        
+    with ThreadPoolExecutor(max_workers=args.workers + 2) as executor:
+        executor.submit(
+            get_packages, args.api_key, args.collections, args.start_date, package_queue
+        )
+
         for _ in range(args.workers):
-            executor.submit(get_package_metadata, args.api_key, package_queue, metadata_queue)
-        
+            executor.submit(
+                get_package_metadata, args.api_key, package_queue, metadata_queue
+            )
+
         executor.submit(write_metadata, args.output_dir, metadata_queue)
 
         metadata_queue.join()
-    
+
 
 if __name__ == "__main__":
     args = parse_args()

From 78aced6767ee2590c8bb288eba88542845c6f361 Mon Sep 17 00:00:00 2001
From: Nikhil Kandpal 
Date: Mon, 13 May 2024 10:36:13 -0400
Subject: [PATCH 03/10] Driver script

---
 usgpo/get-data.sh | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
 create mode 100644 usgpo/get-data.sh

diff --git a/usgpo/get-data.sh b/usgpo/get-data.sh
new file mode 100644
index 0000000..b54f0e4
--- /dev/null
+++ b/usgpo/get-data.sh
@@ -0,0 +1,14 @@
+set -e
+
+api_key=${1}
+start_date=${2}
+
+USGPO_DIRECTORY="data/usgpo"
+
+mkdir -p ${USGPO_DIRECTORY}/raw
+
+echo "Getting Document Links"
+python usgpo/get-links.py --api-key "${api_key}" --start-date "${start_date}" --output-dir ${USGPO_DIRECTORY}/raw
+
+echo "Downloading Documents"
+python usgpo/download-files.py --api-key ${api_key} --links-file ${USGPO_DIRECTORY}/raw/links.jsonl

From f06e1e8da215f0eaebb31d3610e908f4dd0300b7 Mon Sep 17 00:00:00 2001
From: Nikhil Kandpal 
Date: Mon, 13 May 2024 10:44:28 -0400
Subject: [PATCH 04/10] fix logging

---
 usgpo/download-files.py | 16 ++++++----------
 usgpo/get-links.py      | 19 ++++++++++---------
 2 files changed, 16 insertions(+), 19 deletions(-)

diff --git a/usgpo/download-files.py b/usgpo/download-files.py
index e622548..f094893 100644
--- a/usgpo/download-files.py
+++ b/usgpo/download-files.py
@@ -1,25 +1,19 @@
 import argparse
+import datetime
 import html
-import logging
 import os
 import queue
 import time
 from concurrent.futures import ThreadPoolExecutor, as_completed
-import datetime
 
 import jsonlines
 import requests
 from bs4 import BeautifulSoup
 from tqdm.auto import tqdm
 
-from licensed_pile.write import to_dolma
+from licensed_pile import logs
 from licensed_pile.licenses import PermissiveLicenses
-
-logging.basicConfig(
-    level=logging.INFO,
-    format="download-files: [%(asctime)s] [%(funcName)s] %(levelname)s - %(message)s",
-)
-
+from licensed_pile.write import to_dolma
 
 SOURCE_NAME = "usgpo"
 
@@ -49,10 +43,11 @@ def parse_args():
 
 
 def api_query(endpoint, headers, params):
+    logger = logs.get_logger("usgpo")
     response = requests.get(endpoint, headers=headers, params=params)
     if response.status_code == 429:
         # Sleep for an hour if we've hit the rate-limit
-        logging.info("Sleeping for one hour to avoid rate-limit")
+        logger.info("Sleeping for one hour to avoid rate-limit")
         time.sleep(60 * 60)
         response = requests.get(endpoint, headers=headers, params=params)
     return response
@@ -116,4 +111,5 @@ def main(args):
 
 if __name__ == "__main__":
     args = parse_args()
+    logs.configure_logging("usgpo")
     main(args)
diff --git a/usgpo/get-links.py b/usgpo/get-links.py
index 2d9f0b3..e605907 100644
--- a/usgpo/get-links.py
+++ b/usgpo/get-links.py
@@ -1,6 +1,5 @@
 import argparse
 import json
-import logging
 import os
 import queue
 import time
@@ -10,11 +9,6 @@
 import requests
 from tqdm.auto import tqdm
 
-logging.basicConfig(
-    level=logging.INFO,
-    format="get-links: [%(asctime)s] [%(funcName)s] %(levelname)s - %(message)s",
-)
-
 
 def parse_args():
     parser = argparse.ArgumentParser()
@@ -52,16 +46,18 @@ def parse_args():
 
 
 def api_query(endpoint, headers, params):
+    logger = logs.get_logger("usgpo")
     response = requests.get(endpoint, headers=headers, params=params)
     if response.status_code == 429:
         # Sleep for an hour if we've hit the rate-limit
-        logging.info("Sleeping for one hour to avoid rate-limit")
+        logger.info("Sleeping for one hour to avoid rate-limit")
         time.sleep(60 * 60)
         response = requests.get(endpoint, headers=headers, params=params)
     return response
 
 
 def get_collections(api_key):
+    logger = logs.get_logger("usgpo")
     response = api_query(
         "https://api.govinfo.gov/collections",
         headers={"accept": "application/json"},
@@ -72,10 +68,11 @@ def get_collections(api_key):
         for record in output["collections"]:
             yield record["collectionCode"]
     else:
-        logging.error(f"get_collections received status code {response.status_code}")
+        logger.error(f"get_collections received status code {response.status_code}")
 
 
 def get_packages(api_key, collections, start_date, package_queue):
+    logger = logs.get_logger("usgpo")
     url = f"https://api.govinfo.gov/published/{start_date}"
     offset_mark = "*"
     pbar = tqdm(desc="Producer")
@@ -102,7 +99,7 @@ def get_packages(api_key, collections, start_date, package_queue):
             # Prevent too many API requests in a short period of time
             time.sleep(5)
         else:
-            logging.error(
+            logger.error(
                 f"get_packages received status code {response.status_code} for query {url}"
             )
             break
@@ -165,15 +162,18 @@ def main(args):
     metadata_queue = queue.Queue()
 
     with ThreadPoolExecutor(max_workers=args.workers + 2) as executor:
+        # One thread for getting each package (i.e. file) from the specified collections
         executor.submit(
             get_packages, args.api_key, args.collections, args.start_date, package_queue
         )
 
+        # `args.workers` threads for getting package metadata
         for _ in range(args.workers):
             executor.submit(
                 get_package_metadata, args.api_key, package_queue, metadata_queue
             )
 
+        # One thread for writing out the package metadata to disk
         executor.submit(write_metadata, args.output_dir, metadata_queue)
 
         metadata_queue.join()
@@ -181,4 +181,5 @@ def main(args):
 
 if __name__ == "__main__":
     args = parse_args()
+    logs.configure_logging("usgpo")
     main(args)
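Taken together, the pattern after this change is: call `logs.configure_logging("usgpo")` once in the entry point, then fetch the shared logger wherever it is needed. A minimal sketch of that usage, built only from the calls already shown in the diff:

    # Minimal sketch of the licensed_pile logging pattern used above
    from licensed_pile import logs

    logs.configure_logging("usgpo")        # once, in __main__
    logger = logs.get_logger("usgpo")      # inside any function that needs it
    logger.info("Sleeping for one hour to avoid rate-limit")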

From b7def76c8439982e0c239971e7922195c45314a0 Mon Sep 17 00:00:00 2001
From: Nikhil Kandpal 
Date: Mon, 13 May 2024 10:46:55 -0400
Subject: [PATCH 05/10] Set default start-date parameter as 01/01/1990

---
 usgpo/get-data.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/usgpo/get-data.sh b/usgpo/get-data.sh
index b54f0e4..fc91ee2 100644
--- a/usgpo/get-data.sh
+++ b/usgpo/get-data.sh
@@ -1,7 +1,7 @@
 set -e
 
 api_key=${1}
-start_date=${2}
+start_date=${2:-"1990-01-01'T'00:00:00'Z'"}
 
 USGPO_DIRECTORY="data/usgpo"
 

From 931ddd147114936d490829cd35cb5938abfa9113 Mon Sep 17 00:00:00 2001
From: Nikhil Kandpal 
Date: Mon, 13 May 2024 10:52:27 -0400
Subject: [PATCH 06/10] Add README

---
 usgpo/README.md | 8 ++++++++
 1 file changed, 8 insertions(+)
 create mode 100644 usgpo/README.md

diff --git a/usgpo/README.md b/usgpo/README.md
new file mode 100644
index 0000000..3e96dff
--- /dev/null
+++ b/usgpo/README.md
@@ -0,0 +1,8 @@
+# USGPO
+
+Government documents published by the [US Government Publishing Office](http://www.gpo.gov). Since each of these documents is authored by the US Federal Government, they are in the Public Domain.
+
+# Data Collection
+
+To collect the documents, run the script `usgpo/get-data.sh` from the repo's top-level directory. Internally, this will run `get-links.py` to get a collection of links to the government documents and `download-files.py` to download each link and parse out the relevant text. This command will save the final dataset in `data/usgpo/v0`.
+
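Once `get-data.sh` finishes, the result can be spot-checked with a few lines of Python. This is a minimal sketch that assumes gzip-compressed JSONL shards somewhere under `data/usgpo/v0`; the exact shard names come from `licensed_pile.write.to_dolma` and may differ:

    # Peek at the first record of each output shard (layout under data/usgpo/v0 is assumed).
    import glob
    import gzip
    import json

    for path in sorted(glob.glob("data/usgpo/v0/**/*.jsonl.gz", recursive=True)):
        with gzip.open(path, "rt") as f:
            record = json.loads(next(f))
            print(path, record["id"], record["title"])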

From 6e1e3ee44d5b5733b85880dcb1637de9111e3d56 Mon Sep 17 00:00:00 2001
From: Nikhil Kandpal 
Date: Mon, 10 Jun 2024 11:50:41 -0400
Subject: [PATCH 07/10] - fix the dataset output location to be in usgpo/ - fix
 missing imports

---
 usgpo/get-data.sh  | 4 ++--
 usgpo/get-links.py | 1 +
 2 files changed, 3 insertions(+), 2 deletions(-)
 mode change 100644 => 100755 usgpo/get-data.sh

diff --git a/usgpo/get-data.sh b/usgpo/get-data.sh
old mode 100644
new mode 100755
index fc91ee2..afeb8cc
--- a/usgpo/get-data.sh
+++ b/usgpo/get-data.sh
@@ -8,7 +8,7 @@ USGPO_DIRECTORY="data/usgpo"
 mkdir -p ${USGPO_DIRECTORY}/raw
 
 echo "Getting Document Links"
-python usgpo/get-links.py --api-key "${api_key}" --start-date "${start_date}" --output-dir ${USGPO_DIRECTORY}/raw
+python get-links.py --api-key "${api_key}" --start-date "${start_date}" --output-dir ${USGPO_DIRECTORY}/raw
 
 echo "Downloading Documents"
-python usgpo/download-files.py --api-key ${api_key} --links-file ${USGPO_DIRECTORY}/raw/links.jsonl
+python download-files.py --api-key ${api_key} --links-file ${USGPO_DIRECTORY}/raw/links.jsonl
diff --git a/usgpo/get-links.py b/usgpo/get-links.py
index e605907..7b0bf0f 100644
--- a/usgpo/get-links.py
+++ b/usgpo/get-links.py
@@ -9,6 +9,7 @@
 import requests
 from tqdm.auto import tqdm
 
+from licensed_pile import logs
 
 def parse_args():
     parser = argparse.ArgumentParser()

From 68e44b94fc674e9a29fa739ab172f1bcc11d77e3 Mon Sep 17 00:00:00 2001
From: Nikhil Kandpal 
Date: Mon, 10 Jun 2024 11:53:27 -0400
Subject: [PATCH 08/10] black

---
 usgpo/get-links.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/usgpo/get-links.py b/usgpo/get-links.py
index 7b0bf0f..91d8ffa 100644
--- a/usgpo/get-links.py
+++ b/usgpo/get-links.py
@@ -11,6 +11,7 @@
 
 from licensed_pile import logs
 
+
 def parse_args():
     parser = argparse.ArgumentParser()
     parser.add_argument("--api-key", required=True, help="GovInfo API key")

From f14783bc35054a98c995c4742003bda73c31997f Mon Sep 17 00:00:00 2001
From: Nikhil Kandpal 
Date: Thu, 20 Jun 2024 14:05:09 -0400
Subject: [PATCH 09/10] - factor out API calls into `utils.py` - Changed from
 producer/consumer parallelism to (1) collect all packages and then (2)
 collect all metadata with thread pool - Changed from BeautifulSoup hardcoded
 html parsing of the <pre> tag to more flexible `trafilatura.extract` since some
 documents are more complex html

---
 usgpo/download-files.py |  38 ++----------
 usgpo/get-links.py      | 127 ++++++++++++----------------------------
 usgpo/utils.py          |  15 +++++
 3 files changed, 59 insertions(+), 121 deletions(-)
 create mode 100644 usgpo/utils.py

diff --git a/usgpo/download-files.py b/usgpo/download-files.py
index f094893..69dad32 100644
--- a/usgpo/download-files.py
+++ b/usgpo/download-files.py
@@ -1,16 +1,12 @@
 import argparse
 import datetime
-import html
-import os
-import queue
-import time
 from concurrent.futures import ThreadPoolExecutor, as_completed
 
 import jsonlines
-import requests
-from bs4 import BeautifulSoup
+import trafilatura
 from tqdm.auto import tqdm
 
+from utils import api_query
 from licensed_pile import logs
 from licensed_pile.licenses import PermissiveLicenses
 from licensed_pile.write import to_dolma
@@ -42,41 +38,18 @@ def parse_args():
     return args
 
 
-def api_query(endpoint, headers, params):
-    logger = logs.get_logger("usgpo")
-    response = requests.get(endpoint, headers=headers, params=params)
-    if response.status_code == 429:
-        # Sleep for an hour if we've hit the rate-limit
-        logger.info("Sleeping for one hour to avoid rate-limit")
-        time.sleep(60 * 60)
-        response = requests.get(endpoint, headers=headers, params=params)
-    return response
-
-
 def download_file(api_key, file_url):
     response = api_query(file_url, headers=None, params={"api_key": api_key})
     text = response.text
     return text
 
 
-def parse_html(text):
-    # Most documents are primarily pre-formatted text inside of the a <pre> tag
-    # If so, just take the contents of that tag instead of the whole document
-    soup = BeautifulSoup(text, "html.parser")
-    pre_tag = soup.find("pre")
-    if pre_tag:
-        parsed_text = pre_tag.get_text()
-    else:
-        parsed_text = text
-    return html.unescape(parsed_text)
-
-
 def construct_record(api_key, file):
     file_url = file["links"].get("txtLink")
     if file_url is None:
         return None
-    raw_html = download_file(api_key, file_url)
-    parsed_text = parse_html(raw_html)
+    html = download_file(api_key, file_url)
+    text = trafilatura.extract(html)
 
     return {
         "id": file["package_id"],
@@ -85,8 +58,7 @@ def construct_record(api_key, file):
         "author": file["author"],
         "publisher": file["publisher"],
         "category": file["category"],
-        "html": raw_html,
-        "text": parsed_text,
+        "text": text,
         "source": SOURCE_NAME,
         "added": datetime.datetime.utcnow().isoformat(),
         "metadata": {"license": str(PermissiveLicenses.PD), "url": file_url},
diff --git a/usgpo/get-links.py b/usgpo/get-links.py
index 91d8ffa..b6ae62d 100644
--- a/usgpo/get-links.py
+++ b/usgpo/get-links.py
@@ -1,14 +1,12 @@
 import argparse
-import json
 import os
-import queue
 import time
 from concurrent.futures import ThreadPoolExecutor, as_completed
 
 import jsonlines
-import requests
 from tqdm.auto import tqdm
 
+from utils import api_query
 from licensed_pile import logs
 
 
@@ -34,7 +32,6 @@ def parse_args():
             "CRI",
             "CZIC",
             "GAOREPORTS",
-            "GOVPUB",
             "GPO",
             "HJOURNAL",
             "HOB",
@@ -47,37 +44,13 @@ def parse_args():
     return args
 
 
-def api_query(endpoint, headers, params):
+def get_packages(api_key, collections, start_date):
     logger = logs.get_logger("usgpo")
-    response = requests.get(endpoint, headers=headers, params=params)
-    if response.status_code == 429:
-        # Sleep for an hour if we've hit the rate-limit
-        logger.info("Sleeping for one hour to avoid rate-limit")
-        time.sleep(60 * 60)
-        response = requests.get(endpoint, headers=headers, params=params)
-    return response
 
-
-def get_collections(api_key):
-    logger = logs.get_logger("usgpo")
-    response = api_query(
-        "https://api.govinfo.gov/collections",
-        headers={"accept": "application/json"},
-        params={"api_key": args.api_key},
-    )
-    if response.status_code == 200:
-        output = response.json()
-        for record in output["collections"]:
-            yield record["collectionCode"]
-    else:
-        logger.error(f"get_collections received status code {response.status_code}")
-
-
-def get_packages(api_key, collections, start_date, package_queue):
-    logger = logs.get_logger("usgpo")
     url = f"https://api.govinfo.gov/published/{start_date}"
     offset_mark = "*"
-    pbar = tqdm(desc="Producer")
+    packages = []
+    pbar = tqdm()
     while url is not None:
         response = api_query(
             url,
@@ -93,20 +66,19 @@ def get_packages(api_key, collections, start_date, package_queue):
             output = response.json()
 
             for record in output["packages"]:
-                package_queue.put(record)
+                packages.append(record)
                 pbar.update(1)
 
             url = output["nextPage"]
             offset_mark = None
-            # Prevent too many API requests in a short period of time
+            # Sleep since a sudden burst of requests seems to result in erroneous rate-limiting
             time.sleep(5)
         else:
             logger.error(
                 f"get_packages received status code {response.status_code} for query {url}"
             )
             break
-
-    package_queue.put(None)
+    return packages
 
 
 def get_file_links(api_key, package):
@@ -122,63 +94,42 @@ def get_file_links(api_key, package):
     return None
 
 
-def get_package_metadata(api_key, package_queue, metadata_queue):
-    pbar = tqdm(desc="Consumer")
-    while True:
-        package = package_queue.get()
-        if package is None:
-            package_queue.put(None)
-            metadata_queue.put(None)
-            break
-
-        record = {
-            "title": package.get("title"),
-            "package_id": package.get("packageId"),
-            "date": package.get("dateIssued"),
-            "category": package.get("category"),
-            "author": package.get("governmentAuthor1"),
-            "publisher": package.get("publisher"),
-            "links": get_file_links(api_key, package),
-        }
-        metadata_queue.put(record)
-        pbar.update(1)
-
-
-def write_metadata(output_dir, metadata_queue):
-    with jsonlines.open(os.path.join(output_dir, "links.jsonl"), mode="w") as writer:
-        pbar = tqdm(desc="Writer")
-        while True:
-            metadata = metadata_queue.get()
-            if metadata is None:
-                metadata_queue.task_done()
-                break
-
-            writer.write(metadata)
-            pbar.update(1)
+def get_package_metadata(api_key, package):
+    record = {
+        "title": package.get("title"),
+        "package_id": package.get("packageId"),
+        "date": package.get("dateIssued"),
+        "category": package.get("category"),
+        "author": package.get("governmentAuthor1"),
+        "publisher": package.get("publisher"),
+        "links": get_file_links(api_key, package),
+    }
+    return record
 
 
 def main(args):
+    logger = logs.get_logger("usgpo")
     os.makedirs(args.output_dir, exist_ok=True)
-
-    package_queue = queue.Queue()
-    metadata_queue = queue.Queue()
-
-    with ThreadPoolExecutor(max_workers=args.workers + 2) as executor:
-        # One thread for getting each package (i.e. file) from the specified collections
-        executor.submit(
-            get_packages, args.api_key, args.collections, args.start_date, package_queue
-        )
-
-        # `args.workers` threads for getting package metadata
-        for _ in range(args.workers):
-            executor.submit(
-                get_package_metadata, args.api_key, package_queue, metadata_queue
-            )
-
-        # One thread for writing out the package metadata to disk
-        executor.submit(write_metadata, args.output_dir, metadata_queue)
-
-        metadata_queue.join()
+    
+    # Get packages from the specified USGPO collections from `args.start_date` to current day
+    logger.info(f"Getting packages from the following collections: {args.collections}")
+    packages = get_packages(args.api_key, args.collections, args.start_date)
+    
+    logger.info(f"Getting package metadata and writing out to {args.output_dir}")
+    with jsonlines.open(os.path.join(args.output_dir, "links.jsonl"), mode="w", flush=True) as writer:
+        # Spawn multiple worker threads to get the metadata associated with all packages
+        with ThreadPoolExecutor(max_workers=args.workers) as executor:
+            metadata_futures_to_package = {executor.submit(get_package_metadata, args.api_key, package): package for package in packages}
+
+            # Write out package metadata to file
+            for metadata_future in tqdm(as_completed(metadata_futures_to_package)):
+                package = metadata_futures_to_package[metadata_future]
+                try:
+                    record = metadata_future.result()
+                except Exception as e:
+                    logger.error(f"Package {package} raised exception {e}")
+                    continue
+                writer.write(record)
 
 
 if __name__ == "__main__":
diff --git a/usgpo/utils.py b/usgpo/utils.py
new file mode 100644
index 0000000..cfd35d3
--- /dev/null
+++ b/usgpo/utils.py
@@ -0,0 +1,15 @@
+import requests
+import time
+
+from licensed_pile import logs
+
+
+def api_query(endpoint, headers, params):
+    logger = logs.get_logger("usgpo")
+    response = requests.get(endpoint, headers=headers, params=params)
+    if response.status_code == 429:
+        # Sleep for an hour if we've hit the rate-limit
+        logger.info("Exceeded rate-limit, sleeping for one hour")
+        time.sleep(60 * 60)
+        response = requests.get(endpoint, headers=headers, params=params)
+    return response
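For reference, both scripts call this helper with an endpoint, an accept header, and the API key as a query parameter; on HTTP 429 it sleeps for an hour and retries once. A minimal usage sketch, assuming a valid key in the GOVINFO_API_KEY environment variable:

    # Sketch of calling the shared helper against the GovInfo collections endpoint
    import os

    from utils import api_query

    response = api_query(
        "https://api.govinfo.gov/collections",
        headers={"accept": "application/json"},
        params={"api_key": os.environ["GOVINFO_API_KEY"]},
    )
    if response.status_code == 200:
        print(response.json())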

From 8d46b15272d919b285514ca8181e75005a97d7e1 Mon Sep 17 00:00:00 2001
From: Nikhil Kandpal 
Date: Mon, 24 Jun 2024 22:20:18 -0400
Subject: [PATCH 10/10] - black + isort - wrap download-files construct_record
 in try/catch - if a <pre> tag is available, use pre-formatted text, else use
 trafilatura.extract to convert to markdown

---
 usgpo/download-files.py | 67 ++++++++++++++++++++++++++++++-----------
 usgpo/get-links.py      | 15 ++++++---
 2 files changed, 59 insertions(+), 23 deletions(-)

diff --git a/usgpo/download-files.py b/usgpo/download-files.py
index 69dad32..11dbd89 100644
--- a/usgpo/download-files.py
+++ b/usgpo/download-files.py
@@ -4,9 +4,10 @@
 
 import jsonlines
 import trafilatura
+from bs4 import BeautifulSoup
 from tqdm.auto import tqdm
-
 from utils import api_query
+
 from licensed_pile import logs
 from licensed_pile.licenses import PermissiveLicenses
 from licensed_pile.write import to_dolma
@@ -44,25 +45,55 @@ def download_file(api_key, file_url):
     return text
 
 
+def parse_html(html):
+    # Most documents are pre-formatted text inside of a <pre> tag
+    # For the rest of the documents, we use trafilatura to extract to markdown
+    soup = BeautifulSoup(html, "html.parser")
+    pre_tag = soup.find("pre")
+    if pre_tag:
+        text = pre_tag.get_text()
+    else:
+        text = trafilatura.extract(html, output_format="markdown")
+    return text
+
+
 def construct_record(api_key, file):
-    file_url = file["links"].get("txtLink")
-    if file_url is None:
+    logger = logs.get_logger("usgpo")
+    try:
+        links = file.get("links")
+        if links is None:
+            return None
+
+        file_url = links.get("txtLink")
+        # Occasionally there will be multiple txtLinks pointing to the same URL. Just take the first.
+        if isinstance(file_url, list):
+            file_url = file_url[0]
+
+        if file_url is None:
+            return None
+
+        html = download_file(api_key, file_url)
+        text = parse_html(html)
+
+        if text is None or len(text) == 0:
+            return None
+
+        return {
+            "id": file["package_id"],
+            "title": file["title"],
+            "date": file["date"],
+            "author": file["author"],
+            "publisher": file["publisher"],
+            "category": file["category"],
+            "text": text,
+            "source": SOURCE_NAME,
+            "added": datetime.datetime.utcnow().isoformat(),
+            "metadata": {"license": str(PermissiveLicenses.PD), "url": file_url},
+        }
+
+    except Exception as e:
+        logger.error(f"Failed to download package {file['package_id']}: {e}")
         return None
-    html = download_file(api_key, file_url)
-    text = trafilatura.extract(html)
-
-    return {
-        "id": file["package_id"],
-        "title": file["title"],
-        "date": file["date"],
-        "author": file["author"],
-        "publisher": file["publisher"],
-        "category": file["category"],
-        "text": text,
-        "source": SOURCE_NAME,
-        "added": datetime.datetime.utcnow().isoformat(),
-        "metadata": {"license": str(PermissiveLicenses.PD), "url": file_url},
-    }
 
 
 def generate_records(args):
diff --git a/usgpo/get-links.py b/usgpo/get-links.py
index b6ae62d..3ed37a6 100644
--- a/usgpo/get-links.py
+++ b/usgpo/get-links.py
@@ -5,8 +5,8 @@
 
 import jsonlines
 from tqdm.auto import tqdm
-
 from utils import api_query
+
 from licensed_pile import logs
 
 
@@ -110,16 +110,21 @@ def get_package_metadata(api_key, package):
 def main(args):
     logger = logs.get_logger("usgpo")
     os.makedirs(args.output_dir, exist_ok=True)
-    
+
     # Get packages from the specified USGPO collections from `args.start_date` to current day
     logger.info(f"Getting packages from the following collections: {args.collections}")
     packages = get_packages(args.api_key, args.collections, args.start_date)
-    
+
     logger.info(f"Getting package metadata and writing out to {args.output_dir}")
-    with jsonlines.open(os.path.join(args.output_dir, "links.jsonl"), mode="w", flush=True) as writer:
+    with jsonlines.open(
+        os.path.join(args.output_dir, "links.jsonl"), mode="w", flush=True
+    ) as writer:
         # Spawn multiple worker threads to get the metadata associated with all packages
         with ThreadPoolExecutor(max_workers=args.workers) as executor:
-            metadata_futures_to_package = {executor.submit(get_package_metadata, args.api_key, package): package for package in packages}
+            metadata_futures_to_package = {
+                executor.submit(get_package_metadata, args.api_key, package): package
+                for package in packages
+            }
 
             # Write out package metadata to file
             for metadata_future in tqdm(as_completed(metadata_futures_to_package)):