diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 74a99316cdea3..63d7f48580a97 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -83,7 +83,7 @@ from airflow.models.base import Base, StringID from airflow.models.baseoperator import BaseOperator from airflow.models.dag_version import DagVersion -from airflow.models.dagrun import DagRun +from airflow.models.dagrun import RUN_ID_REGEX, DagRun from airflow.models.taskinstance import ( Context, TaskInstance, @@ -1768,6 +1768,16 @@ def create_dagrun( if not isinstance(run_id, str): raise ValueError(f"`run_id` should be a str, not {type(run_id)}") + # This is also done on the DagRun model class, but SQLAlchemy column + # validator does not work well for some reason. + if not re2.match(RUN_ID_REGEX, run_id): + regex = airflow_conf.get("scheduler", "allowed_run_id_pattern").strip() + if not regex or not re2.match(regex, run_id): + raise ValueError( + f"The run_id provided '{run_id}' does not match regex pattern " + f"'{regex}' or '{RUN_ID_REGEX}'" + ) + # Prevent a manual run from using an ID that looks like a scheduled run. if run_type == DagRunType.MANUAL: if (inferred_run_type := DagRunType.from_run_id(run_id)) != DagRunType.MANUAL: diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index a9532a2e7293d..249886b5dd50f 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -281,7 +281,7 @@ def validate_run_id(self, key: str, run_id: str) -> str | None: if regex and re2.match(regex, run_id): return run_id raise ValueError( - f"The run_id provided '{run_id}' does not match the pattern '{regex}' or '{RUN_ID_REGEX}'" + f"The run_id provided '{run_id}' does not match regex pattern '{regex}' or '{RUN_ID_REGEX}'" ) @property diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst index ec003fea7d97b..37092f31a6d7b 100644 --- a/docs/apache-airflow/best-practices.rst +++ b/docs/apache-airflow/best-practices.rst @@ -732,7 +732,7 @@ This is an example test want to verify the structure of a code-generated DAG aga TEST_DAG_ID = "my_custom_operator_dag" TEST_TASK_ID = "my_custom_operator_task" - TEST_RUN_ID = "my_custom_oeprator_dag_run" + TEST_RUN_ID = "my_custom_operator_dag_run" @pytest.fixture()