Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
"""
Execution API routes for asset state.

Asset state is keyed by asset *name* (not integer id) in the URL — asset names
are unique, and callers (task SDK accessors) have the name from their Asset
object without needing a DB lookup. The route resolves name → asset_id
internally for the state backend scope.
Routes are split into ``/by-name`` and ``/by-uri`` sub-prefixes mirroring the
existing ``/assets/by-name`` and ``/assets/by-uri`` pattern. Callers pass
whichever identifier their inlet type carries: ``Asset``/``AssetNameRef`` use
the name routes, ``AssetUriRef`` uses the URI routes.

Per-task asset registration checks are intentionally not implemented here
(deferred to AIP-93 — see TODO comment below).
Expand Down Expand Up @@ -59,65 +59,121 @@
)


def _resolve_asset_id(name: str, session: SessionDep) -> int:
"""Resolve asset name → integer asset_id, 404 if not found."""
def _resolve_asset_id_by_name(name: str, session: SessionDep) -> int:
asset_id = session.scalar(select(AssetModel.id).where(AssetModel.name == name, AssetModel.active.has()))
if asset_id is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={"reason": "not_found", "message": f"Asset {name!r} not found"},
detail={"reason": "not_found", "message": f"Asset with name={name!r} not found"},
)
return asset_id


@router.get("/value")
def get_asset_state(
def _resolve_asset_id_by_uri(uri: str, session: SessionDep) -> int:
asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == uri, AssetModel.active.has()))
if asset_id is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={"reason": "not_found", "message": f"Asset with uri={uri!r} not found"},
)
return asset_id


@router.get("/by-name/value")
def get_asset_state_by_name(
name: Annotated[str, Query(min_length=1)],
key: Annotated[str, Query(min_length=1)],
session: SessionDep,
) -> AssetStateResponse:
"""Get an asset state value."""
asset_id = _resolve_asset_id(name, session)
"""Get an asset state value by asset name."""
asset_id = _resolve_asset_id_by_name(name, session)
value = get_state_backend().get(AssetScope(asset_id=asset_id), key, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it
if value is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={
"reason": "not_found",
"message": f"Asset state key {key!r} not found",
},
detail={"reason": "not_found", "message": f"Asset state key {key!r} not found"},
)
return AssetStateResponse(value=value)


@router.put("/value", status_code=status.HTTP_204_NO_CONTENT)
def set_asset_state(
@router.put("/by-name/value", status_code=status.HTTP_204_NO_CONTENT)
def set_asset_state_by_name(
name: Annotated[str, Query(min_length=1)],
key: Annotated[str, Query(min_length=1)],
body: AssetStatePutBody,
session: SessionDep,
) -> None:
"""Set an asset state value."""
asset_id = _resolve_asset_id(name, session)
"""Set an asset state value by asset name."""
asset_id = _resolve_asset_id_by_name(name, session)
get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it


@router.delete("/value", status_code=status.HTTP_204_NO_CONTENT)
def delete_asset_state(
@router.delete("/by-name/value", status_code=status.HTTP_204_NO_CONTENT)
def delete_asset_state_by_name(
name: Annotated[str, Query(min_length=1)],
key: Annotated[str, Query(min_length=1)],
session: SessionDep,
) -> None:
"""Delete a single asset state key."""
asset_id = _resolve_asset_id(name, session)
"""Delete a single asset state key by asset name."""
asset_id = _resolve_asset_id_by_name(name, session)
get_state_backend().delete(AssetScope(asset_id=asset_id), key, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it


@router.delete("/clear", status_code=status.HTTP_204_NO_CONTENT)
def clear_asset_state(
@router.delete("/by-name/clear", status_code=status.HTTP_204_NO_CONTENT)
def clear_asset_state_by_name(
name: Annotated[str, Query(min_length=1)],
session: SessionDep,
) -> None:
"""Delete all state keys for an asset."""
asset_id = _resolve_asset_id(name, session)
"""Delete all state keys for an asset by asset name."""
asset_id = _resolve_asset_id_by_name(name, session)
get_state_backend().clear(AssetScope(asset_id=asset_id), session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it


@router.get("/by-uri/value")
def get_asset_state_by_uri(
uri: Annotated[str, Query(min_length=1)],
key: Annotated[str, Query(min_length=1)],
session: SessionDep,
) -> AssetStateResponse:
"""Get an asset state value by asset URI."""
asset_id = _resolve_asset_id_by_uri(uri, session)
value = get_state_backend().get(AssetScope(asset_id=asset_id), key, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it
if value is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={"reason": "not_found", "message": f"Asset state key {key!r} not found"},
)
return AssetStateResponse(value=value)


@router.put("/by-uri/value", status_code=status.HTTP_204_NO_CONTENT)
def set_asset_state_by_uri(
uri: Annotated[str, Query(min_length=1)],
key: Annotated[str, Query(min_length=1)],
body: AssetStatePutBody,
session: SessionDep,
) -> None:
"""Set an asset state value by asset URI."""
asset_id = _resolve_asset_id_by_uri(uri, session)
get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it


@router.delete("/by-uri/value", status_code=status.HTTP_204_NO_CONTENT)
def delete_asset_state_by_uri(
uri: Annotated[str, Query(min_length=1)],
key: Annotated[str, Query(min_length=1)],
session: SessionDep,
) -> None:
"""Delete a single asset state key by asset URI."""
asset_id = _resolve_asset_id_by_uri(uri, session)
get_state_backend().delete(AssetScope(asset_id=asset_id), key, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it


@router.delete("/by-uri/clear", status_code=status.HTTP_204_NO_CONTENT)
def clear_asset_state_by_uri(
uri: Annotated[str, Query(min_length=1)],
session: SessionDep,
) -> None:
"""Delete all state keys for an asset by asset URI."""
asset_id = _resolve_asset_id_by_uri(uri, session)
get_state_backend().clear(AssetScope(asset_id=asset_id), session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ class AddStateEndpoints(VersionChange):
endpoint("/state/ti/{task_instance_id}/{key}", ["PUT"]).didnt_exist,
endpoint("/state/ti/{task_instance_id}/{key}", ["DELETE"]).didnt_exist,
endpoint("/state/ti/{task_instance_id}", ["DELETE"]).didnt_exist,
endpoint("/state/asset/value", ["GET"]).didnt_exist,
endpoint("/state/asset/value", ["PUT"]).didnt_exist,
endpoint("/state/asset/value", ["DELETE"]).didnt_exist,
endpoint("/state/asset/clear", ["DELETE"]).didnt_exist,
endpoint("/state/asset/by-name/value", ["GET"]).didnt_exist,
endpoint("/state/asset/by-name/value", ["PUT"]).didnt_exist,
endpoint("/state/asset/by-name/value", ["DELETE"]).didnt_exist,
endpoint("/state/asset/by-name/clear", ["DELETE"]).didnt_exist,
endpoint("/state/asset/by-uri/value", ["GET"]).didnt_exist,
endpoint("/state/asset/by-uri/value", ["PUT"]).didnt_exist,
endpoint("/state/asset/by-uri/value", ["DELETE"]).didnt_exist,
endpoint("/state/asset/by-uri/clear", ["DELETE"]).didnt_exist,
)
Loading
Loading