100 changes: 100 additions & 0 deletions airflow/dags/download_and_parse_ntd_xlsx.py
@@ -0,0 +1,100 @@
import os
from datetime import datetime

from operators.ntd_xlsx_list_tabs_operator import NTDXLSXListTabsOperator
from operators.ntd_xlsx_to_gcs_operator import NTDXLSXToGCSOperator
from operators.ntd_xlsx_to_jsonl_operator import NTDXLSXToJSONLOperator

from airflow import DAG, XComArg
from airflow.decorators import task
from airflow.operators.latest_only import LatestOnlyOperator

# NTD data products to ingest; each entry fans out into its own mapped
# download and parse tasks below.
NTD_PRODUCTS = [
{
"type": "annual_database_agency_information",
"short_type": "_2022_agency_information",
"year": 2022,
"url": "https://www.transit.dot.gov/ntd/data-product/2022-annual-database-agency-information",
},
]

with DAG(
dag_id="download_and_parse_ntd_xlsx",
# Every day at midnight
schedule="0 0 * * *",
start_date=datetime(2025, 11, 1),
catchup=False,
tags=["ntd"],
):
latest_only = LatestOnlyOperator(task_id="latest_only", depends_on_past=False)

@task
def create_download_kwargs(ntd_product):
return {
"source_url": ntd_product["url"],
"destination_path": os.path.join(
f"{ntd_product['type']}_raw",
f"{ntd_product['year']}",
"dt={{ dt }}",
"execution_ts={{ ts }}",
f"{ntd_product['year']}__{ntd_product['type']}_raw.xlsx",
),
}

download_kwargs = create_download_kwargs.expand(ntd_product=NTD_PRODUCTS)

    download_xlsx = NTDXLSXToGCSOperator.partial(
        task_id="download_to_gcs",
        destination_bucket=os.environ.get("CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__RAW"),
    ).expand_kwargs(download_kwargs)

    xlsx_tabs = NTDXLSXListTabsOperator.partial(
        task_id="ntd_xlsx_list_tabs",
        source_bucket=os.environ.get("CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__RAW"),
    ).expand_kwargs(
        XComArg(download_xlsx).map(lambda dl: {"source_path": dl["destination_path"]})
    )

@task
    def create_parse_kwargs(xlsx_tab_ntd_products) -> list:
xlsx_tab, ntd_products = xlsx_tab_ntd_products
tab_name = (
xlsx_tab["tab"]
.lower()
.replace(" ", "_")
.replace("(", "_")
.replace(")", "_")
.replace(":", "_")
.replace("-", "_")
)
result = []
for ntd_product in ntd_products:
result.append(
{
"tab": xlsx_tab["tab"],
"source_path": xlsx_tab["source_path"],
"destination_path": os.path.join(
ntd_product["type"],
                        f"{ntd_product['year']}",
xlsx_tab["tab"],
"dt={{ dt }}",
"execution_ts={{ ts }}",
f"{ntd_product['year']}__{ntd_product['type']}__{tab_name}.jsonl.gz",
),
}
)
return result

    parse_kwargs = create_parse_kwargs.expand(
        xlsx_tab_ntd_products=XComArg(xlsx_tabs).zip(fillvalue=NTD_PRODUCTS)
    )

parse_xlsx = NTDXLSXToJSONLOperator.partial(
task_id="xlsx_to_jsonl",
source_bucket=os.environ.get("CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__RAW"),
destination_bucket=os.environ.get(
"CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__CLEAN"
),
).expand_kwargs(parse_kwargs)

latest_only >> download_kwargs
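
For reference, the single NTD_PRODUCTS entry above fans out into one mapped download_to_gcs task; a sketch of the kwargs create_download_kwargs builds for it, with the template placeholders left unrendered:

{
    "source_url": "https://www.transit.dot.gov/ntd/data-product/2022-annual-database-agency-information",
    "destination_path": (
        "annual_database_agency_information_raw/2022/dt={{ dt }}/"
        "execution_ts={{ ts }}/2022__annual_database_agency_information_raw.xlsx"
    ),
}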
41 changes: 41 additions & 0 deletions airflow/plugins/hooks/ntd_xlsx_hook.py
@@ -0,0 +1,41 @@
import requests
from bs4 import BeautifulSoup

from airflow.hooks.base import BaseHook
from airflow.hooks.http_hook import HttpHook

HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"sec-ch-ua": '"Not A;Brand"',
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "none",
"sec-fetch-user": "?1",
}


class NTDXLSXHook(BaseHook):
_http_conn_id: str
_method: str

def __init__(self, method: str = "GET", http_conn_id: str = "http_dot"):
super().__init__()
self._http_conn_id: str = http_conn_id
self._method: str = method

def http_hook(self) -> HttpHook:
return HttpHook(method=self._method, http_conn_id=self._http_conn_id)

    def run(self, endpoint: str) -> requests.Response:
        # Fetch the NTD data product landing page, locate the XLSX attachment by
        # its MIME type, then download the spreadsheet itself.
        body = self.http_hook().run(endpoint=endpoint, headers=HEADERS).text
        soup = BeautifulSoup(body, "html.parser")
        link = soup.find(
            "a",
            type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        )
        href = link.get("href")
        return self.http_hook().run(endpoint=href, headers=HEADERS)
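
A minimal sketch of exercising NTDXLSXHook outside the DAG, assuming the default http_dot connection is configured the way the operators expect and that /tmp is writable (the output filename is only illustrative):

from hooks.ntd_xlsx_hook import NTDXLSXHook

hook = NTDXLSXHook(method="GET", http_conn_id="http_dot")
# run() returns the requests.Response for the XLSX itself, so .content is the spreadsheet bytes.
response = hook.run(
    endpoint="https://www.transit.dot.gov/ntd/data-product/2022-annual-database-agency-information"
)
with open("/tmp/2022_agency_information.xlsx", "wb") as f:
    f.write(response.content)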
51 changes: 51 additions & 0 deletions airflow/plugins/operators/ntd_xlsx_list_tabs_operator.py
@@ -0,0 +1,51 @@
import io
from typing import Sequence

import openpyxl

from airflow.models import BaseOperator
from airflow.models.taskinstance import Context
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class NTDXLSXListTabsOperator(BaseOperator):
template_fields: Sequence[str] = (
"source_bucket",
"source_path",
"gcp_conn_id",
)

def __init__(
self,
source_bucket: str,
source_path: str,
gcp_conn_id: str = "google_cloud_default",
**kwargs,
) -> None:
super().__init__(**kwargs)

self._gcs_hook = None
self.source_bucket = source_bucket
self.source_path = source_path
self.gcp_conn_id = gcp_conn_id

def gcs_hook(self) -> GCSHook:
return GCSHook(gcp_conn_id=self.gcp_conn_id)

def source_name(self) -> str:
return self.source_bucket.replace("gs://", "")

def source(self) -> bytes:
return self.gcs_hook().download(
bucket_name=self.source_name(),
object_name=self.source_path,
)

    def workbook(self) -> openpyxl.Workbook:
        return openpyxl.load_workbook(filename=io.BytesIO(self.source()))

    def execute(self, context: Context) -> list:
        # One entry per worksheet, so downstream parsing can fan out over every
        # tab of the downloaded XLSX.
        workbook = self.workbook()
        return [
            {"tab": s, "source_path": self.source_path} for s in workbook.sheetnames
        ]
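
The XCom returned here is a list with one entry per worksheet, which the DAG pairs with NTD_PRODUCTS when building parse kwargs; a sketch of its shape, with hypothetical tab names:

[
    {"tab": "Agency Information", "source_path": "annual_database_agency_information_raw/2022/.../2022__annual_database_agency_information_raw.xlsx"},
    {"tab": "Contacts", "source_path": "annual_database_agency_information_raw/2022/.../2022__annual_database_agency_information_raw.xlsx"},
]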
58 changes: 58 additions & 0 deletions airflow/plugins/operators/ntd_xlsx_to_gcs_operator.py
@@ -0,0 +1,58 @@
from typing import Sequence

from hooks.ntd_xlsx_hook import NTDXLSXHook

from airflow.hooks.http_hook import HttpHook
from airflow.models import BaseOperator
from airflow.models.taskinstance import Context
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class NTDXLSXToGCSOperator(BaseOperator):
template_fields: Sequence[str] = (
"source_url",
"destination_bucket",
"destination_path",
"http_conn_id",
"gcp_conn_id",
)

def __init__(
self,
source_url: str,
destination_bucket: str,
destination_path: str,
http_conn_id: str = "http_ntd",
gcp_conn_id: str = "google_cloud_default",
**kwargs,
) -> None:
super().__init__(**kwargs)

self.source_url = source_url
self.destination_bucket = destination_bucket
self.destination_path = destination_path
self.http_conn_id = http_conn_id
self.gcp_conn_id = gcp_conn_id

def destination_name(self) -> str:
return self.destination_bucket.replace("gs://", "")

def gcs_hook(self) -> GCSHook:
return GCSHook(gcp_conn_id=self.gcp_conn_id)

def ntd_xlsx_hook(self) -> NTDXLSXHook:
return NTDXLSXHook(method="GET", http_conn_id=self.http_conn_id)

    def http_hook(self) -> HttpHook:
        return HttpHook(method="GET", http_conn_id=self.http_conn_id)

    def execute(self, context: Context) -> dict:
        # Scrape the NTD product page for its XLSX attachment, download it, and
        # upload the raw bytes to GCS; the destination path is returned via XCom
        # so downstream tasks can locate the object.
        response = self.ntd_xlsx_hook().run(endpoint=self.source_url)
        self.gcs_hook().upload(
            data=response.content,
            bucket_name=self.destination_name(),
            object_name=self.destination_path,
            mime_type=response.headers.get("Content-Type"),
            gzip=False,
        )
        return {"destination_path": self.destination_path}
75 changes: 75 additions & 0 deletions airflow/plugins/operators/ntd_xlsx_to_jsonl_operator.py
@@ -0,0 +1,75 @@
import csv
import io
import json
from typing import Sequence

import openpyxl

from airflow.models import BaseOperator
from airflow.models.taskinstance import Context
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class NTDXLSXToJSONLOperator(BaseOperator):
template_fields: Sequence[str] = (
"tab",
"source_bucket",
"source_path",
"destination_bucket",
"destination_path",
"gcp_conn_id",
)

def __init__(
self,
tab: str,
source_bucket: str,
source_path: str,
destination_bucket: str,
destination_path: str,
gcp_conn_id: str = "google_cloud_default",
**kwargs,
) -> None:
super().__init__(**kwargs)

self._gcs_hook = None
self.tab = tab
self.source_bucket = source_bucket
self.source_path = source_path
self.destination_bucket = destination_bucket
self.destination_path = destination_path
self.gcp_conn_id = gcp_conn_id

def gcs_hook(self) -> GCSHook:
return GCSHook(gcp_conn_id=self.gcp_conn_id)

def destination_name(self) -> str:
return self.destination_bucket.replace("gs://", "")

def source_name(self) -> str:
return self.source_bucket.replace("gs://", "")

def source(self) -> bytes:
return self.gcs_hook().download(
bucket_name=self.source_name(),
object_name=self.source_path,
)

    def rows(self) -> csv.DictReader:
        # Round-trip the selected tab through an in-memory CSV so the first
        # spreadsheet row becomes the keys of each emitted record.
        workbook = openpyxl.load_workbook(filename=io.BytesIO(self.source()))
        csv_file = io.StringIO()
        writer = csv.writer(csv_file)
        for row in workbook[self.tab].rows:
            writer.writerow([cell.value for cell in row])
        csv_file.seek(0)
        return csv.DictReader(csv_file)

    def execute(self, context: Context) -> dict:
        # Write one compact JSON object per row, gzip-compressed, to the clean bucket.
        self.gcs_hook().upload(
            bucket_name=self.destination_name(),
            object_name=self.destination_path,
            data="\n".join([json.dumps(r, separators=(",", ":")) for r in self.rows()]),
            mime_type="application/jsonl",
            gzip=True,
        )
        return {"destination_path": self.destination_path}
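
A local, Airflow-free sketch of the same conversion rows() and execute() perform, assuming the XLSX from the hook sketch above was saved to /tmp and contains a hypothetical "Agency Information" tab:

import csv
import io
import json

import openpyxl

workbook = openpyxl.load_workbook("/tmp/2022_agency_information.xlsx")

# Round-trip the tab through an in-memory CSV so the header row becomes the JSON keys.
csv_file = io.StringIO()
writer = csv.writer(csv_file)
for row in workbook["Agency Information"].rows:
    writer.writerow([cell.value for cell in row])
csv_file.seek(0)

jsonl = "\n".join(
    json.dumps(record, separators=(",", ":")) for record in csv.DictReader(csv_file)
)
print(jsonl.splitlines()[0])  # first data row as one compact JSON line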