Move Arangodb to new provider structure #46124

Closed
Changes from all commits
30 commits
6115777
Move Arangodb to new provider structure
Jan 27, 2025
f3d55e2
Merge branch 'main' into move-arangodb-to-new-provider-strcucture
Prab-27 Jan 30, 2025
6590482
Migrate Druid provider to the new structure (#46294)
o-nikolas Jan 30, 2025
7b9614c
Enable already existing "task.scheduled_duration" metric (#46009)
AutomationDev85 Jan 30, 2025
aa1ef1c
refactor(providers/opensearch): move opensearch provider to new struc…
josix Jan 30, 2025
c8cdab3
refactor(providers/elasticsearch): move elasticsearch provider to new…
josix Jan 30, 2025
7107b73
Add shortcut key support for search dags. (#45908)
tirkarthi Jan 30, 2025
d920bfd
Validate body to be defined when the server/network is not available.…
tirkarthi Jan 30, 2025
4d5a5e6
refactor(providers/github): move github provider to new structure (#4…
josix Jan 30, 2025
f26d792
refactor(providers/dbt.cloud): move dbt cloud provider to new structu…
josix Jan 30, 2025
e7fca30
Migrate Apache Drill Provider Package to new Structure (#46264)
o-nikolas Jan 30, 2025
cc35c91
Mostly use relative fileloc in dag processor (#46290)
dstandish Jan 31, 2025
ee9f2f5
Adding DataSync links (#46292)
ellisms Jan 31, 2025
fb3ddf8
Remove deprecated functionality from google provider (#46235)
moiseenkov Jan 31, 2025
d568bb3
AIP-72: Move DAG Params to Task SDK (#46176)
amoghrajesh Jan 31, 2025
8a20dbe
AIP-38 List providers (#45535)
dauinh Jan 31, 2025
b3ba6b6
Revert "Mostly use relative fileloc in dag processor (#46290)" (#46308)
ashb Jan 31, 2025
8da980e
Nuking extra code branching in task runner for inlets, outlets (#46302)
amoghrajesh Jan 31, 2025
8667b3b
python_named_parameters should be python_named_params (#46299)
lendle Jan 31, 2025
4867074
Only update docs inventory cache from a single docs build matrix job …
ashb Jan 31, 2025
450d536
Swap CeleryExecutor over to use TaskSDK for execution. (#46265)
ashb Jan 31, 2025
4cfd529
Fix branch used for constraints in k8s env creation (#46318)
jedcunningham Jan 31, 2025
134e430
Add autorefresh to dag page (#46296)
bbovenzi Jan 31, 2025
bacde16
Move Sendgrid to new provider structure (#46313)
jason810496 Jan 31, 2025
a244794
Bump version of task-sdk package to alpha (#46324)
ashb Jan 31, 2025
be79cae
Create TaskMap rows when pushing XCom values from the Task Execution …
ashb Jan 31, 2025
feeb44e
Add autorefresh to dags list page (#46326)
bbovenzi Jan 31, 2025
3102529
Set minimum dependencies for apache-beam on Py 3.12+3.13 (#46321)
ashb Jan 31, 2025
e2435a2
Add basic autorefresh to grid and graph (#46330)
bbovenzi Jan 31, 2025
3da5fa0
Move Arangodb to new provider structure --rebase
Jan 27, 2025
36 changes: 8 additions & 28 deletions .github/boring-cyborg.yml
Original file line number Diff line number Diff line change
@@ -37,16 +37,10 @@ labelPRBasedOnFilePath:
- providers/apache/cassandra/**

provider:apache-drill:
- providers/src/airflow/providers/apache/drill/**/*
- docs/apache-airflow-providers-apache-drill/**/*
- providers/tests/apache/drill/**/*
- providers/tests/system/apache/drill/**/*
- providers/apache/drill/**

provider:apache-druid:
- providers/src/airflow/providers/apache/druid/**/*
- docs/apache-airflow-providers-apache-druid/**/*
- providers/tests/apache/druid/**/*
- providers/tests/system/apache/druid/**/*
- providers/apache/druid/**

provider:apache-flink:
- providers/src/airflow/providers/apache/flink/**/*
@@ -94,9 +88,7 @@ labelPRBasedOnFilePath:
- providers/apprise/**

provider:arangodb:
- providers/src/airflow/providers/arangodb/**/*
- docs/apache-airflow-providers-arangodb/**/*
- providers/tests/arangodb/**/*
- providers/arangodb/**

provider:asana:
- providers/asana/**
@@ -147,10 +139,7 @@ labelPRBasedOnFilePath:
- providers/datadog/**

provider:dbt-cloud:
- providers/src/airflow/providers/dbt/cloud/**/*
- docs/apache-airflow-providers-dbt-cloud/**/*
- providers/tests/dbt/cloud/**/*
- providers/tests/system/dbt/cloud/**/*
- providers/dbt/cloud/**

provider:dingding:
- providers/src/airflow/providers/dingding/**/*
@@ -168,10 +157,7 @@ labelPRBasedOnFilePath:
- providers/edge/**

provider:elasticsearch:
- providers/src/airflow/providers/elasticsearch/**/*
- docs/apache-airflow-providers-elasticsearch/**/*
- providers/tests/elasticsearch/**/*
- providers/tests/system/elasticsearch/**/*
- providers/elasticsearch/**

provider:exasol:
- providers/exasol/**
@@ -188,10 +174,7 @@ labelPRBasedOnFilePath:
- providers/ftp/**

provider:github:
- providers/src/airflow/providers/github/**/*
- docs/apache-airflow-providers-github/**/*
- providers/tests/github/**/*
- providers/tests/system/github/**/*
- providers/github/**

provider:google:
- providers/src/airflow/providers/google/**/*
@@ -270,10 +253,7 @@ labelPRBasedOnFilePath:
- providers/openlineage/**

provider:opensearch:
- providers/src/airflow/providers/opensearch/**/*
- docs/apache-airflow-providers-opensearch/**/*
- providers/tests/opensearch/**/*
- providers/tests/system/opensearch/**/*
- providers/opensearch/**

provider:opsgenie:
- providers/opsgenie/**
@@ -322,7 +302,7 @@ labelPRBasedOnFilePath:
- providers/segment/**

provider:sendgrid:
- providers/segment/**
- providers/sendgrid/**

provider:sftp:
- providers/sftp/**
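The restructure collapses four per-provider globs (source, docs, unit tests, system tests) into a single `providers/<name>/**` entry, because the new layout keeps everything for a provider under one directory. A rough sketch of the effect, using Python's `fnmatch` as a stand-in for the matcher boring-cyborg actually uses (its `*` crosses `/` separators, which is close enough for this illustration; the paths are made up):

```python
from fnmatch import fnmatch

# Illustrative paths under the new single-directory provider layout.
paths = [
    "providers/arangodb/src/airflow/providers/arangodb/hooks/arangodb.py",
    "providers/arangodb/docs/index.rst",
    "providers/arangodb/tests/test_hook.py",
]

# The one glob that replaces the previous four entries for this provider.
PATTERN = "providers/arangodb/**"

# Every file for the provider now matches the single pattern...
assert all(fnmatch(path, PATTERN) for path in paths)
# ...while files of other providers do not.
assert not fnmatch("providers/asana/docs/index.rst", PATTERN)
```

The same consolidation is applied to the drill, druid, dbt-cloud, elasticsearch, github, opensearch, and sendgrid entries in this diff.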
4 changes: 3 additions & 1 deletion .github/workflows/ci-image-checks.yml
@@ -289,7 +289,9 @@ jobs:
key: cache-docs-inventory-v1-${{ hashFiles('pyproject.toml') }}
if-no-files-found: 'error'
retention-days: '2'
if: steps.restore-docs-inventory-cache != 'true'
# If we upload from multiple matrix jobs we could end up with a race condition. so just pick one job
# to be responsible for updating it. https://github.com/actions/upload-artifact/issues/506
if: steps.restore-docs-inventory-cache != 'true' && matrix.flag == '--docs-only'
- name: "Upload build docs"
uses: actions/upload-artifact@v4
with:
2 changes: 2 additions & 0 deletions airflow/api_connexion/schemas/task_instance_schema.py
@@ -60,6 +60,7 @@ class Meta:
priority_weight = auto_field()
operator = auto_field()
queued_dttm = auto_field(data_key="queued_when")
scheduled_dttm = auto_field(data_key="scheduled_when")
pid = auto_field()
executor = auto_field()
executor_config = auto_field()
@@ -102,6 +103,7 @@ class Meta:
priority_weight = auto_field()
operator = auto_field()
queued_dttm = auto_field(data_key="queued_when")
scheduled_dttm = auto_field(data_key="scheduled_when")
pid = auto_field()
executor = auto_field()
executor_config = auto_field()
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -64,6 +64,7 @@ class TaskInstanceResponse(BaseModel):
priority_weight: int | None
operator: str | None
queued_dttm: datetime | None = Field(alias="queued_when")
scheduled_dttm: datetime | None = Field(alias="scheduled_when")
pid: int | None
executor: str | None
executor_config: Annotated[str, BeforeValidator(str)]
@@ -147,6 +148,7 @@ class TaskInstanceHistoryResponse(BaseModel):
priority_weight: int | None
operator: str | None
queued_dttm: datetime | None = Field(alias="queued_when")
scheduled_dttm: datetime | None = Field(alias="scheduled_when")
pid: int | None
executor: str | None
executor_config: Annotated[str, BeforeValidator(str)]
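The new `scheduled_dttm` field follows the existing `queued_dttm` pattern: the ORM column name is exposed to API clients under a `_when` alias. A minimal stdlib sketch of that rename (the field names come from the diff; `to_api_payload` itself is a hypothetical helper — the real models get this behavior from Pydantic's `Field(alias=...)` and Marshmallow's `data_key=...`):

```python
# Column-name -> API-name aliases used by the response models.
FIELD_ALIASES = {
    "queued_dttm": "queued_when",
    "scheduled_dttm": "scheduled_when",
}

def to_api_payload(row: dict) -> dict:
    """Rename DB-style keys to their API aliases, leaving other keys untouched."""
    return {FIELD_ALIASES.get(key, key): value for key, value in row.items()}
```

For example, `to_api_payload({"scheduled_dttm": None, "pid": 42})` yields `{"scheduled_when": None, "pid": 42}` — the shape clients see in the task-instance endpoints.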
14 changes: 14 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -9643,6 +9643,12 @@ components:
format: date-time
- type: 'null'
title: Queued When
scheduled_when:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Scheduled When
pid:
anyOf:
- type: integer
@@ -9677,6 +9683,7 @@ components:
- priority_weight
- operator
- queued_when
- scheduled_when
- pid
- executor
- executor_config
@@ -9770,6 +9777,12 @@ components:
format: date-time
- type: 'null'
title: Queued When
scheduled_when:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Scheduled When
pid:
anyOf:
- type: integer
@@ -9828,6 +9841,7 @@ components:
- priority_weight
- operator
- queued_when
- scheduled_when
- pid
- executor
- executor_config
57 changes: 47 additions & 10 deletions airflow/api_fastapi/execution_api/routes/xcoms.py
@@ -28,6 +28,7 @@
from airflow.api_fastapi.execution_api import deps
from airflow.api_fastapi.execution_api.datamodels.token import TIToken
from airflow.api_fastapi.execution_api.datamodels.xcom import XComResponse
from airflow.models.taskmap import TaskMap
from airflow.models.xcom import BaseXCom

# TODO: Add dependency on JWT token
@@ -55,7 +56,7 @@ def get_xcom(
map_index: Annotated[int, Query()] = -1,
) -> XComResponse:
"""Get an Airflow XCom from database - not other XCom Backends."""
if not has_xcom_access(key, token):
if not has_xcom_access(dag_id, run_id, task_id, key, token):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail={
@@ -104,6 +105,8 @@ def get_xcom(
return XComResponse(key=key, value=xcom_value)


# TODO: once we have JWT tokens, then remove dag_id/run_id/task_id from the URL and just use the info in
# the token
@router.post(
"/{dag_id}/{run_id}/{task_id}/{key}",
status_code=status.HTTP_201_CREATED,
@@ -139,8 +142,23 @@ def set_xcom(
token: deps.TokenDep,
session: SessionDep,
map_index: Annotated[int, Query()] = -1,
mapped_length: Annotated[
int | None, Query(description="Number of mapped tasks this value expands into")
] = None,
):
"""Set an Airflow XCom."""
from airflow.configuration import conf

if not has_xcom_access(dag_id, run_id, task_id, key, token, write=True):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail={
"reason": "access_denied",
"message": f"Task does not have access to set XCom key '{key}'",
},
)

# TODO: This is in-efficient. We json.loads it here for BaseXCom.set to then json.dump it!
try:
json.loads(value)
except json.JSONDecodeError:
@@ -152,14 +170,30 @@
},
)

if not has_xcom_access(key, token):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail={
"reason": "access_denied",
"message": f"Task does not have access to set XCom key '{key}'",
},
if mapped_length is not None:
task_map = TaskMap(
dag_id=dag_id,
task_id=task_id,
run_id=run_id,
map_index=map_index,
length=mapped_length,
keys=None,
)
max_map_length = conf.getint("core", "max_map_length", fallback=1024)
if task_map.length > max_map_length:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"reason": "unmappable_return_value_length",
"message": "pushed value is too large to map as a downstream's dependency",
},
)
session.add(task_map)

# else:
# TODO: Can/should we check if a client _hasn't_ provided this for an upstream of a mapped task? That
# means loading the serialized dag and that seems like a relatively costly operation for minimal benefit
# (the mapped task would fail in a moment as it can't be expanded anyway.)

# We use `BaseXCom.set` to set XComs directly to the database, bypassing the XCom Backend.
try:
@@ -184,13 +218,16 @@
return {"message": "XCom successfully set"}


def has_xcom_access(xcom_key: str, token: TIToken) -> bool:
def has_xcom_access(
dag_id: str, run_id: str, task_id: str, xcom_key: str, token: TIToken, write: bool = False
) -> bool:
"""Check if the task has access to the XCom."""
# TODO: Placeholder for actual implementation

ti_key = token.ti_key
log.debug(
"Checking access for task instance with key '%s' to XCom '%s'",
"Checking %s XCom access for xcom from TaskInstance with key '%s' to XCom '%s'",
"write" if write else "read",
ti_key,
xcom_key,
)
2 changes: 1 addition & 1 deletion airflow/cli/commands/remote_commands/task_command.py
@@ -46,8 +46,8 @@
from airflow.models import TaskInstance
from airflow.models.dag import DAG, _run_inline_trigger
from airflow.models.dagrun import DagRun
from airflow.models.param import ParamsDict
from airflow.models.taskinstance import TaskReturnCode
from airflow.sdk.definitions.param import ParamsDict
from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
2 changes: 1 addition & 1 deletion airflow/example_dags/example_params_trigger_ui.py
@@ -27,7 +27,7 @@

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.models.param import Param, ParamsDict
from airflow.sdk import Param, ParamsDict
from airflow.utils.trigger_rule import TriggerRule

# [START params_trigger]
2 changes: 1 addition & 1 deletion airflow/example_dags/example_params_ui_tutorial.py
@@ -29,7 +29,7 @@

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.models.param import Param, ParamsDict
from airflow.sdk import Param, ParamsDict

with (
DAG(