Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airflow-core/src/airflow/cli/commands/asset_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ def asset_materialize(args, *, session: Session = NEW_SESSION) -> None:
"""
Materialize the specified asset.

This is done by finding the DAG with the asset defined as outlet, and create
a run for that DAG.
This is done by finding the Dag with the asset defined as outlet, and create
a run for that Dag.
"""
if not args.name and not args.uri:
raise SystemExit("Either --name or --uri is required")
Expand All @@ -149,7 +149,7 @@ def asset_materialize(args, *, session: Session = NEW_SESSION) -> None:
if (dag_id := next(dag_id_it, None)) is None:
raise SystemExit(f"Asset with {select_message} does not exist.")
if next(dag_id_it, None) is not None:
raise SystemExit(f"More than one DAG materializes asset with {select_message}.")
raise SystemExit(f"More than one Dag materializes asset with {select_message}.")

try:
user = getuser()
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/cli/commands/config_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,9 +663,9 @@ def message(self) -> str | None:
was_removed=False,
new_default="False",
suggestion="In Airflow 3.0 the default value for `catchup_by_default` is set to `False`. "
"This means that DAGs without explicit definition of the `catchup` parameter will not "
"This means that Dags without explicit definition of the `catchup` parameter will not "
"catchup by default. "
"If your DAGs rely on catchup behavior, not explicitly defined in the DAG definition, "
"If your Dags rely on catchup behavior, not explicitly defined in the Dag definition, "
"set this configuration parameter to `True` in the `scheduler` section of your `airflow.cfg` "
"to enable the behavior from Airflow 2.x.",
),
Expand Down
40 changes: 20 additions & 20 deletions airflow-core/src/airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def dag_delete(args) -> None:
api_client = get_current_api_client()
if (
args.yes
or input("This will drop all existing records related to the specified DAG. Proceed? (y/n)").upper()
or input("This will drop all existing records related to the specified Dag. Proceed? (y/n)").upper()
== "Y"
):
try:
Expand All @@ -119,21 +119,21 @@ def dag_delete(args) -> None:
@cli_utils.action_cli
@providers_configuration_loaded
def dag_pause(args) -> None:
"""Pauses a DAG."""
"""Pauses a Dag."""
set_is_paused(True, args)


@cli_utils.action_cli
@providers_configuration_loaded
def dag_unpause(args) -> None:
"""Unpauses a DAG."""
"""Unpauses a Dag."""
set_is_paused(False, args)


@providers_configuration_loaded
@provide_session
def set_is_paused(is_paused: bool, args, *, session: Session = NEW_SESSION) -> None:
"""Set is_paused for DAG by a given dag_id."""
"""Set is_paused for Dag by a given dag_id."""
query = select(DagModel)
if args.treat_dag_id_as_regex:
query = query.where(DagModel.dag_id.regexp_match(args.dag_id))
Expand All @@ -144,13 +144,13 @@ def set_is_paused(is_paused: bool, args, *, session: Session = NEW_SESSION) -> N

matched_dags = list(session.scalars(query).all())
if not matched_dags:
print(f"No {'un' if is_paused else ''}paused DAGs were found")
print(f"No {'un' if is_paused else ''}paused Dags were found")
return

if not args.yes and args.treat_dag_id_as_regex:
dags_ids = [dag.dag_id for dag in matched_dags]
question = (
f"You are about to {'un' if not is_paused else ''}pause {len(dags_ids)} DAGs:\n"
f"You are about to {'un' if not is_paused else ''}pause {len(dags_ids)} Dags:\n"
f"{','.join(dags_ids)}"
f"\n\nAre you sure? [y/n]"
)
Expand All @@ -172,7 +172,7 @@ def _update_is_paused(dag_model: DagModel) -> bool:

