
@ohrite
Contributor

@ohrite ohrite commented Dec 2, 2025

Description

This PR introduces quality-of-life fixes and a logical change to how GTFS Schedule data is processed.

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation

How has this been tested?

Tested with pytest and on Staging Airflow.

Post-merge follow-ups

  • No action required
  • Actions required (specified below)

Monitor staging pipeline execution

@github-actions

github-actions bot commented Dec 2, 2025

Terraform plan in iac/cal-itp-data-infra-staging/airflow/us

Plan: 0 to add, 5 to change, 2 to destroy.
Terraform used the selected providers to generate the following execution
plan. Resource actions are indicated with the following symbols:
!~  update in-place
-   destroy

Terraform will perform the following actions:

  # google_storage_bucket_object.calitp-staging-composer["plugins/gtfs_validator/gtfs-validator-7.1.0-cli.jar"] will be destroyed
  # (because key ["plugins/gtfs_validator/gtfs-validator-7.1.0-cli.jar"] is not in for_each map)
-   resource "google_storage_bucket_object" "calitp-staging-composer" {
-       bucket              = "calitp-staging-composer" -> null
-       content_type        = "application/zip" -> null
-       crc32c              = "1hT7ow==" -> null
-       detect_md5hash      = "omifem08VTXEfwdktvRcLQ==" -> null
-       event_based_hold    = false -> null
-       generation          = 1764982222490193 -> null
-       id                  = "calitp-staging-composer-plugins/gtfs_validator/gtfs-validator-7.1.0-cli.jar" -> null
-       md5hash             = "omifem08VTXEfwdktvRcLQ==" -> null
-       md5hexhash          = "a2689f7a6d3c5535c47f0764b6f45c2d" -> null
-       media_link          = "https://storage.googleapis.com/download/storage/v1/b/calitp-staging-composer/o/plugins%2Fgtfs_validator%2Fgtfs-validator-7.1.0-cli.jar?generation=1764982222490193&alt=media" -> null
-       metadata            = {} -> null
-       name                = "plugins/gtfs_validator/gtfs-validator-7.1.0-cli.jar" -> null
-       output_name         = "plugins/gtfs_validator/gtfs-validator-7.1.0-cli.jar" -> null
-       self_link           = "https://www.googleapis.com/storage/v1/b/calitp-staging-composer/o/plugins%2Fgtfs_validator%2Fgtfs-validator-7.1.0-cli.jar" -> null
-       source              = "../../../../airflow/plugins/gtfs_validator/gtfs-validator-7.1.0-cli.jar" -> null
-       storage_class       = "STANDARD" -> null
-       temporary_hold      = false -> null
#        (6 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-staging-composer["plugins/hooks/gtfs_validator_hook.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-staging-composer" {
!~      crc32c              = "EwZmVA==" -> (known after apply)
!~      detect_md5hash      = "pAwefvyTF+k+iBxeXN8WmA==" -> "different hash"
!~      generation          = 1765240133054057 -> (known after apply)
        id                  = "calitp-staging-composer-plugins/hooks/gtfs_validator_hook.py"
!~      md5hash             = "pAwefvyTF+k+iBxeXN8WmA==" -> (known after apply)
        name                = "plugins/hooks/gtfs_validator_hook.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-staging-composer-catalog will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-staging-composer-catalog" {
!~      content             = (sensitive value)
!~      crc32c              = "90dyPA==" -> (known after apply)
!~      detect_md5hash      = "WG9X8aeYYRuHaIlLed6xkQ==" -> "different hash"
!~      generation          = 1765240133999929 -> (known after apply)
        id                  = "calitp-staging-composer-data/warehouse/target/catalog.json"
!~      md5hash             = "WG9X8aeYYRuHaIlLed6xkQ==" -> (known after apply)
        name                = "data/warehouse/target/catalog.json"
#        (16 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-staging-composer-dags["models/intermediate/gtfs_quality/int_gtfs_quality__schedule_validator_rule_details_unioned.sql"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-staging-composer-dags" {
!~      crc32c              = "hFPoMg==" -> (known after apply)
!~      detect_md5hash      = "tUCc7PC0+Sx80IeBEYrPfA==" -> "different hash"
!~      generation          = 1765240133079899 -> (known after apply)
        id                  = "calitp-staging-composer-data/warehouse/models/intermediate/gtfs_quality/int_gtfs_quality__schedule_validator_rule_details_unioned.sql"
!~      md5hash             = "tUCc7PC0+Sx80IeBEYrPfA==" -> (known after apply)
        name                = "data/warehouse/models/intermediate/gtfs_quality/int_gtfs_quality__schedule_validator_rule_details_unioned.sql"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-staging-composer-dags["models/mart/gtfs_quality/_mart_gtfs_quality.yml"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-staging-composer-dags" {
!~      crc32c              = "6mM2hg==" -> (known after apply)
!~      detect_md5hash      = "+wP9c8v4YUh5DO52Pf2+Lw==" -> "different hash"
!~      generation          = 1765240133086081 -> (known after apply)
        id                  = "calitp-staging-composer-data/warehouse/models/mart/gtfs_quality/_mart_gtfs_quality.yml"
!~      md5hash             = "+wP9c8v4YUh5DO52Pf2+Lw==" -> (known after apply)
        name                = "data/warehouse/models/mart/gtfs_quality/_mart_gtfs_quality.yml"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-staging-composer-dags["seeds/gtfs_schedule_validator_rule_details_v7_1_0.csv"] will be destroyed
  # (because key ["seeds/gtfs_schedule_validator_rule_details_v7_1_0.csv"] is not in for_each map)
-   resource "google_storage_bucket_object" "calitp-staging-composer-dags" {
-       bucket              = "calitp-staging-composer" -> null
-       content_type        = "text/plain; charset=utf-8" -> null
-       crc32c              = "HiMSzg==" -> null
-       detect_md5hash      = "REBfI/kjd8XOSStDQgOZEg==" -> null
-       event_based_hold    = false -> null
-       generation          = 1764012980587419 -> null
-       id                  = "calitp-staging-composer-data/warehouse/seeds/gtfs_schedule_validator_rule_details_v7_1_0.csv" -> null
-       md5hash             = "REBfI/kjd8XOSStDQgOZEg==" -> null
-       md5hexhash          = "44405f23f92377c5ce492b4342039912" -> null
-       media_link          = "https://storage.googleapis.com/download/storage/v1/b/calitp-staging-composer/o/data%2Fwarehouse%2Fseeds%2Fgtfs_schedule_validator_rule_details_v7_1_0.csv?generation=1764012980587419&alt=media" -> null
-       metadata            = {} -> null
-       name                = "data/warehouse/seeds/gtfs_schedule_validator_rule_details_v7_1_0.csv" -> null
-       output_name         = "data/warehouse/seeds/gtfs_schedule_validator_rule_details_v7_1_0.csv" -> null
-       self_link           = "https://www.googleapis.com/storage/v1/b/calitp-staging-composer/o/data%2Fwarehouse%2Fseeds%2Fgtfs_schedule_validator_rule_details_v7_1_0.csv" -> null
-       source              = "../../../../warehouse/seeds/gtfs_schedule_validator_rule_details_v7_1_0.csv" -> null
-       storage_class       = "STANDARD" -> null
-       temporary_hold      = false -> null
#        (6 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-staging-composer-manifest will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-staging-composer-manifest" {
!~      content             = (sensitive value)
!~      crc32c              = "uvbxzA==" -> (known after apply)
!~      detect_md5hash      = "SO7h8JdugtrtNvqQ/lr9nQ==" -> "different hash"
!~      generation          = 1765240135116821 -> (known after apply)
        id                  = "calitp-staging-composer-data/warehouse/target/manifest.json"
!~      md5hash             = "SO7h8JdugtrtNvqQ/lr9nQ==" -> (known after apply)
        name                = "data/warehouse/target/manifest.json"
#        (16 unchanged attributes hidden)
    }

Plan: 0 to add, 5 to change, 2 to destroy.

📝 Plan generated in Plan Terraform for Warehouse and DAG changes #1156

@github-actions

github-actions bot commented Dec 2, 2025

Terraform plan in iac/cal-itp-data-infra/airflow/us

Plan: 0 to add, 10 to change, 0 to destroy.
Terraform used the selected providers to generate the following execution
plan. Resource actions are indicated with the following symbols:
!~  update in-place

Terraform will perform the following actions:

  # google_storage_bucket_object.calitp-composer["dags/download_parse_and_validate_gtfs.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "SgfazQ==" -> (known after apply)
!~      detect_md5hash      = "E5DwBoeO3WOBKVartOgJMw==" -> "different hash"
!~      generation          = 1764122315051550 -> (known after apply)
        id                  = "calitp-composer-dags/download_parse_and_validate_gtfs.py"
!~      md5hash             = "E5DwBoeO3WOBKVartOgJMw==" -> (known after apply)
        name                = "dags/download_parse_and_validate_gtfs.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer["plugins/hooks/gtfs_unzip_hook.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "sKU1wQ==" -> (known after apply)
!~      detect_md5hash      = "3M2ryxBP2YC4EIrLXJqo3A==" -> "different hash"
!~      generation          = 1763693339191842 -> (known after apply)
        id                  = "calitp-composer-plugins/hooks/gtfs_unzip_hook.py"
!~      md5hash             = "3M2ryxBP2YC4EIrLXJqo3A==" -> (known after apply)
        name                = "plugins/hooks/gtfs_unzip_hook.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer["plugins/hooks/gtfs_validator_hook.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "0hDaaA==" -> (known after apply)
!~      detect_md5hash      = "XbhzfsiRlTt9goCn2BPzKg==" -> "different hash"
!~      generation          = 1764122315045275 -> (known after apply)
        id                  = "calitp-composer-plugins/hooks/gtfs_validator_hook.py"
!~      md5hash             = "XbhzfsiRlTt9goCn2BPzKg==" -> (known after apply)
        name                = "plugins/hooks/gtfs_validator_hook.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer["plugins/operators/bigquery_to_download_config_operator.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "q2W/ig==" -> (known after apply)
!~      detect_md5hash      = "pebUFZJxFAXmLZDbts7AvA==" -> "different hash"
!~      generation          = 1764122315043472 -> (known after apply)
        id                  = "calitp-composer-plugins/operators/bigquery_to_download_config_operator.py"
!~      md5hash             = "pebUFZJxFAXmLZDbts7AvA==" -> (known after apply)
        name                = "plugins/operators/bigquery_to_download_config_operator.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer["plugins/operators/download_config_to_gcs_operator.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "1gloGQ==" -> (known after apply)
!~      detect_md5hash      = "p64KOP3aqa5HEA151RMyYQ==" -> "different hash"
!~      generation          = 1764122315043509 -> (known after apply)
        id                  = "calitp-composer-plugins/operators/download_config_to_gcs_operator.py"
!~      md5hash             = "p64KOP3aqa5HEA151RMyYQ==" -> (known after apply)
        name                = "plugins/operators/download_config_to_gcs_operator.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer["plugins/operators/gtfs_csv_to_jsonl_operator.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "gUkeBw==" -> (known after apply)
!~      detect_md5hash      = "29ltAjk4ZC7nudhaIAsmFA==" -> "different hash"
!~      generation          = 1764122315043811 -> (known after apply)
        id                  = "calitp-composer-plugins/operators/gtfs_csv_to_jsonl_operator.py"
!~      md5hash             = "29ltAjk4ZC7nudhaIAsmFA==" -> (known after apply)
        name                = "plugins/operators/gtfs_csv_to_jsonl_operator.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer["plugins/operators/unzip_gtfs_to_gcs_operator.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "u30a+g==" -> (known after apply)
!~      detect_md5hash      = "VRdUPDJwUQDirad658UFYw==" -> "different hash"
!~      generation          = 1764122315040498 -> (known after apply)
        id                  = "calitp-composer-plugins/operators/unzip_gtfs_to_gcs_operator.py"
!~      md5hash             = "VRdUPDJwUQDirad658UFYw==" -> (known after apply)
        name                = "plugins/operators/unzip_gtfs_to_gcs_operator.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer["plugins/operators/validate_gtfs_to_gcs_operator.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "/vB9ww==" -> (known after apply)
!~      detect_md5hash      = "xJoMFkOgFmaH9vQDF3rcAQ==" -> "different hash"
!~      generation          = 1764122315039512 -> (known after apply)
        id                  = "calitp-composer-plugins/operators/validate_gtfs_to_gcs_operator.py"
!~      md5hash             = "xJoMFkOgFmaH9vQDF3rcAQ==" -> (known after apply)
        name                = "plugins/operators/validate_gtfs_to_gcs_operator.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer-catalog will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer-catalog" {
!~      content             = (sensitive value)
!~      crc32c              = "UxnpEg==" -> (known after apply)
!~      detect_md5hash      = "nNS2NQxEv9QuNz1Evihv4Q==" -> "different hash"
!~      generation          = 1765231988264679 -> (known after apply)
        id                  = "calitp-composer-data/warehouse/target/catalog.json"
!~      md5hash             = "nNS2NQxEv9QuNz1Evihv4Q==" -> (known after apply)
        name                = "data/warehouse/target/catalog.json"
#        (16 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer-manifest will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer-manifest" {
!~      content             = (sensitive value)
!~      crc32c              = "b6cARg==" -> (known after apply)
!~      detect_md5hash      = "RsVFtCRmW9aQ986K+1DUNg==" -> "different hash"
!~      generation          = 1765231989850957 -> (known after apply)
        id                  = "calitp-composer-data/warehouse/target/manifest.json"
!~      md5hash             = "RsVFtCRmW9aQ986K+1DUNg==" -> (known after apply)
        name                = "data/warehouse/target/manifest.json"
#        (16 unchanged attributes hidden)
    }

Plan: 0 to add, 10 to change, 0 to destroy.

📝 Plan generated in Plan Terraform for Warehouse and DAG changes #1156

@erikamov erikamov force-pushed the mov/download_gtfs_changes branch 2 times, most recently from 46a4171 to 07f465c Compare December 3, 2025 02:18
@github-actions

github-actions bot commented Dec 3, 2025

Terraform plan in iac/cal-itp-data-infra-staging/composer/us

No changes. Your infrastructure matches the configuration.

Terraform has compared your real infrastructure against your configuration
and found no differences, so no changes are needed.

📝 Plan generated in Plan Terraform for Warehouse and DAG changes #1156

Contributor

@lauriemerrell lauriemerrell left a comment


This all looks great in Airflow & seems much clearer! I do have a couple questions about downstream consequences / workflow quality of life -- maybe these should just become follow-up tickets but wanted to flag for discussion at this stage.

  1. I am curious about the relationship between this and #4467 -- if we land on needing a way to allow for manual downloads to appear in the pipeline, under the current (pre-this-PR) behavior, it's not a big deal to manually run a download job from e.g. JupyterHub because the parse & validate jobs will find the data based on its presence in a partition.

     Under this new approach, can these tasks process data that lands in our buckets from a process outside the Airflow scheduled download DAG task? (Might be fine if the answer is no and that would need special handling but just curious what the approach would be.)

  2. At what point would we delete/deprecate the current versions of these DAG tasks & associated job code?

  3. Noticed that the download config to GCS step gives nice human-readable names for the generated tasks but validate GTFS to GCS and list downloaded files just list index numbers -- is there a way to give them a human-readable name too?

  4. For me personally when I try to run it locally it seems to hang on the steps after download config to GCS -- the next two jobs just sat at Scheduled but that might be something to do with my local instance


ValidateGTFSToGCSOperator.partial(
task_id="validate_gtfs_to_gcs",
destination_bucket=os.getenv("CALITP_BUCKET__GTFS_SCHEDULE_VALIDATION_HOURLY"),

Question -- maybe for a follow-up: Is it worth revisiting these variable names if they no longer correspond to the DAG task names? (_HOURLY suffix likely to be confusing for future users?)


Yeah, on the new branch I added labels to all map indexes ;) Loved it too!!
Still testing so more to come.
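
For readers unfamiliar with map index labels: Airflow 2.9 and later let a mapped operator take a `map_index_template`, a Jinja template rendered per mapped task instance and shown in the UI instead of the numeric index. The sketch below does not import Airflow and is not the code from this branch; it only illustrates one way a label could be derived. The `download_config` dict and its `name` and `url` keys are assumptions made for the example.

```python
from urllib.parse import urlparse


def map_index_label(download_config: dict, index: int) -> str:
    """Return a readable label for one mapped task instance.

    `download_config` and its `name`/`url` keys are hypothetical; the
    real config objects in this PR may be shaped differently.
    """
    name = (download_config.get("name") or "").strip()
    if name:
        return name
    host = urlparse(download_config.get("url") or "").netloc
    # Fall back to the numeric index, which is what the UI shows by default.
    return host or str(index)


configs = [
    {"name": "Example Transit", "url": "https://example.org/gtfs.zip"},
    {"url": "https://example.com/feed.zip"},
    {},
]
labels = [map_index_label(c, i) for i, c in enumerate(configs)]
print(labels)
```

In a DAG, a value like this would typically be exposed to the template through the task's context or arguments; how that is wired depends on the operator.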

For sure, when we delete the hourly DAGs we will rename those buckets, but for now the names stay the same so that both pipelines use the same data and files.
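
One possible way to handle the rename later is to read a new variable first and fall back to the `_HOURLY` one during the transition. This is only a sketch: `CALITP_BUCKET__GTFS_SCHEDULE_VALIDATION` is a hypothetical new name, `bucket_from_env` is an illustrative helper, and the bucket value is a placeholder. Only the `_HOURLY` variable appears in this PR.

```python
import os
from typing import Mapping


def bucket_from_env(preferred: str, legacy: str, env: Mapping[str, str] = os.environ) -> str:
    """Read a bucket name, preferring the new variable over the legacy one."""
    for name in (preferred, legacy):
        value = env.get(name)
        if value:
            return value
    raise KeyError(f"neither {preferred} nor {legacy} is set")


# Hypothetical environment: only the legacy variable is defined,
# and the bucket value is a placeholder.
env = {"CALITP_BUCKET__GTFS_SCHEDULE_VALIDATION_HOURLY": "gs://example-bucket"}
bucket = bucket_from_env(
    "CALITP_BUCKET__GTFS_SCHEDULE_VALIDATION",  # hypothetical new name
    "CALITP_BUCKET__GTFS_SCHEDULE_VALIDATION_HOURLY",
    env=env,
)
print(bucket)
```

With a fallback like this, the old and new DAGs could keep pointing at the same bucket until the hourly DAGs are removed.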

@erikamov erikamov force-pushed the mov/download_gtfs_changes branch from bb120de to 28b8ae2 Compare December 4, 2025 02:16
@ohrite ohrite force-pushed the mov/download_gtfs_changes branch from 6f00aca to 9261bd2 Compare December 5, 2025 04:46
@erikamov erikamov force-pushed the mov/download_gtfs_changes branch from 63c716a to cd76bd2 Compare December 6, 2025 00:37
@github-actions

github-actions bot commented Dec 6, 2025

Terraform plan in iac/cal-itp-data-infra/composer/us

No changes. Your infrastructure matches the configuration.

Terraform has compared your real infrastructure against your configuration
and found no differences, so no changes are needed.

📝 Plan generated in Plan Terraform for Warehouse and DAG changes #1156

@erikamov erikamov force-pushed the mov/download_gtfs_changes branch from 31ba8ff to 3d6d279 Compare December 6, 2025 01:19
Contributor

@erikamov erikamov left a comment


Excited to see it running! We paired and it looks great!

@ohrite ohrite force-pushed the mov/download_gtfs_changes branch 3 times, most recently from 655d618 to d1edbed Compare December 8, 2025 20:17
@erikamov erikamov force-pushed the mov/download_gtfs_changes branch from 27e212c to 78264a3 Compare December 9, 2025 00:02
@github-actions

github-actions bot commented Dec 9, 2025

Warehouse report 📦

DAG

Legend (in order of precedence)

Resource type                                   Indicator    Resolution
Large table-materialized model                  Orange       Make the model incremental
Large model without partitioning or clustering  Orange       Add partitioning and/or clustering
View with more than one child                   Yellow       Materialize as a table or incremental
Incremental                                     Light green
Table                                           Green
View                                            White

@erikamov
Contributor

erikamov commented Dec 9, 2025

These are the final results from our tests on Staging.

  • 272 sources to download schedules from. It is easier to spot a failing source (CSUMB also fails on Prod).
  • 264 successful downloads were validated, unzipped and converted (parsed).
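
As a quick check on those two figures, the arithmetic below derives how many sources did not make it through and the resulting rate. It assumes every source outside the 264 failed at some step, which the comment does not state explicitly.

```python
sources = 272    # sources configured for download (from the comment above)
processed = 264  # downloads validated, unzipped and converted

not_processed = sources - processed
success_rate = processed / sources
print(f"{not_processed} not processed, {success_rate:.1%} success")
```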

@erikamov erikamov force-pushed the mov/download_gtfs_changes branch from 230bf6a to c23b876 Compare December 9, 2025 18:54
@erikamov erikamov merged commit fd5099c into main Dec 9, 2025
8 checks passed
@erikamov erikamov deleted the mov/download_gtfs_changes branch December 9, 2025 19:32

4 participants