Skip to content

Commit 599dedb

Browse files
committed
Analysts see a fanned-out DAG for processing NTD XLSX files
1 parent a6a41c8 commit 599dedb

16 files changed

+129907
-14
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import os
2+
from datetime import datetime
3+
4+
from operators.ntd_xlsx_list_tabs_operator import NTDXLSXListTabsOperator
5+
from operators.ntd_xlsx_to_gcs_operator import NTDXLSXToGCSOperator
6+
from operators.ntd_xlsx_to_jsonl_operator import NTDXLSXToJSONLOperator
7+
8+
from airflow import DAG, XComArg
9+
from airflow.decorators import task
10+
from airflow.operators.latest_only import LatestOnlyOperator
11+
12+
NTD_PRODUCTS = [
13+
{
14+
"type": "annual_database_agency_information",
15+
"short_type": "_2022_agency_information",
16+
"year": 2022,
17+
"url": "https://www.transit.dot.gov/ntd/data-product/2022-annual-database-agency-information",
18+
},
19+
]
20+
21+
with DAG(
22+
dag_id="download_and_parse_ntd_xlsx",
23+
# Every day at midnight
24+
schedule="0 0 * * *",
25+
start_date=datetime(2025, 11, 1),
26+
catchup=False,
27+
tags=["ntd"],
28+
):
29+
latest_only = LatestOnlyOperator(task_id="latest_only", depends_on_past=False)
30+
31+
@task
32+
def create_download_kwargs(ntd_product):
33+
return {
34+
"source_url": ntd_product["url"],
35+
"destination_path": os.path.join(
36+
f"{ntd_product['type']}_raw",
37+
f"{ntd_product['year']}",
38+
"dt={{ dt }}",
39+
"execution_ts={{ ts }}",
40+
f"{ntd_product['year']}__{ntd_product['type']}_raw.xlsx",
41+
),
42+
}
43+
44+
download_kwargs = create_download_kwargs.expand(ntd_product=NTD_PRODUCTS)
45+
46+
download_xlsx = NTDXLSXToGCSOperator(
47+
task_id="download_to_gcs",
48+
destination_bucket=os.environ.get("CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__RAW"),
49+
).expand_kwargs(download_kwargs)
50+
51+
xlsx_tabs = NTDXLSXListTabsOperator(
52+
task_id="ntd_xlsx_list_tabs",
53+
source_bucket=os.environ.get("CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__RAW"),
54+
).expand_kwargs(
55+
download_xlsx.map(lambda dl: {"source_path": dl["destination_path"]})
56+
)
57+
58+
@task
59+
def create_parse_kwargs(xlsx_tab_ntd_products) -> dict:
60+
xlsx_tab, ntd_products = xlsx_tab_ntd_products
61+
tab_name = (
62+
xlsx_tab["tab"]
63+
.lower()
64+
.replace(" ", "_")
65+
.replace("(", "_")
66+
.replace(")", "_")
67+
.replace(":", "_")
68+
.replace("-", "_")
69+
)
70+
result = []
71+
for ntd_product in ntd_products:
72+
result.append(
73+
{
74+
"tab": xlsx_tab["tab"],
75+
"source_path": xlsx_tab["source_path"],
76+
"destination_path": os.path.join(
77+
ntd_product["type"],
78+
ntd_product["year"],
79+
xlsx_tab["tab"],
80+
"dt={{ dt }}",
81+
"execution_ts={{ ts }}",
82+
f"{ntd_product['year']}__{ntd_product['type']}__{tab_name}.jsonl.gz",
83+
),
84+
}
85+
)
86+
return result
87+
88+
parse_kwargs = create_parse_kwargs.expand(
89+
xlsx_tab=XComArg(xlsx_tabs).zip(fillvalue=NTD_PRODUCTS)
90+
)
91+
92+
parse_xlsx = NTDXLSXToJSONLOperator.partial(
93+
task_id="xlsx_to_jsonl",
94+
source_bucket=os.environ.get("CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__RAW"),
95+
destination_bucket=os.environ.get(
96+
"CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__CLEAN"
97+
),
98+
).expand_kwargs(parse_kwargs)
99+
100+
latest_only >> download_kwargs
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from bs4 import BeautifulSoup
2+
3+
from airflow.hooks.base import BaseHook
4+
from airflow.hooks.http_hook import HttpHook
5+
6+
HEADERS = {
7+
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36",
8+
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
9+
"Accept-Language": "en-US,en;q=0.5",
10+
"Accept-Encoding": "gzip, deflate, br",
11+
"Connection": "keep-alive",
12+
"Upgrade-Insecure-Requests": "1",
13+
"sec-ch-ua": '"Not A;Brand"',
14+
"sec-fetch-dest": "document",
15+
"sec-fetch-mode": "navigate",
16+
"sec-fetch-site": "none",
17+
"sec-fetch-user": "?1",
18+
}
19+
20+
21+
class NTDXLSXHook(BaseHook):
22+
_http_conn_id: str
23+
_method: str
24+
25+
def __init__(self, method: str = "GET", http_conn_id: str = "http_dot"):
26+
super().__init__()
27+
self._http_conn_id: str = http_conn_id
28+
self._method: str = method
29+
30+
def http_hook(self) -> HttpHook:
31+
return HttpHook(method=self._method, http_conn_id=self._http_conn_id)
32+
33+
def run(self, endpoint: str) -> str:
34+
body = self.http_hook().run(endpoint=endpoint, headers=HEADERS).text
35+
soup = BeautifulSoup(body, "html.parser")
36+
link = soup.find(
37+
"a",
38+
type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
39+
)
40+
href = link.get("href")
41+
return self.http_hook().run(endpoint=href, headers=HEADERS)
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import io
2+
from typing import Sequence
3+
4+
import openpyxl
5+
6+
from airflow.models import BaseOperator
7+
from airflow.models.taskinstance import Context
8+
from airflow.providers.google.cloud.hooks.gcs import GCSHook
9+
10+
11+
class NTDXLSXListTabsOperator(BaseOperator):
12+
template_fields: Sequence[str] = (
13+
"source_bucket",
14+
"source_path",
15+
"gcp_conn_id",
16+
)
17+
18+
def __init__(
19+
self,
20+
source_bucket: str,
21+
source_path: str,
22+
gcp_conn_id: str = "google_cloud_default",
23+
**kwargs,
24+
) -> None:
25+
super().__init__(**kwargs)
26+
27+
self._gcs_hook = None
28+
self.source_bucket = source_bucket
29+
self.source_path = source_path
30+
self.gcp_conn_id = gcp_conn_id
31+
32+
def gcs_hook(self) -> GCSHook:
33+
return GCSHook(gcp_conn_id=self.gcp_conn_id)
34+
35+
def source_name(self) -> str:
36+
return self.source_bucket.replace("gs://", "")
37+
38+
def source(self) -> bytes:
39+
return self.gcs_hook().download(
40+
bucket_name=self.source_name(),
41+
object_name=self.source_path,
42+
)
43+
44+
def workbook(self) -> bytes:
45+
return openpyxl.load_workbook(filename=io.BytesIO(self.source()))
46+
47+
def execute(self, context: Context) -> str:
48+
workbook = self.workbook()
49+
return [
50+
{"tab": s, "source_path": self.source_path} for s in workbook.sheetnames
51+
]
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from typing import Sequence
2+
3+
from hooks.ntd_xlsx_hook import NTDXLSXHook
4+
5+
from airflow.hooks.http_hook import HttpHook
6+
from airflow.models import BaseOperator
7+
from airflow.models.taskinstance import Context
8+
from airflow.providers.google.cloud.hooks.gcs import GCSHook
9+
10+
11+
class NTDXLSXToGCSOperator(BaseOperator):
12+
template_fields: Sequence[str] = (
13+
"source_url",
14+
"destination_bucket",
15+
"destination_path",
16+
"http_conn_id",
17+
"gcp_conn_id",
18+
)
19+
20+
def __init__(
21+
self,
22+
source_url: str,
23+
destination_bucket: str,
24+
destination_path: str,
25+
http_conn_id: str = "http_ntd",
26+
gcp_conn_id: str = "google_cloud_default",
27+
**kwargs,
28+
) -> None:
29+
super().__init__(**kwargs)
30+
31+
self.source_url = source_url
32+
self.destination_bucket = destination_bucket
33+
self.destination_path = destination_path
34+
self.http_conn_id = http_conn_id
35+
self.gcp_conn_id = gcp_conn_id
36+
37+
def destination_name(self) -> str:
38+
return self.destination_bucket.replace("gs://", "")
39+
40+
def gcs_hook(self) -> GCSHook:
41+
return GCSHook(gcp_conn_id=self.gcp_conn_id)
42+
43+
def ntd_xlsx_hook(self) -> NTDXLSXHook:
44+
return NTDXLSXHook(method="GET", http_conn_id=self.http_conn_id)
45+
46+
def http_hook(self) -> NTDXLSXHook:
47+
return HttpHook(method="GET", http_conn_id=self.http_conn_id)
48+
49+
def execute(self, context: Context) -> str:
50+
response = self.ntd_xlsx_hook().run(endpoint=self.source_url)
51+
self.gcs_hook().upload(
52+
data=response.content,
53+
bucket_name=self.destination_name(),
54+
object_name=self.destination_path,
55+
mime_type=response.headers.get("Content-Type"),
56+
gzip=False,
57+
)
58+
return {"destination_path": self.destination_path}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import csv
2+
import io
3+
import json
4+
from typing import Sequence
5+
6+
import openpyxl
7+
8+
from airflow.models import BaseOperator
9+
from airflow.models.taskinstance import Context
10+
from airflow.providers.google.cloud.hooks.gcs import GCSHook
11+
12+
13+
class NTDXLSXToJSONLOperator(BaseOperator):
14+
template_fields: Sequence[str] = (
15+
"tab",
16+
"source_bucket",
17+
"source_path",
18+
"destination_bucket",
19+
"destination_path",
20+
"gcp_conn_id",
21+
)
22+
23+
def __init__(
24+
self,
25+
tab: str,
26+
source_bucket: str,
27+
source_path: str,
28+
destination_bucket: str,
29+
destination_path: str,
30+
gcp_conn_id: str = "google_cloud_default",
31+
**kwargs,
32+
) -> None:
33+
super().__init__(**kwargs)
34+
35+
self._gcs_hook = None
36+
self.tab = tab
37+
self.source_bucket = source_bucket
38+
self.source_path = source_path
39+
self.destination_bucket = destination_bucket
40+
self.destination_path = destination_path
41+
self.gcp_conn_id = gcp_conn_id
42+
43+
def gcs_hook(self) -> GCSHook:
44+
return GCSHook(gcp_conn_id=self.gcp_conn_id)
45+
46+
def destination_name(self) -> str:
47+
return self.destination_bucket.replace("gs://", "")
48+
49+
def source_name(self) -> str:
50+
return self.source_bucket.replace("gs://", "")
51+
52+
def source(self) -> bytes:
53+
return self.gcs_hook().download(
54+
bucket_name=self.source_name(),
55+
object_name=self.source_path,
56+
)
57+
58+
def rows(self) -> bytes:
59+
workbook = openpyxl.load_workbook(filename=io.BytesIO(self.source()))
60+
csv_file = io.StringIO()
61+
writer = csv.writer(csv_file)
62+
for row in workbook[self.tab].rows:
63+
writer.writerow([cell.value for cell in row])
64+
csv_file.seek(0)
65+
return csv.DictReader(csv_file)
66+
67+
def execute(self, context: Context) -> str:
68+
self.gcs_hook().upload(
69+
bucket_name=self.destination_name(),
70+
object_name=self.destination_path,
71+
data="\n".join([json.dumps(r, separators=(",", ":")) for r in self.rows()]),
72+
mime_type="application/jsonl",
73+
gzip=True,
74+
)
75+
return {"destination_path": self.destination_path}

0 commit comments

Comments
 (0)