diff --git a/airflow-core/src/airflow/cli/commands/asset_command.py b/airflow-core/src/airflow/cli/commands/asset_command.py index 15430b76488d6..1bf5fd119604c 100644 --- a/airflow-core/src/airflow/cli/commands/asset_command.py +++ b/airflow-core/src/airflow/cli/commands/asset_command.py @@ -129,8 +129,8 @@ def asset_materialize(args, *, session: Session = NEW_SESSION) -> None: """ Materialize the specified asset. - This is done by finding the DAG with the asset defined as outlet, and create - a run for that DAG. + This is done by finding the Dag with the asset defined as outlet, and create + a run for that Dag. """ if not args.name and not args.uri: raise SystemExit("Either --name or --uri is required") @@ -149,7 +149,7 @@ def asset_materialize(args, *, session: Session = NEW_SESSION) -> None: if (dag_id := next(dag_id_it, None)) is None: raise SystemExit(f"Asset with {select_message} does not exist.") if next(dag_id_it, None) is not None: - raise SystemExit(f"More than one DAG materializes asset with {select_message}.") + raise SystemExit(f"More than one Dag materializes asset with {select_message}.") try: user = getuser() diff --git a/airflow-core/src/airflow/cli/commands/config_command.py b/airflow-core/src/airflow/cli/commands/config_command.py index 211372e0c606f..00f57c5df2090 100644 --- a/airflow-core/src/airflow/cli/commands/config_command.py +++ b/airflow-core/src/airflow/cli/commands/config_command.py @@ -663,9 +663,9 @@ def message(self) -> str | None: was_removed=False, new_default="False", suggestion="In Airflow 3.0 the default value for `catchup_by_default` is set to `False`. " - "This means that DAGs without explicit definition of the `catchup` parameter will not " + "This means that Dags without explicit definition of the `catchup` parameter will not " "catchup by default. " - "If your DAGs rely on catchup behavior, not explicitly defined in the DAG definition, " + "If your Dags rely on catchup behavior, not explicitly defined in the Dag definition, " "set this configuration parameter to `True` in the `scheduler` section of your `airflow.cfg` " "to enable the behavior from Airflow 2.x.", ), diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py b/airflow-core/src/airflow/cli/commands/dag_command.py index d0162a49fc572..855fb95cf2493 100644 --- a/airflow-core/src/airflow/cli/commands/dag_command.py +++ b/airflow-core/src/airflow/cli/commands/dag_command.py @@ -104,7 +104,7 @@ def dag_delete(args) -> None: api_client = get_current_api_client() if ( args.yes - or input("This will drop all existing records related to the specified DAG. Proceed? (y/n)").upper() + or input("This will drop all existing records related to the specified Dag. Proceed? (y/n)").upper() == "Y" ): try: @@ -119,21 +119,21 @@ def dag_delete(args) -> None: @cli_utils.action_cli @providers_configuration_loaded def dag_pause(args) -> None: - """Pauses a DAG.""" + """Pauses a Dag.""" set_is_paused(True, args) @cli_utils.action_cli @providers_configuration_loaded def dag_unpause(args) -> None: - """Unpauses a DAG.""" + """Unpauses a Dag.""" set_is_paused(False, args) @providers_configuration_loaded @provide_session def set_is_paused(is_paused: bool, args, *, session: Session = NEW_SESSION) -> None: - """Set is_paused for DAG by a given dag_id.""" + """Set is_paused for Dag by a given dag_id.""" query = select(DagModel) if args.treat_dag_id_as_regex: query = query.where(DagModel.dag_id.regexp_match(args.dag_id)) @@ -144,13 +144,13 @@ def set_is_paused(is_paused: bool, args, *, session: Session = NEW_SESSION) -> N matched_dags = list(session.scalars(query).all()) if not matched_dags: - print(f"No {'un' if is_paused else ''}paused DAGs were found") + print(f"No {'un' if is_paused else ''}paused Dags were found") return if not args.yes and args.treat_dag_id_as_regex: dags_ids = [dag.dag_id for dag in matched_dags] question = ( - f"You are about to {'un' if not is_paused else ''}pause {len(dags_ids)} DAGs:\n" + f"You are about to {'un' if not is_paused else ''}pause {len(dags_ids)} Dags:\n" f"{','.join(dags_ids)}" f"\n\nAre you sure? [y/n]" ) @@ -172,7 +172,7 @@ def _update_is_paused(dag_model: DagModel) -> bool: @providers_configuration_loaded def dag_dependencies_show(args) -> None: - """Display DAG dependencies, save to file or show as imgcat image.""" + """Display Dag dependencies, save to file or show as imgcat image.""" deduplicated_dag_dependencies = { dag_id: list(set(dag_dependencies)) for dag_id, dag_dependencies in SerializedDagModel.get_dag_dependencies().items() @@ -196,7 +196,7 @@ def dag_dependencies_show(args) -> None: @providers_configuration_loaded def dag_show(args) -> None: - """Display DAG or saves its graphic representation to the file.""" + """Display Dag or saves its graphic representation to the file.""" from airflow.models.serialized_dag import SerializedDagModel if not (dag := SerializedDagModel.get_dag(dag_id=args.dag_id)): @@ -297,7 +297,7 @@ def dag_state(args, session: Session = NEW_SESSION) -> None: dag = DagModel.get_dagmodel(args.dag_id, session=session) if not dag: - raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table") + raise SystemExit(f"Dag: {args.dag_id} does not exist in 'dag' table") dr, _ = fetch_dag_run_from_run_id_or_logical_date_string( dag_id=dag.dag_id, value=args.logical_date_or_run_id, @@ -343,10 +343,10 @@ def dag_next_execution(args) -> None: ).one_or_none() if not dag or not last_parsed_dag: - raise SystemExit(f"DAG: {args.dag_id} does not exist in the database") + raise SystemExit(f"Dag: {args.dag_id} does not exist in the database") if last_parsed_dag.is_paused: - print("[INFO] Please be reminded this DAG is PAUSED now.", file=sys.stderr) + print("[INFO] Please be reminded this Dag is PAUSED now.", file=sys.stderr) def iter_next_dagrun_info() -> Iterator[DagRunInfo | None]: yield (dagrun_info := dag.timetable.next_run_info_from_dag_model(dag_model=last_parsed_dag)) @@ -381,7 +381,7 @@ def iter_next_dagrun_info() -> Iterator[DagRunInfo | None]: if info is None: print( "[WARN] No following schedule can be found. " - "This DAG may have schedule interval '@once' or `None`.", + "This Dag may have schedule interval '@once' or `None`.", file=sys.stderr, ) print(None) @@ -460,7 +460,7 @@ def get_dag_detail(dag: DAG) -> dict: return {col: dag_detail[col] for col in cols if col in DAG_DETAIL_FIELDS} def filter_dags_by_bundle(dags: Iterable[DAG], bundle_names: list[str] | None) -> Iterable[DAG]: - """Filter DAGs based on the specified bundle name, if provided.""" + """Filter Dags based on the specified bundle name, if provided.""" if not bundle_names: return dags @@ -485,10 +485,10 @@ def filter_dags_by_bundle(dags: Iterable[DAG], bundle_names: list[str] | None) - @providers_configuration_loaded @provide_session def dag_details(args, session: Session = NEW_SESSION): - """Get DAG details given a DAG id.""" + """Get Dag details given a Dag id.""" dag = DagModel.get_dagmodel(args.dag_id, session=session) if not dag: - raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table") + raise SystemExit(f"Dag: {args.dag_id} does not exist in 'dag' table") dag_detail = DAGResponse.from_orm(dag).model_dump() if args.output in ["table", "plain"]: @@ -604,7 +604,7 @@ def dag_list_jobs(args, dag: DAG | None = None, session: Session = NEW_SESSION) dag = DagModel.get_dagmodel(args.dag_id, session=session) if not dag: - raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table") + raise SystemExit(f"Dag: {args.dag_id} does not exist in 'dag' table") queries.append(Job.dag_id == args.dag_id) if args.state: @@ -627,14 +627,14 @@ def dag_list_jobs(args, dag: DAG | None = None, session: Session = NEW_SESSION) @providers_configuration_loaded @provide_session def dag_list_dag_runs(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> None: - """List dag runs for a given DAG.""" + """List dag runs for a given Dag.""" if dag: args.dag_id = dag.dag_id else: dag = DagModel.get_dagmodel(args.dag_id, session=session) if not dag: - raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table") + raise SystemExit(f"Dag: {args.dag_id} does not exist in 'dag' table") state = args.state.lower() if args.state else None dag_runs = DagRun.find( @@ -665,7 +665,7 @@ def _render_dagrun(dr: DagRun) -> dict[str, str]: @providers_configuration_loaded @provide_session def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> None: - """Execute one single DagRun for a given DAG and logical date.""" + """Execute one single DagRun for a given Dag and logical date.""" run_conf = None if args.conf: try: @@ -723,7 +723,7 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No @providers_configuration_loaded @provide_session def dag_reserialize(args, session: Session = NEW_SESSION) -> None: - """Serialize a DAG instance.""" + """Serialize a Dag instance.""" manager = DagBundlesManager() manager.sync_bundles_to_db(session=session) session.commit() diff --git a/airflow-core/src/airflow/cli/commands/task_command.py b/airflow-core/src/airflow/cli/commands/task_command.py index df9c44910fa20..78917761cdda9 100644 --- a/airflow-core/src/airflow/cli/commands/task_command.py +++ b/airflow-core/src/airflow/cli/commands/task_command.py @@ -73,9 +73,9 @@ def _generate_temporary_run_id() -> str: """ - Generate a ``run_id`` for a DAG run that will be created temporarily. + Generate a ``run_id`` for a Dag run that will be created temporarily. - This is used mostly by ``airflow task test`` to create a DAG run that will + This is used mostly by ``airflow task test`` to create a Dag run that will be deleted after the task is run. """ return f"__airflow_temporary_run_{timezone.utcnow().isoformat()}__" @@ -89,16 +89,16 @@ def _get_dag_run( session: Session | None = None, ) -> tuple[DagRun, bool]: """ - Try to retrieve a DAG run from a string representing either a run ID or logical date. + Try to retrieve a Dag run from a string representing either a run ID or logical date. - This checks DAG runs like this: + This checks Dag runs like this: - 1. If the input ``logical_date_or_run_id`` matches a DAG run ID, return the run. + 1. If the input ``logical_date_or_run_id`` matches a Dag run ID, return the run. 2. Try to parse the input as a date. If that works, and the resulting - date matches a DAG run's logical date, return the run. + date matches a Dag run's logical date, return the run. 3. If ``create_if_necessary`` is *False* and the input works for neither of the above, raise ``DagRunNotFound``. - 4. Try to create a new DAG run. If the input looks like a date, use it as + 4. Try to create a new Dag run. If the input looks like a date, use it as the logical date; otherwise use it as a run ID and set the logical date to the current time. """ @@ -173,7 +173,7 @@ def _get_ti( ): dag = task.dag if dag is None: - raise ValueError("Cannot get task instance for a task not assigned to a DAG") + raise ValueError("Cannot get task instance for a task not assigned to a Dag") # this check is imperfect because diff dags could have tasks with same name # but in a task, dag_id is a property that accesses its dag, and we don't @@ -311,7 +311,7 @@ def task_state(args) -> None: @suppress_logs_and_warning @providers_configuration_loaded def task_list(args, dag: DAG | None = None) -> None: - """List the tasks within a DAG at the command line.""" + """List the tasks within a Dag at the command line.""" dag = dag or get_bagged_dag(args.bundle_name, args.dag_id) tasks = sorted(t.task_id for t in dag.tasks) print("\n".join(tasks)) @@ -495,13 +495,13 @@ def task_render(args, dag: DAG | None = None) -> None: @cli_utils.action_cli(check_db=False) @providers_configuration_loaded def task_clear(args) -> None: - """Clear all task instances or only those matched by regex for a DAG(s).""" + """Clear all task instances or only those matched by regex for a Dag(s).""" logging.basicConfig(level=logging.INFO, format=settings.SIMPLE_LOG_FORMAT) if args.dag_id and not args.bundle_name and not args.dag_regex and not args.task_regex: dags = [get_db_dag(bundle_names=args.bundle_name, dag_id=args.dag_id)] else: # todo clear command only accepts a single dag_id. no reason for get_dags with 's' except regex? - # Reading from_db because clear method still not implemented in Task SDK DAG + # Reading from_db because clear method still not implemented in Task SDK Dag dags = get_dags(args.bundle_name, args.dag_id, use_regex=args.dag_regex, from_db=True) if args.task_regex: diff --git a/airflow-core/src/airflow/cli/commands/team_command.py b/airflow-core/src/airflow/cli/commands/team_command.py index 13702d516c45d..263910731810d 100644 --- a/airflow-core/src/airflow/cli/commands/team_command.py +++ b/airflow-core/src/airflow/cli/commands/team_command.py @@ -95,14 +95,14 @@ def team_delete(args, session=NEW_SESSION): # Check for associations associations = [] - # Check DAG bundle associations + # Check Dag bundle associations dag_bundle_count = session.scalar( select(func.count()) .select_from(dag_bundle_team_association_table) .where(dag_bundle_team_association_table.c.team_name == team.name) ) if dag_bundle_count: - associations.append(f"{dag_bundle_count} DAG bundle(s)") + associations.append(f"{dag_bundle_count} Dag bundle(s)") # Check connection associations if connection_count := session.scalar( diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py index 0ec4bb931aa53..372873ab0e778 100644 --- a/airflow-core/tests/unit/cli/commands/test_dag_command.py +++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py @@ -67,7 +67,7 @@ else: DEFAULT_DATE_REPR = DEFAULT_DATE.isoformat() -# TODO: Check if tests needs side effects - locally there's missing DAG +# TODO: Check if tests needs side effects - locally there's missing Dag pytestmark = pytest.mark.db_test @@ -290,7 +290,7 @@ def test_cli_get_dag_details(self, stdout_capture): dag_command.dag_details(args) out = temp_stdout.getvalue() - # Check if DAG Details field are present + # Check if Dag Details field are present for field in dag_command.DAG_DETAIL_FIELDS: assert field in out @@ -530,21 +530,21 @@ def test_pause_non_existing_dag_do_not_error(self, stdout_capture): with stdout_capture as temp_stdout: dag_command.dag_pause(args) out = temp_stdout.splitlines()[-1] - assert out == "No unpaused DAGs were found" + assert out == "No unpaused Dags were found" def test_unpause_non_existing_dag_do_not_error(self, stdout_capture): args = self.parser.parse_args(["dags", "unpause", "non_existing_dag"]) with stdout_capture as temp_stdout: dag_command.dag_unpause(args) out = temp_stdout.splitlines()[-1] - assert out == "No paused DAGs were found" + assert out == "No paused Dags were found" def test_unpause_already_unpaused_dag_do_not_error(self, stdout_capture): args = self.parser.parse_args(["dags", "unpause", "example_bash_operator", "--yes"]) with stdout_capture as temp_stdout: dag_command.dag_unpause(args) out = temp_stdout.splitlines()[-1] - assert out == "No paused DAGs were found" + assert out == "No paused Dags were found" def test_pausing_already_paused_dag_do_not_error(self, stdout_capture): args = self.parser.parse_args(["dags", "pause", "example_bash_operator", "--yes"]) @@ -552,7 +552,7 @@ def test_pausing_already_paused_dag_do_not_error(self, stdout_capture): dag_command.dag_pause(args) dag_command.dag_pause(args) out = temp_stdout.splitlines()[-1] - assert out == "No unpaused DAGs were found" + assert out == "No unpaused Dags were found" def test_trigger_dag(self): dag_command.dag_trigger( @@ -657,7 +657,7 @@ def test_delete_dag(self): ) def test_dag_delete_when_backfill_and_dagrun_exist(self): - # Test to check that the DAG should be deleted even if + # Test to check that the Dag should be deleted even if # there are backfill records associated with it. from airflow.models.backfill import Backfill @@ -687,7 +687,7 @@ def test_dag_delete_when_backfill_and_dagrun_exist(self): ) def test_delete_dag_existing_file(self, tmp_path): - # Test to check that the DAG should be deleted even if + # Test to check that the Dag should be deleted even if # the file containing it is not deleted path = tmp_path / "testfile" DM = DagModel @@ -818,7 +818,7 @@ def test_dag_test_show_dag(self, mock_get_dag, mock_render_dag, stdout_capture): @mock.patch("airflow.dag_processing.dagbag.BundleDagBag") def test_dag_test_with_bundle_name(self, mock_dagbag, configure_dag_bundles): - """Test that DAG can be tested using bundle name.""" + """Test that Dag can be tested using bundle name.""" mock_dagbag.return_value.get_dag.return_value.test.return_value = DagRun( dag_id="test_example_bash_operator", logical_date=DEFAULT_DATE, state=DagRunState.SUCCESS ) @@ -845,7 +845,7 @@ def test_dag_test_with_bundle_name(self, mock_dagbag, configure_dag_bundles): @mock.patch("airflow.dag_processing.dagbag.BundleDagBag") def test_dag_test_with_dagfile_path(self, mock_dagbag, configure_dag_bundles): - """Test that DAG can be tested using dagfile path.""" + """Test that Dag can be tested using dagfile path.""" mock_dagbag.return_value.get_dag.return_value.test.return_value = DagRun( dag_id="test_example_bash_operator", logical_date=DEFAULT_DATE, state=DagRunState.SUCCESS ) @@ -866,7 +866,7 @@ def test_dag_test_with_dagfile_path(self, mock_dagbag, configure_dag_bundles): @mock.patch("airflow.dag_processing.dagbag.BundleDagBag") def test_dag_test_with_both_bundle_and_dagfile_path(self, mock_dagbag, configure_dag_bundles): - """Test that DAG can be tested using both bundle name and dagfile path.""" + """Test that Dag can be tested using both bundle name and dagfile path.""" mock_dagbag.return_value.get_dag.return_value.test.return_value = DagRun( dag_id="test_example_bash_operator", logical_date=DEFAULT_DATE, state=DagRunState.SUCCESS ) @@ -927,7 +927,7 @@ def test_dag_with_parsing_context( ) dag_command.dag_test(cli_args) - # if dag_parsing_context is not set, this DAG will only have 1 task + # if dag_parsing_context is not set, this Dag will only have 1 task assert len(mock_get_or_create_dagrun.call_args[1]["dag"].task_ids) == 2 def test_dag_test_run_inline_trigger(self, dag_maker): @@ -1007,14 +1007,14 @@ def test_dag_test_with_mark_success(self, mock__execute_task): @conf_vars({("core", "load_examples"): "false"}) def test_get_dag_excludes_examples_with_bundle(self, configure_testing_dag_bundle): - """Test that example DAGs are excluded when bundle names are passed.""" + """Test that example Dags are excluded when bundle names are passed.""" try: from airflow.utils.cli import get_bagged_dag except ImportError: # Prior to Airflow 3.1.0. from airflow.utils.cli import get_dag as get_bagged_dag # type: ignore with configure_testing_dag_bundle(TEST_DAGS_FOLDER / "test_sensor.py"): - # example DAG should not be found since include_examples=False + # example Dag should not be found since include_examples=False with pytest.raises(AirflowException, match="could not be found"): get_bagged_dag(bundle_names=["testing"], dag_id="example_simplest_dag") diff --git a/airflow-core/tests/unit/cli/commands/test_task_command.py b/airflow-core/tests/unit/cli/commands/test_task_command.py index b66384d5ad426..35f17e6332229 100644 --- a/airflow-core/tests/unit/cli/commands/test_task_command.py +++ b/airflow-core/tests/unit/cli/commands/test_task_command.py @@ -143,7 +143,7 @@ def test_test_no_logical_date(self): def test_task_render_with_custom_timetable(self, mock_fetch_dag_run_from_run_id_or_logical_date_string): """ Test that the `tasks render` CLI command queries the database correctly - for a DAG with a custom timetable. Verifies that a query is executed to + for a Dag with a custom timetable. Verifies that a query is executed to fetch the appropriate DagRun and that the database interaction occurs as expected. """ mock_fetch_dag_run_from_run_id_or_logical_date_string.return_value = (None, None) diff --git a/airflow-core/tests/unit/cli/commands/test_team_command.py b/airflow-core/tests/unit/cli/commands/test_team_command.py index 55892489f8b77..5f4441cd6f0a3 100644 --- a/airflow-core/tests/unit/cli/commands/test_team_command.py +++ b/airflow-core/tests/unit/cli/commands/test_team_command.py @@ -166,17 +166,17 @@ def test_team_delete_empty_name(self): team_command.team_delete(self.parser.parse_args(["teams", "delete", "", "--yes"])) def test_team_delete_with_dag_bundle_association(self): - """Test deleting team that has DAG bundle associations.""" + """Test deleting team that has Dag bundle associations.""" # Create team team_command.team_create(self.parser.parse_args(["teams", "create", "bundle-team"])) team = self.session.scalar(select(Team).where(Team.name == "bundle-team")) - # Create a DAG bundle first + # Create a Dag bundle first dag_bundle = DagBundleModel(name="test-bundle") self.session.add(dag_bundle) self.session.commit() - # Create a DAG bundle association + # Create a Dag bundle association self.session.execute( dag_bundle_team_association_table.insert().values( dag_bundle_name="test-bundle", team_name=team.name @@ -187,7 +187,7 @@ def test_team_delete_with_dag_bundle_association(self): # Try to delete team with pytest.raises( SystemExit, - match="Cannot delete team 'bundle-team' because it is associated with: 1 DAG bundle\\(s\\)", + match="Cannot delete team 'bundle-team' because it is associated with: 1 Dag bundle\\(s\\)", ): team_command.team_delete(self.parser.parse_args(["teams", "delete", "bundle-team", "--yes"])) @@ -251,7 +251,7 @@ def test_team_delete_with_multiple_associations(self): team_command.team_create(self.parser.parse_args(["teams", "create", "multi-team"])) team = self.session.scalar(select(Team).where(Team.name == "multi-team")) - # Create a DAG bundle first + # Create a Dag bundle first dag_bundle = DagBundleModel(name="multi-bundle") self.session.add(dag_bundle) self.session.commit() @@ -277,7 +277,7 @@ def test_team_delete_with_multiple_associations(self): error_msg = str(exc_info.value) assert "Cannot delete team 'multi-team' because it is associated with:" in error_msg - assert "1 DAG bundle(s)" in error_msg + assert "1 Dag bundle(s)" in error_msg assert "1 connection(s)" in error_msg assert "1 variable(s)" in error_msg assert "1 pool(s)" in error_msg