1 change: 1 addition & 0 deletions .sqlfluffignore
@@ -1 +1,2 @@
warehouse/models/mart/ntd_validation/fct_ntd_rr20_service_checks.sql
airflow/composer/**/*.sql
81 changes: 81 additions & 0 deletions airflow/dags/download_and_parse_littlepay.py
@@ -0,0 +1,81 @@
import os
from datetime import datetime

from operators.littlepay_csv_to_jsonl_operator import LittlepayCSVToJSONLOperator
from operators.littlepay_s3_to_gcs_operator import LittlepayS3ToGCSOperator

from airflow import DAG, XComArg
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
from airflow.operators.latest_only import LatestOnlyOperator

LITTLEPAY_TRANSIT_PROVIDER_BUCKETS = {
"atn": "littlepay-datafeed-prod-atn-5c319c40",
}
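# Map of transit provider slug -> Littlepay S3 bucket; each slug must also have
# an Airflow connection named aws_<slug> (see the aws_conn_id usage below).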


with DAG(
dag_id="download_and_parse_littlepay",
# Every day at midnight
schedule="0 0 * * *",
start_date=datetime(2025, 11, 1),
catchup=False,
tags=["payments", "littlepay"],
):
latest_only = LatestOnlyOperator(task_id="latest_only", depends_on_past=False)

for provider, bucket in LITTLEPAY_TRANSIT_PROVIDER_BUCKETS.items():
littlepay_files = S3ListOperator(
task_id="littlepay_list",
bucket=bucket,
aws_conn_id=f"aws_{provider}",
)

        # Bind provider as a default arg; a bare closure would late-bind to the
        # final loop value by the time the mapping function runs.
        def sync_littlepay_kwargs(source_path, provider=provider):
filename = os.path.basename(source_path)
filetype = os.path.basename(os.path.dirname(source_path))
return {
"source_path": source_path,
"destination_path": os.path.join(
filetype,
f"instance={provider}",
f"filename={filename}",
"ts={{ ts }}",
filename,
),
"report_path": os.path.join(
"raw_littlepay_sync_job_result" f"instance={provider}",
"ts={{ ts }}",
f"results_{filename}.jsonl",
),
}

        sync_littlepay = LittlepayS3ToGCSOperator.partial(
            task_id=f"sync_littlepay_{provider}",
aws_conn_id=f"aws_{provider}",
source_bucket=bucket,
destination_bucket=os.environ.get("CALITP_BUCKET__LITTLEPAY_RAW_V3"),
).expand_kwargs(XComArg(littlepay_files).map(sync_littlepay_kwargs))
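        # .partial() pins the per-provider arguments; .expand_kwargs() then
        # creates one mapped sync task per S3 key returned by the list task.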

        def parse_littlepay_kwargs(source_file, provider=provider):
return {
"source_path": os.path.join(
source_file["filetype"],
f"instance={provider}",
f"filename={source_file['filename']}",
"ts={{ ts }}",
source_file["filename"],
),
"destination_path": os.path.join(
source_file["filetype"],
f"instance={provider}",
f"extract_filename={source_file['filename']}",
"ts={{ ts }}",
f"{os.path.splitext(source_file['filename'])[0]}.jsonl.gz",
),
}

        parse_littlepay = LittlepayCSVToJSONLOperator.partial(
            task_id=f"parse_littlepay_{provider}",
            source_bucket=os.environ.get("CALITP_BUCKET__LITTLEPAY_RAW_V3"),
            destination_bucket=os.environ.get("CALITP_BUCKET__LITTLEPAY_PARSED_V3"),
        ).expand_kwargs(XComArg(sync_littlepay).map(parse_littlepay_kwargs))
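        # Second mapping layer: each sync task's return dict drives one parse task.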

        # Gate the whole per-provider chain on the latest run only
        latest_only >> littlepay_files >> sync_littlepay >> parse_littlepay
66 changes: 66 additions & 0 deletions airflow/dags/download_and_parse_ntd_xlsx.py
@@ -0,0 +1,66 @@
import os
from datetime import datetime

from operators.ntd_xlsx_to_gcs_operator import NTDXLSXToGCSOperator
from operators.ntd_xlsx_to_jsonl_operator import NTDXLSXToJSONLOperator

from airflow import DAG
from airflow.operators.latest_only import LatestOnlyOperator

NTD_PRODUCTS = [
{
"type": "annual_database_agency_information",
"short_type": "_2022_agency_information",
"year": 2022,
"url": "https://www.transit.dot.gov/ntd/data-product/2022-annual-database-agency-information",
},
]
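# Each product fans out into a download task and a parse task below; append
# more NTD data products here without touching the task definitions.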

with DAG(
dag_id="download_and_parse_ntd_xlsx",
# Every day at midnight
schedule="0 0 * * *",
start_date=datetime(2025, 11, 1),
catchup=False,
tags=["ntd"],
):
latest_only = LatestOnlyOperator(task_id="latest_only", depends_on_past=False)

for ntd_product in NTD_PRODUCTS:
        download_xlsx = NTDXLSXToGCSOperator(
            task_id=f"ntd_xlsx_to_gcs_{ntd_product['type']}_{ntd_product['year']}",
            source_url=ntd_product["url"],
            # NTDXLSXToGCSOperator writes to GCS; its __init__ expects
            # destination_* kwargs, not source_bucket/source_path
            destination_bucket=os.environ.get("CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__RAW"),
            destination_path=os.path.join(
f"{ntd_product['type']}_raw",
f"{ntd_product['year']}",
"dt={{ dt }}",
"execution_ts={{ ts }}",
f"{ntd_product['year']}__{ntd_product['type']}_raw.xlsx",
),
)

parse_xlsx = NTDXLSXToJSONLOperator(
task_id="ntd_xlsx_to_jsonl",
source_bucket=os.environ.get("CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__RAW"),
source_path=os.path.join(
f"{ntd_product['type']}_raw",
f"{ntd_product['year']}",
"dt={{ dt }}",
"execution_ts={{ ts }}",
f"{ntd_product['year']}__{ntd_product['type']}_raw.xlsx",
),
destination_bucket=os.environ.get(
"CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__CLEAN"
),
destination_path=os.path.join(
f"{ntd_product['type']}",
f"{ntd_product['year']}",
f"{ntd_product['short_type']}",
"dt={{ dt }}",
"execution_ts={{ ts }}",
f"{ntd_product['year']}__{ntd_product['type']}__{ntd_product['short_type']}.jsonl.gz",
),
)

latest_only >> download_xlsx >> parse_xlsx
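        # LatestOnlyOperator skips everything downstream for scheduled runs that
        # are not the most recent interval, so old dates don't re-download files.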
40 changes: 40 additions & 0 deletions airflow/plugins/hooks/ntd_xlsx_hook.py
@@ -0,0 +1,40 @@
from bs4 import BeautifulSoup

from airflow.hooks.base import BaseHook
from airflow.providers.http.hooks.http import HttpHook

HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"sec-ch-ua": '"Not A;Brand"',
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "none",
"sec-fetch-user": "?1",
}
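# Browser-like request headers; transit.dot.gov appears to reject obvious bot
# traffic, so we present as a desktop Chrome session (assumed, not documented).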


class NTDXLSXHook(BaseHook):
_http_conn_id: str
_method: str

def __init__(self, method: str = "GET", http_conn_id: str = "http_dot"):
super().__init__()
self._http_conn_id: str = http_conn_id
self._method: str = method

def http_hook(self) -> HttpHook:
return HttpHook(method=self._method, http_conn_id=self._http_conn_id)

def run(self, endpoint: str) -> str:
body = self.http_hook().run(endpoint=endpoint, headers=HEADERS).text
soup = BeautifulSoup(body, "html.parser")
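        # The product page marks its XLSX download as an <a> tag typed with the
        # spreadsheet MIME type; resolve that link to the actual file URL.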
        link = soup.find(
            "a",
            type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        )
        if link is None:
            raise ValueError(f"no XLSX download link found at {endpoint}")
        return link.get("href")
75 changes: 75 additions & 0 deletions airflow/plugins/operators/littlepay_s3_to_gcs_operator.py
@@ -0,0 +1,75 @@
import os
from typing import Sequence

from airflow.models import BaseOperator
from airflow.utils.context import Context
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class LittlepayS3ToGCSOperator(BaseOperator):
template_fields: Sequence[str] = (
"source_bucket",
"source_path",
"destination_bucket",
"destination_path",
"report_path",
"aws_conn_id",
"gcp_conn_id",
)
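    # Listing these in template_fields lets Jinja render values like
    # "ts={{ ts }}" in the paths at task runtime.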

def __init__(
self,
source_bucket: str,
source_path: str,
destination_bucket: str,
destination_path: str,
report_path: str,
aws_conn_id: str = "amazon_default",
gcp_conn_id: str = "google_cloud_default",
**kwargs,
) -> None:
super().__init__(**kwargs)
self.source_bucket: str = source_bucket
self.source_path: str = source_path
self.destination_bucket: str = destination_bucket
self.destination_path: str = destination_path
self.report_path: str = report_path
self.aws_conn_id: str = aws_conn_id
self.gcp_conn_id: str = gcp_conn_id

def gcs_hook(self) -> GCSHook:
return GCSHook(gcp_conn_id=self.gcp_conn_id)

def s3_hook(self) -> S3Hook:
return S3Hook(aws_conn_id=self.aws_conn_id)

    def source_name(self) -> str:
        return self.source_bucket.replace("s3://", "")

def destination_name(self) -> str:
return self.destination_bucket.replace("gs://", "")

def filename(self) -> str:
return os.path.basename(self.source_path)

def filetype(self) -> str:
return os.path.basename(os.path.dirname(self.source_path))

    def execute(self, context: Context) -> dict:
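        # Read the S3 object fully into memory and re-upload the bytes to GCS;
        # assumes Littlepay feed files stay well under worker memory.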
s3_object = self.s3_hook().get_key(
bucket_name=self.source_name(), key=self.source_path
)
body = s3_object.get().get("Body")
self.gcs_hook().upload(
bucket_name=self.destination_name(),
object_name=self.destination_path,
data=body.read(),
mime_type=s3_object.content_type,
)
return {
"filename": self.filename(),
"filetype": self.filetype(),
"destination_path": self.destination_path,
"report_path": self.report_path,
}
59 changes: 59 additions & 0 deletions airflow/plugins/operators/ntd_xlsx_to_gcs_operator.py
@@ -0,0 +1,59 @@
from typing import Sequence

from hooks.ntd_xlsx_hook import HEADERS, NTDXLSXHook

from airflow.providers.http.hooks.http import HttpHook
from airflow.models import BaseOperator
from airflow.utils.context import Context
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class NTDXLSXToGCSOperator(BaseOperator):
template_fields: Sequence[str] = (
"source_url",
"destination_bucket",
"destination_path",
"http_conn_id",
"gcp_conn_id",
)

def __init__(
self,
source_url: str,
destination_bucket: str,
destination_path: str,
http_conn_id: str = "http_ntd",
gcp_conn_id: str = "google_cloud_default",
**kwargs,
) -> None:
super().__init__(**kwargs)

self.source_url = source_url
self.destination_bucket = destination_bucket
self.destination_path = destination_path
self.http_conn_id = http_conn_id
self.gcp_conn_id = gcp_conn_id

def destination_name(self) -> str:
return self.destination_bucket.replace("gs://", "")

def gcs_hook(self) -> GCSHook:
return GCSHook(gcp_conn_id=self.gcp_conn_id)

def ntd_xlsx_hook(self) -> NTDXLSXHook:
return NTDXLSXHook(method="GET", http_conn_id=self.http_conn_id)

    def http_hook(self) -> HttpHook:
        return HttpHook(method="GET", http_conn_id=self.http_conn_id)

    def execute(self, context: Context) -> dict:
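        # Two-step fetch: scrape the landing page for the XLSX link, then
        # download the file itself with the same browser-like headers.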
resolved_url = self.ntd_xlsx_hook().run(endpoint=self.source_url)
response = self.http_hook().run(endpoint=resolved_url, headers=HEADERS)
self.gcs_hook().upload(
data=response.content,
bucket_name=self.destination_name(),
object_name=self.destination_path,
mime_type=response.headers.get("Content-Type"),
gzip=False,
)
return {"resolved_url": resolved_url, "destination_path": self.destination_path}