diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py index ba00260b16b2d..3ff321def8720 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py @@ -17,10 +17,10 @@ """ Execution API routes for asset state. -Asset state is keyed by asset *name* (not integer id) in the URL — asset names -are unique, and callers (task SDK accessors) have the name from their Asset -object without needing a DB lookup. The route resolves name → asset_id -internally for the state backend scope. +Routes are split into ``/by-name`` and ``/by-uri`` sub-prefixes mirroring the +existing ``/assets/by-name`` and ``/assets/by-uri`` pattern. Callers pass +whichever identifier their inlet type carries: ``Asset``/``AssetNameRef`` use +the name routes, ``AssetUriRef`` uses the URI routes. Per-task asset registration checks are intentionally not implemented here (deferred to AIP-93 — see TODO comment below). @@ -59,65 +59,121 @@ ) -def _resolve_asset_id(name: str, session: SessionDep) -> int: - """Resolve asset name → integer asset_id, 404 if not found.""" +def _resolve_asset_id_by_name(name: str, session: SessionDep) -> int: asset_id = session.scalar(select(AssetModel.id).where(AssetModel.name == name, AssetModel.active.has())) if asset_id is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, - detail={"reason": "not_found", "message": f"Asset {name!r} not found"}, + detail={"reason": "not_found", "message": f"Asset with name={name!r} not found"}, ) return asset_id -@router.get("/value") -def get_asset_state( +def _resolve_asset_id_by_uri(uri: str, session: SessionDep) -> int: + asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == uri, AssetModel.active.has())) + if asset_id is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={"reason": "not_found", "message": f"Asset with uri={uri!r} not found"}, + ) + return asset_id + + +@router.get("/by-name/value") +def get_asset_state_by_name( name: Annotated[str, Query(min_length=1)], key: Annotated[str, Query(min_length=1)], session: SessionDep, ) -> AssetStateResponse: - """Get an asset state value.""" - asset_id = _resolve_asset_id(name, session) + """Get an asset state value by asset name.""" + asset_id = _resolve_asset_id_by_name(name, session) value = get_state_backend().get(AssetScope(asset_id=asset_id), key, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it if value is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, - detail={ - "reason": "not_found", - "message": f"Asset state key {key!r} not found", - }, + detail={"reason": "not_found", "message": f"Asset state key {key!r} not found"}, ) return AssetStateResponse(value=value) -@router.put("/value", status_code=status.HTTP_204_NO_CONTENT) -def set_asset_state( +@router.put("/by-name/value", status_code=status.HTTP_204_NO_CONTENT) +def set_asset_state_by_name( name: Annotated[str, Query(min_length=1)], key: Annotated[str, Query(min_length=1)], body: AssetStatePutBody, session: SessionDep, ) -> None: - """Set an asset state value.""" - asset_id = _resolve_asset_id(name, session) + """Set an asset state value by asset name.""" + asset_id = _resolve_asset_id_by_name(name, session) get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it -@router.delete("/value", status_code=status.HTTP_204_NO_CONTENT) -def delete_asset_state( +@router.delete("/by-name/value", status_code=status.HTTP_204_NO_CONTENT) +def delete_asset_state_by_name( name: Annotated[str, Query(min_length=1)], key: Annotated[str, Query(min_length=1)], session: SessionDep, ) -> None: - """Delete a single asset state key.""" - asset_id = _resolve_asset_id(name, session) + """Delete a single asset state key by asset name.""" + asset_id = _resolve_asset_id_by_name(name, session) get_state_backend().delete(AssetScope(asset_id=asset_id), key, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it -@router.delete("/clear", status_code=status.HTTP_204_NO_CONTENT) -def clear_asset_state( +@router.delete("/by-name/clear", status_code=status.HTTP_204_NO_CONTENT) +def clear_asset_state_by_name( name: Annotated[str, Query(min_length=1)], session: SessionDep, ) -> None: - """Delete all state keys for an asset.""" - asset_id = _resolve_asset_id(name, session) + """Delete all state keys for an asset by asset name.""" + asset_id = _resolve_asset_id_by_name(name, session) + get_state_backend().clear(AssetScope(asset_id=asset_id), session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it + + +@router.get("/by-uri/value") +def get_asset_state_by_uri( + uri: Annotated[str, Query(min_length=1)], + key: Annotated[str, Query(min_length=1)], + session: SessionDep, +) -> AssetStateResponse: + """Get an asset state value by asset URI.""" + asset_id = _resolve_asset_id_by_uri(uri, session) + value = get_state_backend().get(AssetScope(asset_id=asset_id), key, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it + if value is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={"reason": "not_found", "message": f"Asset state key {key!r} not found"}, + ) + return AssetStateResponse(value=value) + + +@router.put("/by-uri/value", status_code=status.HTTP_204_NO_CONTENT) +def set_asset_state_by_uri( + uri: Annotated[str, Query(min_length=1)], + key: Annotated[str, Query(min_length=1)], + body: AssetStatePutBody, + session: SessionDep, +) -> None: + """Set an asset state value by asset URI.""" + asset_id = _resolve_asset_id_by_uri(uri, session) + get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it + + +@router.delete("/by-uri/value", status_code=status.HTTP_204_NO_CONTENT) +def delete_asset_state_by_uri( + uri: Annotated[str, Query(min_length=1)], + key: Annotated[str, Query(min_length=1)], + session: SessionDep, +) -> None: + """Delete a single asset state key by asset URI.""" + asset_id = _resolve_asset_id_by_uri(uri, session) + get_state_backend().delete(AssetScope(asset_id=asset_id), key, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it + + +@router.delete("/by-uri/clear", status_code=status.HTTP_204_NO_CONTENT) +def clear_asset_state_by_uri( + uri: Annotated[str, Query(min_length=1)], + session: SessionDep, +) -> None: + """Delete all state keys for an asset by asset URI.""" + asset_id = _resolve_asset_id_by_uri(uri, session) get_state_backend().clear(AssetScope(asset_id=asset_id), session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py index c08454c4786b8..dd45b11825d80 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py @@ -46,8 +46,12 @@ class AddStateEndpoints(VersionChange): endpoint("/state/ti/{task_instance_id}/{key}", ["PUT"]).didnt_exist, endpoint("/state/ti/{task_instance_id}/{key}", ["DELETE"]).didnt_exist, endpoint("/state/ti/{task_instance_id}", ["DELETE"]).didnt_exist, - endpoint("/state/asset/value", ["GET"]).didnt_exist, - endpoint("/state/asset/value", ["PUT"]).didnt_exist, - endpoint("/state/asset/value", ["DELETE"]).didnt_exist, - endpoint("/state/asset/clear", ["DELETE"]).didnt_exist, + endpoint("/state/asset/by-name/value", ["GET"]).didnt_exist, + endpoint("/state/asset/by-name/value", ["PUT"]).didnt_exist, + endpoint("/state/asset/by-name/value", ["DELETE"]).didnt_exist, + endpoint("/state/asset/by-name/clear", ["DELETE"]).didnt_exist, + endpoint("/state/asset/by-uri/value", ["GET"]).didnt_exist, + endpoint("/state/asset/by-uri/value", ["PUT"]).didnt_exist, + endpoint("/state/asset/by-uri/value", ["DELETE"]).didnt_exist, + endpoint("/state/asset/by-uri/clear", ["DELETE"]).didnt_exist, ) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py index 8bdf6ac859965..6041d01e7f1a6 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py @@ -31,6 +31,11 @@ pytestmark = pytest.mark.db_test +_BY_NAME_VALUE = "/execution/state/asset/by-name/value" +_BY_NAME_CLEAR = "/execution/state/asset/by-name/clear" +_BY_URI_VALUE = "/execution/state/asset/by-uri/value" +_BY_URI_CLEAR = "/execution/state/asset/by-uri/clear" + @pytest.fixture(autouse=True) def reset_state_tables(): @@ -58,21 +63,19 @@ def inactive_asset(session: Session) -> AssetModel: return asset -_VALUE_URL = "/execution/state/asset/value" -_CLEAR_URL = "/execution/state/asset/clear" - - -class TestGetAssetState: +class TestGetAssetStateByName: def test_get_returns_value(self, client: TestClient, asset: AssetModel): - client.put(_VALUE_URL, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-29"}) + client.put( + _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-29"} + ) - response = client.get(_VALUE_URL, params={"name": asset.name, "key": "watermark"}) + response = client.get(_BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}) assert response.status_code == 200 assert response.json() == {"value": "2026-04-29"} def test_get_missing_key_returns_404(self, client: TestClient, asset: AssetModel): - response = client.get(_VALUE_URL, params={"name": asset.name, "key": "never_set"}) + response = client.get(_BY_NAME_VALUE, params={"name": asset.name, "key": "never_set"}) assert response.status_code == 404 assert response.json()["detail"]["reason"] == "not_found" @@ -84,84 +87,84 @@ def test_get_asset_name_with_slashes(self, client: TestClient, session): session.add(AssetActive.for_asset(slashed)) session.commit() - client.put(_VALUE_URL, params={"name": slashed.name, "key": "wm"}, json={"value": "x"}) - response = client.get(_VALUE_URL, params={"name": slashed.name, "key": "wm"}) + client.put(_BY_NAME_VALUE, params={"name": slashed.name, "key": "wm"}, json={"value": "x"}) + response = client.get(_BY_NAME_VALUE, params={"name": slashed.name, "key": "wm"}) assert response.status_code == 200 assert response.json() == {"value": "x"} + def test_get_unknown_asset_returns_404(self, client: TestClient): + response = client.get(_BY_NAME_VALUE, params={"name": "nonexistent", "key": "wm"}) -class TestPutAssetState: - def test_put_creates_row(self, client: TestClient, asset: AssetModel): + assert response.status_code == 404 + + +class TestPutAssetStateByName: + def test_put_creates_row(self, client: TestClient, asset: AssetModel, session: Session): response = client.put( - _VALUE_URL, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-29"} + _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-29"} ) assert response.status_code == 204 - with create_session() as session: - row = session.scalar( - select(AssetStateModel).where( - AssetStateModel.asset_id == asset.id, - AssetStateModel.key == "watermark", - ) + row = session.scalar( + select(AssetStateModel).where( + AssetStateModel.asset_id == asset.id, + AssetStateModel.key == "watermark", ) - assert row is not None - assert row.value == "2026-04-29" + ) + assert row is not None + assert row.value == "2026-04-29" def test_put_overwrites_existing(self, client: TestClient, asset: AssetModel): - client.put(_VALUE_URL, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-28"}) + client.put( + _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-28"} + ) response = client.put( - _VALUE_URL, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-29"} + _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-29"} ) assert response.status_code == 204 - assert client.get(_VALUE_URL, params={"name": asset.name, "key": "watermark"}).json() == { + assert client.get(_BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}).json() == { "value": "2026-04-29" } def test_put_empty_body_returns_422(self, client: TestClient, asset: AssetModel): - response = client.put(_VALUE_URL, params={"name": asset.name, "key": "watermark"}, json={}) - - assert response.status_code == 422 - - def test_put_extra_field_returns_422(self, client: TestClient, asset: AssetModel): - response = client.put( - _VALUE_URL, params={"name": asset.name, "key": "watermark"}, json={"value": "x", "extra": "y"} - ) + response = client.put(_BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, json={}) assert response.status_code == 422 def test_put_unknown_asset_returns_404(self, client: TestClient): response = client.put( - _VALUE_URL, params={"name": "nonexistent", "key": "watermark"}, json={"value": "x"} + _BY_NAME_VALUE, params={"name": "nonexistent", "key": "watermark"}, json={"value": "x"} ) assert response.status_code == 404 - assert "nonexistent" in response.json()["detail"]["message"] -class TestDeleteAssetState: +class TestDeleteAssetStateByName: def test_delete_removes_key(self, client: TestClient, asset: AssetModel): - client.put(_VALUE_URL, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-29"}) + client.put( + _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-29"} + ) - response = client.delete(_VALUE_URL, params={"name": asset.name, "key": "watermark"}) + response = client.delete(_BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}) assert response.status_code == 204 - assert client.get(_VALUE_URL, params={"name": asset.name, "key": "watermark"}).status_code == 404 + assert client.get(_BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}).status_code == 404 def test_delete_missing_key_is_noop(self, client: TestClient, asset: AssetModel): - response = client.delete(_VALUE_URL, params={"name": asset.name, "key": "never_existed"}) + response = client.delete(_BY_NAME_VALUE, params={"name": asset.name, "key": "never_existed"}) assert response.status_code == 204 -class TestClearAssetState: +class TestClearAssetStateByName: def test_clear_removes_all_keys(self, client: TestClient, asset: AssetModel): for k, v in [("watermark", "a"), ("last_id", "b"), ("schema_hash", "c")]: - client.put(_VALUE_URL, params={"name": asset.name, "key": k}, json={"value": v}) + client.put(_BY_NAME_VALUE, params={"name": asset.name, "key": k}, json={"value": v}) - response = client.delete(_CLEAR_URL, params={"name": asset.name}) + response = client.delete(_BY_NAME_CLEAR, params={"name": asset.name}) assert response.status_code == 204 with create_session() as session: @@ -169,15 +172,82 @@ def test_clear_removes_all_keys(self, client: TestClient, asset: AssetModel): assert row is None -class TestInactiveAssetRejected: - """An asset row without a corresponding asset_active entry is treated as not found.""" +class TestGetAssetStateByUri: + def test_get_returns_value(self, client: TestClient, asset: AssetModel): + client.put( + _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-29"} + ) + + response = client.get(_BY_URI_VALUE, params={"uri": asset.uri, "key": "watermark"}) + + assert response.status_code == 200 + assert response.json() == {"value": "2026-04-29"} + + def test_get_missing_key_returns_404(self, client: TestClient, asset: AssetModel): + response = client.get(_BY_URI_VALUE, params={"uri": asset.uri, "key": "never_set"}) - def test_get_inactive_asset_returns_404(self, client: TestClient, inactive_asset: AssetModel): - response = client.get(_VALUE_URL, params={"name": inactive_asset.name, "key": "watermark"}) assert response.status_code == 404 - def test_put_inactive_asset_returns_404(self, client: TestClient, inactive_asset: AssetModel): + def test_get_unknown_uri_returns_404(self, client: TestClient): + response = client.get(_BY_URI_VALUE, params={"uri": "s3://nonexistent/path", "key": "wm"}) + + assert response.status_code == 404 + + +class TestPutAssetStateByUri: + def test_put_creates_row(self, client: TestClient, asset: AssetModel, session: Session): response = client.put( - _VALUE_URL, params={"name": inactive_asset.name, "key": "watermark"}, json={"value": "x"} + _BY_URI_VALUE, params={"uri": asset.uri, "key": "watermark"}, json={"value": "2026-04-29"} ) + + assert response.status_code == 204 + row = session.scalar( + select(AssetStateModel).where( + AssetStateModel.asset_id == asset.id, + AssetStateModel.key == "watermark", + ) + ) + assert row is not None + assert row.value == "2026-04-29" + + def test_put_unknown_uri_returns_404(self, client: TestClient): + response = client.put( + _BY_URI_VALUE, params={"uri": "s3://nonexistent/path", "key": "wm"}, json={"value": "x"} + ) + + assert response.status_code == 404 + + +class TestDeleteAssetStateByUri: + def test_delete_removes_key(self, client: TestClient, asset: AssetModel): + client.put(_BY_URI_VALUE, params={"uri": asset.uri, "key": "watermark"}, json={"value": "2026-04-29"}) + + response = client.delete(_BY_URI_VALUE, params={"uri": asset.uri, "key": "watermark"}) + + assert response.status_code == 204 + assert client.get(_BY_URI_VALUE, params={"uri": asset.uri, "key": "watermark"}).status_code == 404 + + +class TestClearAssetStateByUri: + def test_clear_removes_all_keys(self, client: TestClient, asset: AssetModel): + for k, v in [("watermark", "a"), ("last_id", "b")]: + client.put(_BY_URI_VALUE, params={"uri": asset.uri, "key": k}, json={"value": v}) + + response = client.delete(_BY_URI_CLEAR, params={"uri": asset.uri}) + + assert response.status_code == 204 + with create_session() as session: + row = session.scalar(select(AssetStateModel).where(AssetStateModel.asset_id == asset.id)) + assert row is None + + +class TestInactiveAssetRejected: + """An asset row without a corresponding asset_active entry is treated as not found.""" + + def test_get_inactive_asset_by_name_returns_404(self, client: TestClient, inactive_asset: AssetModel): + response = client.get(_BY_NAME_VALUE, params={"name": inactive_asset.name, "key": "watermark"}) + assert response.status_code == 404 + + def test_get_inactive_asset_by_uri_returns_404(self, client: TestClient, inactive_asset: AssetModel): + response = client.get(_BY_URI_VALUE, params={"uri": inactive_asset.uri, "key": "watermark"}) assert response.status_code == 404