Skip to content

Commit 127ab40

Browse files
authored
Merge branch 'main' into feature/http-extra-auth-parameter-in-connection
2 parents 608f8ef + 4521e8d commit 127ab40

File tree

145 files changed

+4828
-2934
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the searchbox below for content that may be hidden.

145 files changed

+4828
-2934
lines changed

Diff for: .github/actions/install-pre-commit/action.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ inputs:
2424
default: "3.9"
2525
uv-version:
2626
description: 'uv version to use'
27-
default: "0.5.14" # Keep this comment to allow automatic replacement of uv version
27+
default: "0.5.20" # Keep this comment to allow automatic replacement of uv version
2828
pre-commit-version:
2929
description: 'pre-commit version to use'
3030
default: "4.0.1" # Keep this comment to allow automatic replacement of pre-commit version

Diff for: Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm"
5555
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
5656
ARG AIRFLOW_PIP_VERSION=24.3.1
5757
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
58-
ARG AIRFLOW_UV_VERSION=0.5.14
58+
ARG AIRFLOW_UV_VERSION=0.5.20
5959
ARG AIRFLOW_USE_UV="false"
6060
ARG UV_HTTP_TIMEOUT="300"
6161
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"

Diff for: Dockerfile.ci

+12-8
Original file line numberDiff line numberDiff line change
@@ -872,6 +872,15 @@ function environment_initialization() {
872872
fi
873873
}
874874

