Commit d721806
Merge branch 'main' into feature/paginated-generic-transfer
dabla authored Jan 14, 2025
2 parents 0e70c4a + 1cf1d62 commit d721806
Showing 174 changed files with 3,669 additions and 1,238 deletions.
3 changes: 3 additions & 0 deletions .dockerignore
@@ -125,6 +125,9 @@ docs/_build/
docs/_api/
docs/_doctrees/

# Exclude new providers docs generated files
providers/**/docs/_api/

# files generated by memray
*.py.*.html
*.py.*.bin
7 changes: 3 additions & 4 deletions .github/boring-cyborg.yml
@@ -19,10 +19,9 @@

labelPRBasedOnFilePath:
provider:airbyte:
- providers/src/airflow/providers/airbyte/**/*
- docs/apache-airflow-providers-airbyte/**/*
- providers/tests/airbyte/**/*
- providers/tests/system/airbyte/**/*
- providers/airbyte/src/airflow/providers/airbyte/**/*
- providers/airbyte/docs/**/*
- providers/airbyte/tests/**/*

provider:alibaba:
- providers/src/airflow/providers/alibaba/**/*
21 changes: 14 additions & 7 deletions .pre-commit-config.yaml
@@ -348,12 +348,19 @@ repos:
pass_filenames: true
files: ^providers/src/airflow/providers/.*/(operators|transfers|sensors)/.*\.py$
additional_dependencies: [ 'rich>=12.4.4' ]
- id: update-providers-init-py
name: Update providers __init__.py files
entry: ./scripts/ci/pre_commit/update_providers_init.py
- id: update-providers-build-files
name: Update providers build files
entry: ./scripts/ci/pre_commit/update_providers_build_files.py
language: python
pass_filenames: true
files: ^providers/[^\/]*/__init__.py$|^providers/[^\/]*/[^\/]*/__init__.py$|^providers/.*/provider.yaml$|^airflow_breeze/templates/PROVIDER__INIT__PY_TEMPLATE.py.jinja2^
files: |
(?x)
^providers/[^\/]*/src/airflow/providers/[^\/]*/__init__.py$|
^providers/[^\/]*/[^\/]*/src/airflow/providers/[^\/]*/[^\/]*/__init__.py$|
^providers/.*/provider.yaml$|
^airflow_breeze/templates/PROVIDER__INIT__PY_TEMPLATE.py.jinja2$
^airflow_breeze/templates/get_provider_info_TEMPLATE.py.jinja2$
^airflow_breeze/templates/PROVIDER_README_TEMPLATE.rst.jinja2$
additional_dependencies: ['rich>=12.4.4','requests']
require_serial: true
- id: ruff
@@ -701,8 +708,7 @@ repos:
^airflow/decorators/.*$|
^airflow/hooks/.*$|
^airflow/operators/.*$|
^providers/src/airflow/providers/.*$|
^providers/src/airflow/providers/standard/sensors/.*$|
^providers/.*$|
^dev/provider_packages/.*$
- id: check-base-operator-usage
language: pygrep
@@ -781,6 +787,7 @@ repos:
entry: ./scripts/ci/pre_commit/check_license.py
language: python
files: ^.*LICENSE.*$|^.*LICENCE.*$
exclude: ^providers/.*/src/.*/LICENSE$
pass_filenames: false
- id: check-aiobotocore-optional
name: Check if aiobotocore is an optional dependency only
@@ -1376,7 +1383,7 @@ repos:
name: Validate provider.yaml files
entry: ./scripts/ci/pre_commit/check_provider_yaml_files.py
language: python
files: ^providers/src/airflow/providers/.*/provider\.yaml$
files: ^providers/src/airflow/providers/.*/provider\.yaml$|^providers/.*/src/provider\.yaml$
additional_dependencies: ['rich>=12.4.4']
require_serial: true
- id: check-template-fields-valid
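
The new files: pattern for update-providers-build-files uses an inline verbose-mode regex ((?x)), so the alternatives can be split across lines while matching the new per-provider source layout. A minimal Python sketch, not part of this commit, showing how a condensed version of that pattern behaves against old- and new-layout paths (the example paths are illustrative):

import re

# Condensed form of the verbose-mode pattern from .pre-commit-config.yaml above.
PATTERN = re.compile(
    r"""(?x)
    ^providers/[^/]*/src/airflow/providers/[^/]*/__init__.py$|
    ^providers/[^/]*/[^/]*/src/airflow/providers/[^/]*/[^/]*/__init__.py$|
    ^providers/.*/provider.yaml$
    """
)

for path in (
    "providers/airbyte/src/airflow/providers/airbyte/__init__.py",  # new layout: matches
    "providers/airbyte/provider.yaml",                               # new layout: matches
    "providers/src/airflow/providers/airbyte/__init__.py",           # old layout: no match
):
    print(path, bool(PATTERN.match(path)))
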
2 changes: 2 additions & 0 deletions Dockerfile
@@ -820,6 +820,8 @@ function install_airflow() {
local installation_command_flags
if [[ ${AIRFLOW_INSTALLATION_METHOD} == "." ]]; then
# When installing from sources - we always use `--editable` mode
# TODO(potiuk): when we move all providers to the new structure, we will be able to remove all this and
# use `uv sync` rather than `uv pip install`, instead of finding all pyproject.toml projects here
installation_command_flags="--editable .[${AIRFLOW_EXTRAS}]${AIRFLOW_VERSION_SPECIFICATION} --editable ./task_sdk"
while IFS= read -r -d '' pyproject_toml_file; do
project_folder=$(dirname ${pyproject_toml_file})
2 changes: 2 additions & 0 deletions Dockerfile.ci
@@ -590,6 +590,8 @@ function install_airflow() {
local installation_command_flags
if [[ ${AIRFLOW_INSTALLATION_METHOD} == "." ]]; then
# When installing from sources - we always use `--editable` mode
# TODO(potiuk): when we move all providers to the new structure, we will be able to remove all this and
# use `uv sync` rather than `uv pip install`, instead of finding all pyproject.toml projects here
installation_command_flags="--editable .[${AIRFLOW_EXTRAS}]${AIRFLOW_VERSION_SPECIFICATION} --editable ./task_sdk"
while IFS= read -r -d '' pyproject_toml_file; do
project_folder=$(dirname ${pyproject_toml_file})
@@ -39,7 +39,6 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
job=Job(),
processor=DagFileProcessorManager(
processor_timeout=processor_timeout_seconds,
dag_directory=args.subdir,
max_runs=args.num_runs,
),
)
4 changes: 4 additions & 0 deletions airflow/config_templates/airflow_local_settings.py
@@ -308,8 +308,10 @@
ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
ELASTICSEARCH_WRITE_TO_ES: bool = conf.getboolean("elasticsearch", "WRITE_TO_ES")
ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
ELASTICSEARCH_TARGET_INDEX: str = conf.get_mandatory_value("elasticsearch", "TARGET_INDEX")
ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

@@ -322,6 +324,8 @@
"host": ELASTICSEARCH_HOST,
"frontend": ELASTICSEARCH_FRONTEND,
"write_stdout": ELASTICSEARCH_WRITE_STDOUT,
"write_to_es": ELASTICSEARCH_WRITE_TO_ES,
"target_index": ELASTICSEARCH_TARGET_INDEX,
"json_format": ELASTICSEARCH_JSON_FORMAT,
"json_fields": ELASTICSEARCH_JSON_FIELDS,
"host_field": ELASTICSEARCH_HOST_FIELD,
@@ -82,6 +82,8 @@ json_fields = asctime, filename, lineno, levelname, message
host_field = host
offset_field = offset
index_patterns = _all
write_to_es = False
target_index = airflow-logs

[elasticsearch_configs]
use_ssl = False
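
The two new [elasticsearch] options added above (write_to_es and target_index) are surfaced both as config template defaults and in airflow_local_settings.py. A minimal sketch, not part of this commit, of reading them through Airflow's configuration API (only the option names come from the diff; the surrounding code is illustrative):

from airflow.configuration import conf

# Both fall back to the defaults added in the config template above.
write_to_es = conf.getboolean("elasticsearch", "WRITE_TO_ES")              # default: False
target_index = conf.get_mandatory_value("elasticsearch", "TARGET_INDEX")   # default: airflow-logs

if write_to_es:
    print(f"Task logs will be written directly to the {target_index!r} Elasticsearch index")
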
16 changes: 13 additions & 3 deletions airflow/dag_processing/collection.py
@@ -74,11 +74,14 @@
log = logging.getLogger(__name__)


def _create_orm_dags(dags: Iterable[MaybeSerializedDAG], *, session: Session) -> Iterator[DagModel]:
def _create_orm_dags(
bundle_name: str, dags: Iterable[MaybeSerializedDAG], *, session: Session
) -> Iterator[DagModel]:
for dag in dags:
orm_dag = DagModel(dag_id=dag.dag_id)
if dag.is_paused_upon_creation is not None:
orm_dag.is_paused = dag.is_paused_upon_creation
orm_dag.bundle_name = bundle_name
log.info("Creating ORM DAG for %s", dag.dag_id)
session.add(orm_dag)
yield orm_dag
@@ -270,6 +273,8 @@ def _update_import_errors(files_parsed: set[str], import_errors: dict[str, str],


def update_dag_parsing_results_in_db(
bundle_name: str,
bundle_version: str | None,
dags: Collection[MaybeSerializedDAG],
import_errors: dict[str, str],
warnings: set[DagWarning],
@@ -307,7 +312,7 @@ def update_dag_parsing_results_in_db(
)
log.debug("Calling the DAG.bulk_sync_to_db method")
try:
DAG.bulk_write_to_db(dags, session=session)
DAG.bulk_write_to_db(bundle_name, bundle_version, dags, session=session)
# Write Serialized DAGs to DB, capturing errors
for dag in dags:
@@ -346,6 +351,8 @@ class DagModelOperation(NamedTuple):
"""Collect DAG objects and perform database operations for them."""

dags: dict[str, MaybeSerializedDAG]
bundle_name: str
bundle_version: str | None

def find_orm_dags(self, *, session: Session) -> dict[str, DagModel]:
"""Find existing DagModel objects from DAG objects."""
@@ -365,7 +372,8 @@ def add_dags(self, *, session: Session) -> dict[str, DagModel]:
orm_dags.update(
(model.dag_id, model)
for model in _create_orm_dags(
(dag for dag_id, dag in self.dags.items() if dag_id not in orm_dags),
bundle_name=self.bundle_name,
dags=(dag for dag_id, dag in self.dags.items() if dag_id not in orm_dags),
session=session,
)
)
@@ -430,6 +438,8 @@ def update_dags(
dm.timetable_summary = dag.timetable.summary
dm.timetable_description = dag.timetable.description
dm.asset_expression = dag.timetable.asset_condition.as_expression()
dm.bundle_name = self.bundle_name
dm.latest_bundle_version = self.bundle_version

last_automated_run: DagRun | None = run_info.latest_runs.get(dag.dag_id)
if last_automated_run is None:
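
The collection.py changes above thread a bundle identity (bundle_name, bundle_version) through DAG persistence: DagModelOperation now carries both fields, _create_orm_dags and update_dags stamp them onto DagModel rows, and update_dag_parsing_results_in_db and DAG.bulk_write_to_db take them as their leading arguments. A minimal usage sketch, not part of this commit; the bundle values, the empty parsed_dags dict, and the session/keyword handling are assumptions for illustration:

from airflow.dag_processing.collection import DagModelOperation, update_dag_parsing_results_in_db
from airflow.utils.session import create_session

parsed_dags = {}  # placeholder: dag_id -> DAG mapping as produced by the DAG file processor

with create_session() as session:
    # The NamedTuple now records which bundle the parsed DAGs came from.
    op = DagModelOperation(dags=parsed_dags, bundle_name="dags-folder", bundle_version=None)
    orm_dags = op.add_dags(session=session)

    # The top-level entry point takes the same bundle identity as its first two arguments.
    update_dag_parsing_results_in_db(
        "dags-folder",
        None,
        list(parsed_dags.values()),
        import_errors={},
        warnings=set(),
        session=session,
    )
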