Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions providers/keycloak/docs/auth-manager/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,10 @@ It enables you to manage users, roles, groups, and permissions entirely within K
:maxdepth: 2

manage/login

**Note on Multi Team Clients**
When using the Keycloak auth manager with a multi-team deployment, you will need to keep the keycloak client up to date by creating the required policies and resources for each new team that is added.

If a team has dags added to Airflow but they are not setup in the keycloak client, keycloak will return a 400 error on the ``/dags`` screen:
``{"error":"invalid_resource", "error_description": "Resource with id [Dag:team-a] does not exist."}``
The keycloak auth manager will handle this error by registering it as a 'deny' response to preserve access to the ``/dags`` screen for other teams.
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@
log = logging.getLogger(__name__)

RESOURCE_ID_ATTRIBUTE_NAME = "resource_id"
KEYCLOAK_RESOURCE_NOT_FOUND_ERROR = "resource not found:"


def _is_missing_keycloak_resource_response(status_code: int, text: Any) -> bool:
return status_code == 500 and isinstance(text, str) and KEYCLOAK_RESOURCE_NOT_FOUND_ERROR in text.lower()


TEAM_SCOPED_RESOURCES = frozenset(
Expand Down Expand Up @@ -407,16 +402,19 @@ def _is_authorized(
server_url = conf.get(CONF_SECTION_NAME, CONF_SERVER_URL_KEY)

context_attributes = prune_dict(attributes or {})

is_team_resource = bool(
team_name
and conf.getboolean("core", "multi_team", fallback=False)
and resource_type in TEAM_SCOPED_RESOURCES
)

if resource_id:
context_attributes[RESOURCE_ID_ATTRIBUTE_NAME] = resource_id
elif method == "GET":
method = "LIST"

if (
team_name
and conf.getboolean("core", "multi_team", fallback=False)
and resource_type in TEAM_SCOPED_RESOURCES
):
if is_team_resource:
resource_name = f"{resource_type.value}:{team_name}"
else:
resource_name = resource_type.value
Expand All @@ -438,12 +436,15 @@ def _is_authorized(
return False
if resp.status_code == 400:
error = json.loads(resp.text)
if is_team_resource and error.get("error") == "invalid_resource":
# filter_authorized_dag_ids will return this error if team resources have not been added to the Keycloak Client.
log.warning(
"Keycloak authorization resource is missing; denying access. Response: %s", resp.text
)
return False
raise AirflowException(
f"Request not recognized by Keycloak. {error.get('error')}. {error.get('error_description')}"
)
if _is_missing_keycloak_resource_response(resp.status_code, resp.text):
log.warning("Keycloak authorization resource is missing; denying access. Response: %s", resp.text)
return False
raise AirflowException(f"Unexpected error: {resp.status_code} - {resp.text}")

def filter_authorized_dag_ids(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
VariableDetails,
)

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_7_PLUS, AIRFLOW_V_3_2_PLUS

if AIRFLOW_V_3_2_PLUS:
Expand Down Expand Up @@ -65,8 +66,6 @@
)
from airflow.providers.keycloak.auth_manager.user import KeycloakAuthManagerUser

from tests_common.test_utils.config import conf_vars


def _build_access_token(payload: dict[str, object]) -> str:
header = {"alg": "none", "typ": "JWT"}
Expand Down Expand Up @@ -386,18 +385,22 @@ def test_is_authorized_failure(self, function, auth_manager, user):

assert "Unexpected error" in str(e.value)

@pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="team_name not supported before Airflow 3.2.0")
@conf_vars({("core", "multi_team"): "True"})
def test_is_authorized_missing_keycloak_resource(self, auth_manager, user, caplog):
resp = Mock()
resp.status_code = 500
resp.text = "resource not found: Dag:team-a"
resp.status_code = 400
resp.text = '{"error": "invalid_resource", "error_description": "Resource with id [Dag:team-a] does not exist."}'
auth_manager.http_session.post = Mock(return_value=resp)
caplog.set_level("WARNING", logger="airflow.providers.keycloak.auth_manager.keycloak_auth_manager")

result = auth_manager.is_authorized_dag(method="GET", details=DagDetails(id="dag_0"), user=user)
result = auth_manager.is_authorized_dag(
method="GET", details=DagDetails(id="dag_0", team_name="team-a"), user=user
)

assert result is False
assert "Keycloak authorization resource is missing; denying access" in caplog.text
assert "resource not found: Dag:team-a" in caplog.text
assert "Resource with id [Dag:team-a] does not exist." in caplog.text

@pytest.mark.parametrize(
"function",
Expand Down Expand Up @@ -666,14 +669,15 @@ def test_filter_authorized_dag_ids_team_match(self, mock_is_authorized, auth_man
assert result == {"dag-a"}

@pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="team_name not supported before Airflow 3.2.0")
@conf_vars({("core", "multi_team"): "True"})
def test_filter_authorized_dag_ids_missing_keycloak_resource(self, auth_manager_multi_team, user, caplog):
def post_response(*_, data, **__):
claims = json.loads(base64.b64decode(data["claim_token"]).decode())
dag_id = claims[RESOURCE_ID_ATTRIBUTE_NAME][0]
response = Mock()
if dag_id == "dag-missing":
response.status_code = 500
response.text = "resource not found: Dag:team-a"
response.status_code = 400
response.text = '{"error":"invalid_resource", "error_description": "Resource with id [Dag:team-a] does not exist."}'
else:
response.status_code = 200
return response
Expand Down
Loading