875+
function handle_mount_sources() {
876+
if [[ ${MOUNT_SOURCES=} == "remove" ]]; then
877+
echo
878+
echo "${COLOR_BLUE}Mounted sources are removed, cleaning up mounted dist-info files${COLOR_RESET}"
879+
echo
880+
rm -rf /usr/local/lib/python${PYTHON_MAJOR_MINOR_VERSION}/site-packages/apache_airflow*.dist-info/
881+
fi
882+
}
883+
875884
function determine_airflow_to_use() {
876885
USE_AIRFLOW_VERSION="${USE_AIRFLOW_VERSION:=""}"
877886
if [[ ${USE_AIRFLOW_VERSION} == "" && ${USE_PACKAGES_FROM_DIST=} != "true" ]]; then
@@ -885,12 +894,6 @@ function determine_airflow_to_use() {
885894
mkdir -p "${AIRFLOW_SOURCES}"/logs/
886895
mkdir -p "${AIRFLOW_SOURCES}"/tmp/
887896
else
888-
if [[ ${USE_AIRFLOW_VERSION} =~ 2\.[7-8].* && ${TEST_TYPE} == "Providers[fab]" ]]; then
889-
echo
890-
echo "${COLOR_YELLOW}Skipping FAB tests on Airflow 2.7 and 2.8 because of FAB incompatibility with them${COLOR_RESET}"
891-
echo
892-
exit 0
893-
fi
894897
if [[ ${CLEAN_AIRFLOW_INSTALLATION=} == "true" ]]; then
895898
echo
896899
echo "${COLOR_BLUE}Uninstalling all packages first${COLOR_RESET}"
@@ -1050,7 +1053,7 @@ function start_webserver_with_examples(){
10501053
echo
10511054
echo "${COLOR_BLUE}Parsing example dags${COLOR_RESET}"
10521055
echo
1053-
airflow scheduler --num-runs 100
1056+
airflow dags reserialize
10541057
echo "Example dags parsing finished"
10551058
echo "Create admin user"
10561059
airflow users create -u admin -p admin -f Thor -l Administrator -r Admin -e [email protected]
@@ -1074,6 +1077,7 @@ function start_webserver_with_examples(){
10741077
echo "${COLOR_BLUE}Airflow webserver started${COLOR_RESET}"
10751078
}
10761079

1080+
handle_mount_sources
10771081
determine_airflow_to_use
10781082
environment_initialization
10791083
check_boto_upgrade
@@ -1264,7 +1268,7 @@ COPY --from=scripts common.sh install_packaging_tools.sh install_additional_depe
12641268
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
12651269
ARG AIRFLOW_PIP_VERSION=24.3.1
12661270
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
1267-
ARG AIRFLOW_UV_VERSION=0.5.14
1271+
ARG AIRFLOW_UV_VERSION=0.5.20
12681272
# TODO(potiuk): automate with upgrade check (possibly)
12691273
ARG AIRFLOW_PRE_COMMIT_VERSION="4.0.1"
12701274
ARG AIRFLOW_PRE_COMMIT_UV_VERSION="4.1.4"

Diff for: airflow/api/common/delete_dag.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,10 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
7575
# This handles the case when the dag_id is changed in the file
7676
session.execute(
7777
delete(ParseImportError)
78-
.where(ParseImportError.filename == dag.fileloc)
78+
.where(
79+
ParseImportError.filename == dag.fileloc,
80+
ParseImportError.bundle_name == dag.bundle_name,
81+
)
7982
.execution_options(synchronize_session="fetch")
8083
)
8184

Diff for: airflow/api_connexion/endpoints/import_error_endpoint.py

+21-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from collections.abc import Sequence
2020
from typing import TYPE_CHECKING
2121

22-
from sqlalchemy import func, select
22+
from sqlalchemy import func, select, tuple_
2323

2424
from airflow.api_connexion import security
2525
from airflow.api_connexion.exceptions import NotFound, PermissionDenied
@@ -61,7 +61,9 @@ def get_import_error(*, import_error_id: int, session: Session = NEW_SESSION) ->
6161
readable_dag_ids = security.get_readable_dags()
6262
file_dag_ids = {
6363
dag_id[0]
64-
for dag_id in session.query(DagModel.dag_id).filter(DagModel.fileloc == error.filename).all()
64+
for dag_id in session.query(DagModel.dag_id)
65+
.filter(DagModel.fileloc == error.filename, DagModel.bundle_name == error.bundle_name)
66+
.all()
6567
}
6668

6769
# Can the user read any DAGs in the file?
@@ -98,9 +100,17 @@ def get_import_errors(
98100
if not can_read_all_dags:
99101
# if the user doesn't have access to all DAGs, only display errors from visible DAGs
100102
readable_dag_ids = security.get_readable_dags()
101-
dagfiles_stmt = select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(readable_dag_ids))
102-
query = query.where(ParseImportError.filename.in_(dagfiles_stmt))
103-
count_query = count_query.where(ParseImportError.filename.in_(dagfiles_stmt))
103+
dagfiles_stmt = session.execute(
104+
select(DagModel.fileloc, DagModel.bundle_name)
105+
.distinct()
106+
.where(DagModel.dag_id.in_(readable_dag_ids))
107+
).all()
108+
query = query.where(
109+
tuple_(ParseImportError.filename, ParseImportError.bundle_name or None).in_(dagfiles_stmt)
110+
)
111+
count_query = count_query.where(
112+
tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_(dagfiles_stmt)
113+
)
104114

105115
total_entries = session.scalars(count_query).one()
106116
import_errors = session.scalars(query.offset(offset).limit(limit)).all()
@@ -109,7 +119,12 @@ def get_import_errors(
109119
for import_error in import_errors:
110120
# Check if user has read access to all the DAGs defined in the file
111121
file_dag_ids = (
112-
session.query(DagModel.dag_id).filter(DagModel.fileloc == import_error.filename).all()
122+
session.query(DagModel.dag_id)
123+
.filter(
124+
DagModel.fileloc == import_error.filename,
125+
DagModel.bundle_name == import_error.bundle_name,
126+
)
127+
.all()
113128
)
114129
requests: Sequence[IsAuthorizedDagRequest] = [
115130
{

Diff for: airflow/api_connexion/openapi/v1.yaml

+4
Original file line numberDiff line numberDiff line change
@@ -3370,6 +3370,10 @@ components:
33703370
type: string
33713371
readOnly: true
33723372
description: The filename
3373+
bundle_name:
3374+
type: string
3375+
readOnly: true
3376+
description: The bundle name
33733377
stack_trace:
33743378
type: string
33753379
readOnly: true

Diff for: airflow/api_connexion/schemas/error_schema.py

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class Meta:
3535
import_error_id = auto_field("id", dump_only=True)
3636
timestamp = auto_field(format="iso", dump_only=True)
3737
filename = auto_field(dump_only=True)
38+
bundle_name = auto_field(dump_only=True)
3839
stack_trace = auto_field("stacktrace", dump_only=True)
3940

4041

Diff for: airflow/api_fastapi/core_api/datamodels/import_error.py

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class ImportErrorResponse(BaseModel):
3131
id: int = Field(alias="import_error_id")
3232
timestamp: datetime
3333
filename: str
34+
bundle_name: str
3435
stacktrace: str = Field(alias="stack_trace")
3536

3637

Diff for: airflow/api_fastapi/core_api/datamodels/plugins.py

-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ class PluginResponse(BaseModel):
7272
global_operator_extra_links: list[str]
7373
operator_extra_links: list[str]
7474
source: Annotated[str, BeforeValidator(coerce_to_string)]
75-
ti_deps: list[Annotated[str, BeforeValidator(coerce_to_string)]]
7675
listeners: list[str]
7776
timetables: list[str]
7877

Diff for: airflow/api_fastapi/core_api/openapi/v1-generated.yaml

+4-6
Original file line numberDiff line numberDiff line change
@@ -8440,6 +8440,9 @@ components:
84408440
filename:
84418441
type: string
84428442
title: Filename
8443+
bundle_name:
8444+
type: string
8445+
title: Bundle Name
84438446
stack_trace:
84448447
type: string
84458448
title: Stack Trace
@@ -8448,6 +8451,7 @@ components:
84488451
- import_error_id
84498452
- timestamp
84508453
- filename
8454+
- bundle_name
84518455
- stack_trace
84528456
title: ImportErrorResponse
84538457
description: Import Error Response.
@@ -8695,11 +8699,6 @@ components:
86958699
source:
86968700
type: string
86978701
title: Source
8698-
ti_deps:
8699-
items:
8700-
type: string
8701-
type: array
8702-
title: Ti Deps
87038702
listeners:
87048703
items:
87058704
type: string
@@ -8721,7 +8720,6 @@ components:
87218720
- global_operator_extra_links
87228721
- operator_extra_links
87238722
- source
8724-
- ti_deps
87258723
- listeners
87268724
- timetables
87278725
title: PluginResponse

Diff for: airflow/api_fastapi/core_api/routes/public/backfills.py

+34-10
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,13 @@
3838
from airflow.api_fastapi.core_api.openapi.exceptions import (
3939
create_openapi_http_exception_doc,
4040
)
41+
from airflow.exceptions import DagNotFound
4142
from airflow.models import DagRun
4243
from airflow.models.backfill import (
4344
AlreadyRunningBackfill,
4445
Backfill,
4546
BackfillDagRun,
47+
DagNoScheduleException,
4648
_create_backfill,
4749
_do_dry_run,
4850
)
@@ -209,6 +211,17 @@ def create_backfill(
209211
status_code=status.HTTP_409_CONFLICT,
210212
detail=f"There is already a running backfill for dag {backfill_request.dag_id}",
211213
)
214+
except DagNoScheduleException:
215+
raise HTTPException(
216+
status_code=status.HTTP_409_CONFLICT,
217+
detail=f"{backfill_request.dag_id} has no schedule",
218+
)
219+
220+
except DagNotFound:
221+
raise HTTPException(
222+
status_code=status.HTTP_404_NOT_FOUND,
223+
detail=f"Could not find dag {backfill_request.dag_id}",
224+
)
212225

213226

214227
@backfills_router.post(
@@ -227,14 +240,25 @@ def create_backfill_dry_run(
227240
from_date = timezone.coerce_datetime(body.from_date)
228241
to_date = timezone.coerce_datetime(body.to_date)
229242

230-
backfills_dry_run = _do_dry_run(
231-
dag_id=body.dag_id,
232-
from_date=from_date,
233-
to_date=to_date,
234-
reverse=body.run_backwards,
235-
reprocess_behavior=body.reprocess_behavior,
236-
session=session,
237-
)
238-
backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run]
243+
try:
244+
backfills_dry_run = _do_dry_run(
245+
dag_id=body.dag_id,
246+
from_date=from_date,
247+
to_date=to_date,
248+
reverse=body.run_backwards,
249+
reprocess_behavior=body.reprocess_behavior,
250+
session=session,
251+
)
252+
backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run]
239253

240-
return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run))
254+
return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run))
255+
except DagNotFound:
256+
raise HTTPException(
257+
status_code=status.HTTP_404_NOT_FOUND,
258+
detail=f"Could not find dag {body.dag_id}",
259+
)
260+
except DagNoScheduleException:
261+
raise HTTPException(
262+
status_code=status.HTTP_409_CONFLICT,
263+
detail=f"{body.dag_id} has no schedule",
264+
)

Diff for: airflow/api_fastapi/core_api/routes/public/import_error.py

+1
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ def get_import_errors(
7474
"id",
7575
"timestamp",
7676
"filename",
77+
"bundle_name",
7778
"stacktrace",
7879
],
7980
ParseImportError,

Diff for: airflow/cli/cli_config.py

+1
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,7 @@ def string_lower_type(val):
858858
help="The option name",
859859
)
860860

861+
# lint
861862
ARG_LINT_CONFIG_SECTION = Arg(
862863
("--section",),
863864
help="The section name(s) to lint in the airflow config.",

Diff for: airflow/cli/commands/local_commands/standalone_command.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from airflow.configuration import conf
3131
from airflow.executors import executor_constants
3232
from airflow.executors.executor_loader import ExecutorLoader
33+
from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
3334
from airflow.jobs.job import most_recent_job
3435
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
3536
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
@@ -76,6 +77,12 @@ def run(self):
7677
command=["scheduler"],
7778
env=env,
7879
)
80+
self.subcommands["dag-processor"] = SubCommand(
81+
self,
82+
name="dag-processor",
83+
command=["dag-processor"],
84+
env=env,
85+
)
7986
self.subcommands["webserver"] = SubCommand(
8087
self,
8188
name="webserver",
@@ -147,6 +154,7 @@ def print_output(self, name: str, output):
147154
"fastapi-api": "magenta",
148155
"webserver": "green",
149156
"scheduler": "blue",
157+
"dag-processor": "yellow",
150158
"triggerer": "cyan",
151159
"standalone": "white",
152160
}
@@ -169,6 +177,7 @@ def calculate_env(self):
169177
We override some settings as part of being standalone.
170178
"""
171179
env = dict(os.environ)
180+
env["AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR"] = "True"
172181

173182
# Make sure we're using a local executor flavour
174183
executor_class, _ = ExecutorLoader.import_default_executor_cls()
@@ -205,6 +214,7 @@ def is_ready(self):
205214
return (
206215
self.port_open(self.web_server_port)
207216
and self.job_running(SchedulerJobRunner)
217+
and self.job_running(DagProcessorJobRunner)
208218
and self.job_running(TriggererJobRunner)
209219
)
210220

@@ -228,7 +238,7 @@ def job_running(self, job_runner_class: type[BaseJobRunner]):
228238
"""
229239
Check if the given job name is running and heartbeating correctly.
230240
231-
Used to tell if scheduler is alive.
241+
Used to tell if a component is alive.
232242
"""
233243
recent = most_recent_job(job_runner_class.job_type)
234244
if not recent:

Diff for: airflow/cli/commands/remote_commands/config_command.py

+14-4
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,21 @@ def message(self) -> str:
122122
# core
123123
ConfigChange(
124124
config=ConfigParameter("core", "check_slas"),
125-
suggestion="The SLA feature is removed in Airflow 3.0, to be replaced with Airflow Alerts in "
126-
"future",
125+
suggestion="The SLA feature is removed in Airflow 3.0, to be replaced with Airflow Alerts in future",
127126
),
128127
ConfigChange(
129-
config=ConfigParameter("core", "strict_asset_uri_validation"),
130-
suggestion="Asset URI with a defined scheme will now always be validated strictly, "
128+
config=ConfigParameter("core", "strict_dataset_uri_validation"),
129+
suggestion="Dataset URI with a defined scheme will now always be validated strictly, "
131130
"raising a hard error on validation failure.",
132131
),
132+
ConfigChange(
133+
config=ConfigParameter("core", "dataset_manager_class"),
134+
renamed_to=ConfigParameter("core", "asset_manager_class"),
135+
),
136+
ConfigChange(
137+
config=ConfigParameter("core", "dataset_manager_kwargs"),
138+
renamed_to=ConfigParameter("core", "asset_manager_kwargs"),
139+
),
133140
ConfigChange(
134141
config=ConfigParameter("core", "worker_precheck"),
135142
renamed_to=ConfigParameter("celery", "worker_precheck"),
@@ -237,6 +244,9 @@ def message(self) -> str:
237244
ConfigChange(
238245
config=ConfigParameter("webserver", "allow_raw_html_descriptions"),
239246
),
247+
ConfigChange(
248+
config=ConfigParameter("webserver", "cookie_samesite"),
249+
),
240250
ConfigChange(
241251
config=ConfigParameter("webserver", "update_fab_perms"),
242252
renamed_to=ConfigParameter("fab", "update_fab_perms"),

Diff for: airflow/cli/commands/remote_commands/dag_command.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@
3737
from airflow.jobs.job import Job
3838
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
3939
from airflow.models.serialized_dag import SerializedDagModel
40+
from airflow.sdk.definitions._internal.dag_parsing_context import _airflow_parsing_context_manager
4041
from airflow.utils import cli as cli_utils, timezone
4142
from airflow.utils.cli import get_dag, process_subdir, suppress_logs_and_warning
42-
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
4343
from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
4444
from airflow.utils.helpers import ask_yesno
4545
from airflow.utils.providers_configuration_loader import providers_configuration_loaded

0 commit comments

Comments (0)