[RHCLOUD-37152] Add locks on Roles, CAR and groups in migrator #1441

Merged · 8 commits · Jan 22, 2025
22 changes: 11 additions & 11 deletions rbac/management/group/view.py
@@ -185,10 +185,10 @@ class GroupViewSet(

def get_queryset(self):
"""Obtain queryset for requesting user based on access."""
add_principals_method = self.action == "principals" and self.request.method == "POST"
principals_method = self.action == "principals" and (self.request.method != "GET")
destroy_method = self.action == "destroy"

if add_principals_method or destroy_method:
if principals_method or destroy_method:
# In this case, the group must be locked to prevent principal changes during deletion.
# If not locked, replication to relations may be out of sync due to phantom reads.
# We have to modify the starting queryset to support locking because
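The comment above is cut off by the diff view; it refers to modifying the starting queryset so the group row can be locked for these actions. A minimal sketch of what such a get_queryset can look like, assuming select_for_update() is the locking primitive (the same one the migrator changes below rely on); the tenant filter and exact queryset construction are illustrative, not copied from the view:

```python
def get_queryset(self):
    """Illustrative sketch only, not the PR's exact implementation."""
    queryset = Group.objects.filter(tenant=self.request.tenant)
    principals_method = self.action == "principals" and self.request.method != "GET"
    destroy_method = self.action == "destroy"
    if principals_method or destroy_method:
        # Lock the group row so principal membership cannot change while the
        # group is deleted or membership changes are replicated to relations.
        queryset = queryset.select_for_update()
    return queryset
```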
@@ -937,19 +937,19 @@ def principals(self, request: Request, uuid: Optional[UUID] = None):
page = self.paginate_queryset(resp.get("data"))
response = self.get_paginated_response(page)
else:
group = self.get_object()
with transaction.atomic():
group = self.get_object()

self.protect_system_groups("remove principals")
self.protect_system_groups("remove principals")

if not request.user.admin:
self.protect_group_with_user_access_admin_role(group.roles_with_access(), "remove_principals")
if not request.user.admin:
self.protect_group_with_user_access_admin_role(group.roles_with_access(), "remove_principals")

if SERVICE_ACCOUNTS_KEY not in request.query_params and USERNAMES_KEY not in request.query_params:
key = "detail"
message = "Query parameter {} or {} is required.".format(SERVICE_ACCOUNTS_KEY, USERNAMES_KEY)
raise serializers.ValidationError({key: _(message)})
if SERVICE_ACCOUNTS_KEY not in request.query_params and USERNAMES_KEY not in request.query_params:
key = "detail"
message = "Query parameter {} or {} is required.".format(SERVICE_ACCOUNTS_KEY, USERNAMES_KEY)
raise serializers.ValidationError({key: _(message)})

with transaction.atomic():
service_accounts_to_remove = []
# Remove the service accounts from the group.
if SERVICE_ACCOUNTS_KEY in request.query_params:
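One reason get_object() moves inside the transaction.atomic() block above: in Django, a select_for_update() queryset can only be evaluated inside a transaction (outside one it raises TransactionManagementError), and the row lock it takes is held only until that transaction commits, so the lock has to be acquired after the atomic block opens and span all of the writes. A minimal sketch of the pattern, with hypothetical names:

```python
from django.db import transaction

def remove_principals(view, request):
    # Hypothetical sketch of the locking pattern in the view above.
    with transaction.atomic():
        # get_queryset() applies select_for_update() for this action, so
        # evaluating it here takes a row lock on the group.
        group = view.get_object()
        # ... validation and dual-write replication run while the lock is held ...
    # The lock is released at commit; fetching the group before the atomic block
    # would either fail outside a transaction or leave the lock scoped to a
    # different transaction than the writes.
```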
2 changes: 1 addition & 1 deletion rbac/management/role/relation_api_dual_write_handler.py
@@ -31,7 +31,7 @@
from management.relation_replicator.relation_replicator import ReplicationEvent
from management.relation_replicator.relation_replicator import ReplicationEventType
from management.role.model import BindingMapping, Role
from migration_tool.migrate import migrate_role
from migration_tool.migrate_role import migrate_role
from migration_tool.sharedSystemRolesReplicatedRoleBindings import v1_perm_to_v2_perm
from migration_tool.utils import create_relationship

160 changes: 53 additions & 107 deletions rbac/migration_tool/migrate.py
@@ -16,140 +16,80 @@
"""

import logging
from typing import Iterable

from django.conf import settings
from django.db import transaction
from kessel.relations.v1beta1 import common_pb2
from management.group.relation_api_dual_write_group_handler import RelationApiDualWriteGroupHandler
from management.models import Workspace
from management.models import Group
from management.principal.model import Principal
from management.relation_replicator.logging_replicator import LoggingReplicator
from management.relation_replicator.outbox_replicator import OutboxReplicator
from management.relation_replicator.relation_replicator import (
PartitionKey,
RelationReplicator,
ReplicationEvent,
ReplicationEventType,
)
from management.relation_replicator.relations_api_replicator import RelationsApiReplicator
from management.role.model import BindingMapping, Role
from migration_tool.models import V2rolebinding
from migration_tool.sharedSystemRolesReplicatedRoleBindings import v1_role_to_v2_bindings
from migration_tool.utils import create_relationship
from management.role.model import Role
from management.role.relation_api_dual_write_handler import RelationApiDualWriteHandler

from api.cross_access.relation_api_dual_write_cross_access_handler import RelationApiDualWriteCrossAccessHandler
from api.models import CrossAccountRequest, Tenant

logger = logging.getLogger(__name__) # pylint: disable=invalid-name


def get_kessel_relation_tuples(
v2_role_bindings: Iterable[V2rolebinding],
default_workspace: Workspace,
) -> list[common_pb2.Relationship]:
"""Generate a set of relationships and BindingMappings for the given set of v2 role bindings."""
relationships: list[common_pb2.Relationship] = list()

for v2_role_binding in v2_role_bindings:
relationships.extend(v2_role_binding.as_tuples())

bound_resource = v2_role_binding.resource

# Is this a workspace binding, but not to the root workspace?
# If so, ensure this workspace is a child of the root workspace.
# All other resource-resource or resource-workspace relations
# which may be implied or necessary are intentionally ignored.
# These should come from the apps that own the resource.
if bound_resource.resource_type == ("rbac", "workspace") and not bound_resource.resource_id == str(
default_workspace.id
):
# This is not strictly necessary here and the relation may be a duplicate.
# Once we have more Workspace API / Inventory Group migration progress,
# this block can and probably should be removed.
# One of those APIs will add it themselves.
relationships.append(
create_relationship(
bound_resource.resource_type,
bound_resource.resource_id,
("rbac", "workspace"),
str(default_workspace.id),
"parent",
)
)

return relationships


def migrate_role(
role: Role,
default_workspace: Workspace,
current_bindings: Iterable[BindingMapping] = [],
) -> tuple[list[common_pb2.Relationship], list[BindingMapping]]:
"""
Migrate a role from v1 to v2, returning the tuples and mappings.

The mappings are returned so that we can reconstitute the corresponding tuples for a given role.
This is needed so we can remove those tuples when the role changes if needed.
"""
v2_role_bindings = v1_role_to_v2_bindings(role, default_workspace, current_bindings)
relationships = get_kessel_relation_tuples([m.get_role_binding() for m in v2_role_bindings], default_workspace)
return relationships, v2_role_bindings


def migrate_groups_for_tenant(tenant: Tenant, replicator: RelationReplicator):
"""Generate user relationships and system role assignments for groups in a tenant."""
groups = tenant.group_set.all()
groups = tenant.group_set.only("pk").values("pk")
for group in groups:
principals: list[Principal] = []
system_roles: list[Role] = []
if not group.platform_default:
principals = group.principals.all()
if group.system is False and group.admin_default is False:
system_roles = group.roles().public_tenant_only()
if any(True for _ in system_roles) or any(True for _ in principals):
# The migrator does not generally deal with concurrency control,
# but we require an atomic block due to use of select_for_update in the dual write handler.
with transaction.atomic():
# The migrator deals with concurrency control.
# We need an atomic block because select_for_update is used in the dual write handler,
# and the group must be locked while adding principals to it.
# NOTE: The lock on the group is not necessary when adding system roles to the group,
# as the binding mappings are locked during this process to ensure concurrency control.
# Start of transaction for group operations
with transaction.atomic():
# Requery the group with a lock
group = Group.objects.select_for_update().get(pk=group["pk"])
principals: list[Principal] = []
system_roles: list[Role] = []
if not group.platform_default:
principals = group.principals.all()
if group.system is False and group.admin_default is False:
system_roles = group.roles().public_tenant_only()
if any(True for _ in system_roles) or any(True for _ in principals):
dual_write_handler = RelationApiDualWriteGroupHandler(
group, ReplicationEventType.MIGRATE_TENANT_GROUPS, replicator=replicator
)
# This operation requires a lock on the group, just as in the view;
# see GroupViewSet#get_queryset, which is used when adding principals, for details.
dual_write_handler.generate_relations_to_add_principals(principals)
# A lock on the group is not required to add system roles; only the binding
# mappings are locked, and that is handled inside the dual_write_handler.
dual_write_handler.generate_relations_to_add_roles(system_roles)
dual_write_handler.replicate()
# End of transaction for group operations, locks are released
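The group loop above, and the role and cross-account-request loops below, all follow the same idiom: fetch only primary keys up front, then open a short transaction per object, re-query the row with select_for_update() so the dual-write handler works against the latest committed state, and let the commit release the lock before moving on. A stripped-down sketch of the idiom (the helper name and signature are illustrative, not part of the migrator):

```python
from django.db import transaction

def process_locked(model, pks, handle):
    """Illustrative only: lock and process one row per short-lived transaction."""
    for pk in pks:
        with transaction.atomic():
            # Blocks if a concurrent dual write holds the lock on this row,
            # and guarantees we see its committed changes once we proceed.
            obj = model.objects.select_for_update().get(pk=pk)
            handle(obj)
        # Commit releases the row lock before the next iteration.
```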


def migrate_roles_for_tenant(tenant, exclude_apps, replicator):
"""Migrate all roles for a given tenant."""
default_workspace = Workspace.objects.get(type=Workspace.Types.DEFAULT, tenant=tenant)

roles = tenant.role_set.all()
roles = tenant.role_set.only("pk")
if exclude_apps:
roles = roles.exclude(access__permission__application__in=exclude_apps)

for role in roles:
logger.info(f"Migrating role: {role.name} with UUID {role.uuid}.")

tuples, mappings = migrate_role(role, default_workspace)

# Conflicts are not ignored in order to prevent this from
# accidentally running concurrently with dual-writes.
# If migration should be rerun, then the bindings table should be dropped.
# If changing this to allow updates,
# always ensure writes are paused before running.
# This must always be the case, but this should at least start failing you if you forget.
BindingMapping.objects.bulk_create(mappings, ignore_conflicts=False)

replicator.replicate(
ReplicationEvent(
event_type=ReplicationEventType.MIGRATE_CUSTOM_ROLE,
info={"role_uuid": str(role.uuid)},
partition_key=PartitionKey.byEnvironment(),
add=tuples,
role_pks = roles.values_list("pk", flat=True)
for role in role_pks:
# The migrator deals with concurrency control, and each role needs to be locked.
with transaction.atomic():
# Requery and lock role
role = Role.objects.select_for_update().get(pk=role)
logger.info(f"Migrating role: {role.name} with UUID {role.uuid}.")
dual_write_handler = RelationApiDualWriteHandler(
role, ReplicationEventType.MIGRATE_CUSTOM_ROLE, replicator
)
)

dual_write_handler.prepare_for_update()
dual_write_handler.replicate_new_or_updated_role(role)
# End of transaction; the lock on the role is released.
logger.info(f"Migration completed for role: {role.name} with UUID {role.uuid}.")

logger.info(f"Migrated {roles.count()} roles for tenant: {tenant.org_id}")


@@ -171,32 +111,38 @@ def migrate_data_for_tenant(tenant: Tenant, exclude_apps: list, replicator: Rela
logger.info("Finished relations of cross account requests.")


# The migrator does not generally deal with concurrency control,
# but we require an atomic block due to use of select_for_update in the dual write handler.
def migrate_cross_account_requests(tenant: Tenant, replicator: RelationReplicator):
"""Migrate approved account requests."""
cross_account_requests = CrossAccountRequest.objects.filter(status="approved", target_org=tenant.org_id)
for cross_account_request in cross_account_requests:
# The migrator deals with concurrency control.
# We need an atomic block because select_for_update is used in the dual write handler,
# and the cross account request must be locked while adding roles.
# Start of transaction for approved cross account request and "add roles" operation
with transaction.atomic():
# Lock cross account request
cross_account_request = CrossAccountRequest.objects.select_for_update().get(pk=cross_account_request.pk)
Review comment from a collaborator:
This is good I think. I can't think of a situation where this has a problem. When any CAR attribute changes (role, status, etc) it is locked, so those changes can't happen concurrently. The CAR used for replication is always the one queried with the lock, as is done here, so by the time a select returns, it will always be the latest state. So if dual write and migrator interleave, I don't think there should be a problem.

Looks good!

cross_account_roles = cross_account_request.roles.all()
if any(True for _ in cross_account_roles):
dual_write_handler = RelationApiDualWriteCrossAccessHandler(
cross_account_request, ReplicationEventType.MIGRATE_CROSS_ACCOUNT_REQUEST, replicator
)
# This operation requires lock on cross account request as is done
# in CrossAccountRequestViewSet#get_queryset
# This also locks binding mapping if exists for passed system roles.
dual_write_handler.generate_relations_to_add_roles(cross_account_request.roles.all())
dual_write_handler.replicate()
# End of transaction for approved cross account request and its add role operation
# Locks on the cross account request and, possibly, on the default workspace are released.
# The default workspace is locked when the related binding mapping did not exist yet.
# (Given where this runs in the migration, the binding mappings for system roles should already exist,
# as they are tied to the system roles.)
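The review comment above hinges on both writers going through the same lock: the API's dual-write path and the migrator each re-read the CrossAccountRequest under select_for_update() before replicating, so whichever transaction runs second blocks and then sees the committed state. A hedged sketch of that shared shape (make_handler is a stand-in for constructing a RelationApiDualWriteCrossAccessHandler with the right event type):

```python
from django.db import transaction
from api.models import CrossAccountRequest

def replicate_car(pk, make_handler):
    with transaction.atomic():
        # Both the migrator and the dual-write view take this lock first,
        # so the CAR read here is always the latest committed state.
        car = CrossAccountRequest.objects.select_for_update().get(pk=pk)
        handler = make_handler(car)
        handler.generate_relations_to_add_roles(car.roles.all())
        handler.replicate()
```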


def migrate_data(
exclude_apps: list = [], orgs: list = [], write_relationships: str = "False", skip_roles: bool = False
):
"""Migrate all data for all tenants."""
# Only run this in maintenance mode or
# if we don't write relationships (testing out the migration and cleaning up the created binding mappings)
if not settings.READ_ONLY_API_MODE and write_relationships != "False":
logger.fatal("Read-only API mode is required. READ_ONLY_API_MODE must be set to true.")
return

count = 0
tenants = Tenant.objects.filter(ready=True).exclude(tenant_name="public")
replicator = _get_replicator(write_relationships)
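For the guard at the top of migrate_data above, the migration aborts only when it would actually write relationships while the API is still accepting writes; otherwise it proceeds, including the "don't write relationships" dry run described in the comment. A small sketch of the guard's behavior, assuming write_relationships stays a string as in the signature:

```python
def should_abort(read_only_api_mode: bool, write_relationships: str) -> bool:
    # READ_ONLY_API_MODE is True             -> proceed regardless (API is not accepting writes)
    # READ_ONLY_API_MODE False, "False"      -> proceed (dry run; created binding mappings are cleaned up)
    # READ_ONLY_API_MODE False, other value  -> log fatal and return early
    return not read_only_api_mode and write_relationships != "False"
```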
78 changes: 78 additions & 0 deletions rbac/migration_tool/migrate_role.py
@@ -0,0 +1,78 @@
"""
Copyright 2019 Red Hat, Inc.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""

from typing import Iterable

from kessel.relations.v1beta1 import common_pb2
from management.role.model import BindingMapping, Role
from management.workspace.model import Workspace
from migration_tool.models import V2rolebinding
from migration_tool.sharedSystemRolesReplicatedRoleBindings import v1_role_to_v2_bindings
from migration_tool.utils import create_relationship


def get_kessel_relation_tuples(
v2_role_bindings: Iterable[V2rolebinding],
default_workspace: Workspace,
) -> list[common_pb2.Relationship]:
"""Generate a set of relationships and BindingMappings for the given set of v2 role bindings."""
relationships: list[common_pb2.Relationship] = list()

for v2_role_binding in v2_role_bindings:
relationships.extend(v2_role_binding.as_tuples())

bound_resource = v2_role_binding.resource

# Is this a workspace binding, but not to the root workspace?
# If so, ensure this workspace is a child of the root workspace.
# All other resource-resource or resource-workspace relations
# which may be implied or necessary are intentionally ignored.
# These should come from the apps that own the resource.
if bound_resource.resource_type == ("rbac", "workspace") and not bound_resource.resource_id == str(
default_workspace.id
):
# This is not strictly necessary here and the relation may be a duplicate.
# Once we have more Workspace API / Inventory Group migration progress,
# this block can and probably should be removed.
# One of those APIs will add it themselves.
relationships.append(
create_relationship(
bound_resource.resource_type,
bound_resource.resource_id,
("rbac", "workspace"),
str(default_workspace.id),
"parent",
)
)

return relationships


def migrate_role(
role: Role,
default_workspace: Workspace,
current_bindings: Iterable[BindingMapping] = [],
) -> tuple[list[common_pb2.Relationship], list[BindingMapping]]:
"""
Migrate a role from v1 to v2, returning the tuples and mappings.

The mappings are returned so that we can reconstitute the corresponding tuples for a given role.
This is needed so we can remove those tuples when the role changes if needed.
"""
v2_role_bindings = v1_role_to_v2_bindings(role, default_workspace, current_bindings)
relationships = get_kessel_relation_tuples([m.get_role_binding() for m in v2_role_bindings], default_workspace)
return relationships, v2_role_bindings
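A hedged usage sketch of the relocated helper: fetch the tenant's default workspace (as migrate_roles_for_tenant does above) and feed each role through migrate_role; the returned relationships are ready for replication, and the mappings let a later role change reconstruct and remove these tuples. The loop and variable names are illustrative:

```python
from management.models import Workspace
from migration_tool.migrate_role import migrate_role

def tuples_for_tenant(tenant):
    default_workspace = Workspace.objects.get(type=Workspace.Types.DEFAULT, tenant=tenant)
    all_relationships = []
    for role in tenant.role_set.all():
        relationships, mappings = migrate_role(role, default_workspace)
        all_relationships.extend(relationships)
        # mappings are BindingMapping instances; persisting them allows the
        # corresponding tuples to be rebuilt or removed when the role changes.
    return all_relationships
```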