@providers_configuration_loaded
def dag_dependencies_show(args) -> None:
"""Display DAG dependencies, save to file or show as imgcat image."""
"""Display Dag dependencies, save to file or show as imgcat image."""
deduplicated_dag_dependencies = {
dag_id: list(set(dag_dependencies))
for dag_id, dag_dependencies in SerializedDagModel.get_dag_dependencies().items()
Expand All @@ -196,7 +196,7 @@ def dag_dependencies_show(args) -> None:

@providers_configuration_loaded
def dag_show(args) -> None:
"""Display DAG or saves its graphic representation to the file."""
"""Display Dag or saves its graphic representation to the file."""
from airflow.models.serialized_dag import SerializedDagModel

if not (dag := SerializedDagModel.get_dag(dag_id=args.dag_id)):
Expand Down Expand Up @@ -297,7 +297,7 @@ def dag_state(args, session: Session = NEW_SESSION) -> None:
dag = DagModel.get_dagmodel(args.dag_id, session=session)

if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
raise SystemExit(f"Dag: {args.dag_id} does not exist in 'dag' table")
dr, _ = fetch_dag_run_from_run_id_or_logical_date_string(
dag_id=dag.dag_id,
value=args.logical_date_or_run_id,
Expand Down Expand Up @@ -343,10 +343,10 @@ def dag_next_execution(args) -> None:
).one_or_none()

if not dag or not last_parsed_dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in the database")
raise SystemExit(f"Dag: {args.dag_id} does not exist in the database")

if last_parsed_dag.is_paused:
print("[INFO] Please be reminded this DAG is PAUSED now.", file=sys.stderr)
print("[INFO] Please be reminded this Dag is PAUSED now.", file=sys.stderr)

def iter_next_dagrun_info() -> Iterator[DagRunInfo | None]:
yield (dagrun_info := dag.timetable.next_run_info_from_dag_model(dag_model=last_parsed_dag))
Expand Down Expand Up @@ -381,7 +381,7 @@ def iter_next_dagrun_info() -> Iterator[DagRunInfo | None]:
if info is None:
print(
"[WARN] No following schedule can be found. "
"This DAG may have schedule interval '@once' or `None`.",
"This Dag may have schedule interval '@once' or `None`.",
file=sys.stderr,
)
print(None)
Expand Down Expand Up @@ -460,7 +460,7 @@ def get_dag_detail(dag: DAG) -> dict:
return {col: dag_detail[col] for col in cols if col in DAG_DETAIL_FIELDS}

def filter_dags_by_bundle(dags: Iterable[DAG], bundle_names: list[str] | None) -> Iterable[DAG]:
"""Filter DAGs based on the specified bundle name, if provided."""
"""Filter Dags based on the specified bundle name, if provided."""
if not bundle_names:
return dags

Expand All @@ -485,10 +485,10 @@ def filter_dags_by_bundle(dags: Iterable[DAG], bundle_names: list[str] | None) -
@providers_configuration_loaded
@provide_session
def dag_details(args, session: Session = NEW_SESSION):
"""Get DAG details given a DAG id."""
"""Get Dag details given a Dag id."""
dag = DagModel.get_dagmodel(args.dag_id, session=session)
if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
raise SystemExit(f"Dag: {args.dag_id} does not exist in 'dag' table")
dag_detail = DAGResponse.from_orm(dag).model_dump()

if args.output in ["table", "plain"]:
Expand Down Expand Up @@ -604,7 +604,7 @@ def dag_list_jobs(args, dag: DAG | None = None, session: Session = NEW_SESSION)
dag = DagModel.get_dagmodel(args.dag_id, session=session)

if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
raise SystemExit(f"Dag: {args.dag_id} does not exist in 'dag' table")
queries.append(Job.dag_id == args.dag_id)

if args.state:
Expand All @@ -627,14 +627,14 @@ def dag_list_jobs(args, dag: DAG | None = None, session: Session = NEW_SESSION)
@providers_configuration_loaded
@provide_session
def dag_list_dag_runs(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> None:
"""List dag runs for a given DAG."""
"""List dag runs for a given Dag."""
if dag:
args.dag_id = dag.dag_id
else:
dag = DagModel.get_dagmodel(args.dag_id, session=session)

if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
raise SystemExit(f"Dag: {args.dag_id} does not exist in 'dag' table")

state = args.state.lower() if args.state else None
dag_runs = DagRun.find(
Expand Down Expand Up @@ -665,7 +665,7 @@ def _render_dagrun(dr: DagRun) -> dict[str, str]:
@providers_configuration_loaded
@provide_session
def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> None:
"""Execute one single DagRun for a given DAG and logical date."""
"""Execute one single DagRun for a given Dag and logical date."""
run_conf = None
if args.conf:
try:
Expand Down Expand Up @@ -723,7 +723,7 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No
@providers_configuration_loaded
@provide_session
def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
"""Serialize a DAG instance."""
"""Serialize a Dag instance."""
manager = DagBundlesManager()
manager.sync_bundles_to_db(session=session)
session.commit()
Expand Down
22 changes: 11 additions & 11 deletions airflow-core/src/airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@

def _generate_temporary_run_id() -> str:
"""
Generate a ``run_id`` for a DAG run that will be created temporarily.
Generate a ``run_id`` for a Dag run that will be created temporarily.

This is used mostly by ``airflow task test`` to create a DAG run that will
This is used mostly by ``airflow task test`` to create a Dag run that will
be deleted after the task is run.
"""
return f"__airflow_temporary_run_{timezone.utcnow().isoformat()}__"
Expand All @@ -89,16 +89,16 @@ def _get_dag_run(
session: Session | None = None,
) -> tuple[DagRun, bool]:
"""
Try to retrieve a DAG run from a string representing either a run ID or logical date.
Try to retrieve a Dag run from a string representing either a run ID or logical date.

This checks DAG runs like this:
This checks Dag runs like this:

1. If the input ``logical_date_or_run_id`` matches a DAG run ID, return the run.
1. If the input ``logical_date_or_run_id`` matches a Dag run ID, return the run.
2. Try to parse the input as a date. If that works, and the resulting
date matches a DAG run's logical date, return the run.
date matches a Dag run's logical date, return the run.
3. If ``create_if_necessary`` is *False* and the input works for neither of
the above, raise ``DagRunNotFound``.
4. Try to create a new DAG run. If the input looks like a date, use it as
4. Try to create a new Dag run. If the input looks like a date, use it as
the logical date; otherwise use it as a run ID and set the logical date
to the current time.
"""
Expand Down Expand Up @@ -173,7 +173,7 @@ def _get_ti(
):
dag = task.dag
if dag is None:
raise ValueError("Cannot get task instance for a task not assigned to a DAG")
raise ValueError("Cannot get task instance for a task not assigned to a Dag")

# this check is imperfect because diff dags could have tasks with same name
# but in a task, dag_id is a property that accesses its dag, and we don't
Expand Down Expand Up @@ -311,7 +311,7 @@ def task_state(args) -> None:
@suppress_logs_and_warning
@providers_configuration_loaded
def task_list(args, dag: DAG | None = None) -> None:
"""List the tasks within a DAG at the command line."""
"""List the tasks within a Dag at the command line."""
dag = dag or get_bagged_dag(args.bundle_name, args.dag_id)
tasks = sorted(t.task_id for t in dag.tasks)
print("\n".join(tasks))
Expand Down Expand Up @@ -495,13 +495,13 @@ def task_render(args, dag: DAG | None = None) -> None:
@cli_utils.action_cli(check_db=False)
@providers_configuration_loaded
def task_clear(args) -> None:
"""Clear all task instances or only those matched by regex for a DAG(s)."""
"""Clear all task instances or only those matched by regex for a Dag(s)."""
logging.basicConfig(level=logging.INFO, format=settings.SIMPLE_LOG_FORMAT)
if args.dag_id and not args.bundle_name and not args.dag_regex and not args.task_regex:
dags = [get_db_dag(bundle_names=args.bundle_name, dag_id=args.dag_id)]
else:
# todo clear command only accepts a single dag_id. no reason for get_dags with 's' except regex?
# Reading from_db because clear method still not implemented in Task SDK DAG
# Reading from_db because clear method still not implemented in Task SDK Dag
dags = get_dags(args.bundle_name, args.dag_id, use_regex=args.dag_regex, from_db=True)

if args.task_regex:
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/cli/commands/team_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ def team_delete(args, session=NEW_SESSION):
# Check for associations
associations = []

# Check DAG bundle associations
# Check Dag bundle associations
dag_bundle_count = session.scalar(
select(func.count())
.select_from(dag_bundle_team_association_table)
.where(dag_bundle_team_association_table.c.team_name == team.name)
)
if dag_bundle_count:
associations.append(f"{dag_bundle_count} DAG bundle(s)")
associations.append(f"{dag_bundle_count} Dag bundle(s)")

# Check connection associations
if connection_count := session.scalar(
Expand Down
Loading
Loading