diff --git a/src/sentry/relocation/api/endpoints/index.py b/src/sentry/relocation/api/endpoints/index.py index e6a44d5cbcdbc2..cd463d15741c78 100644 --- a/src/sentry/relocation/api/endpoints/index.py +++ b/src/sentry/relocation/api/endpoints/index.py @@ -3,6 +3,7 @@ from datetime import timedelta from functools import reduce from string import Template +from uuid import uuid4 from django.db import router from django.db.models import Q @@ -22,12 +23,13 @@ from sentry.api.serializers import serialize from sentry.auth.elevated_mode import has_elevated_mode from sentry.models.files.file import File +from sentry.models.files.utils import get_relocation_storage from sentry.options import get from sentry.relocation.api.endpoints import ERR_FEATURE_DISABLED from sentry.relocation.api.serializers.relocation import RelocationSerializer from sentry.relocation.models.relocation import Relocation, RelocationFile from sentry.relocation.tasks.process import uploading_start -from sentry.relocation.utils import RELOCATION_BLOB_SIZE, RELOCATION_FILE_TYPE +from sentry.relocation.utils import RELOCATION_FILE_TYPE, relocation_raw_data_path from sentry.search.utils import tokenize_query from sentry.signals import relocation_link_promo_code from sentry.users.models.user import MAX_USERNAME_LENGTH, User @@ -53,7 +55,9 @@ RELOCATION_FILE_SIZE_MEDIUM = 100 * 1024**2 -def get_relocation_size_category(size) -> str: +def get_relocation_size_category(size: int | None) -> str: + if size is None: + return "medium" if size < RELOCATION_FILE_SIZE_SMALL: return "small" elif size < RELOCATION_FILE_SIZE_MEDIUM: @@ -62,6 +66,7 @@ def get_relocation_size_category(size) -> str: def should_throttle_relocation(relocation_bucket_size: str) -> bool: + # TODO(cells) Instead of doing this by size we could use the number of inflight relocations. recent_relocation_files = RelocationFile.objects.filter( date_added__gte=(timezone.now() - timedelta(days=1)) ) @@ -273,14 +278,20 @@ def post(self, request: Request) -> Response: if err is not None: return err - file = File.objects.create(name="raw-relocation-data.tar", type=RELOCATION_FILE_TYPE) - file.putfile(fileobj, blob_size=RELOCATION_BLOB_SIZE, logger=logger) + relocation_uuid = uuid4() + path = relocation_raw_data_path(relocation_uuid) + relocation_storage = get_relocation_storage() + relocation_storage.save(path, fileobj) + + # TODO(cells) Make RelocationFile.file nullable + file = File.objects.create(name="stub", type=RELOCATION_FILE_TYPE, size=file_size) with atomic_transaction( using=(router.db_for_write(Relocation), router.db_for_write(RelocationFile)) ): provenance = Relocation.Provenance.SELF_HOSTED relocation: Relocation = Relocation.objects.create( + uuid=relocation_uuid, creator_id=request.user.id, owner_id=owner.id, want_org_slugs=org_slugs, @@ -290,8 +301,9 @@ def post(self, request: Request) -> Response: ) RelocationFile.objects.create( relocation=relocation, - file=file, kind=RelocationFile.Kind.RAW_USER_DATA.value, + bucket_path=path, + file=file, ) relocation_link_promo_code.send_robust( relocation_uuid=relocation.uuid, promo_code=promo_code, sender=self.__class__ diff --git a/src/sentry/relocation/api/endpoints/retry.py b/src/sentry/relocation/api/endpoints/retry.py index 989ce280207774..b890ea2792a866 100644 --- a/src/sentry/relocation/api/endpoints/retry.py +++ b/src/sentry/relocation/api/endpoints/retry.py @@ -15,7 +15,6 @@ from sentry.api.exceptions import ResourceDoesNotExist from sentry.api.permissions import SentryIsAuthenticated from sentry.api.serializers import serialize -from sentry.models.files.file import File from sentry.relocation.api.endpoints.index import ( get_autopause_value, validate_new_relocation_request, @@ -75,6 +74,7 @@ def post(self, request: Request, relocation_uuid: str) -> Response: status=status.HTTP_400_BAD_REQUEST, ) + # TODO(cells) Update this when file becomes nullable/removed. relocation_file = ( RelocationFile.objects.filter(relocation=relocation).select_related("file").first() ) @@ -85,10 +85,7 @@ def post(self, request: Request, relocation_uuid: str) -> Response: ) # We can re-use the same `File` instance in the database, avoiding duplicating data. - try: - file = File.objects.get(id=relocation_file.file_id) - fileobj = file.getfile() - except (File.DoesNotExist, FileNotFoundError): + if not relocation_file.file: return Response( {"detail": ERR_FILE_NO_LONGER_EXISTS}, status=status.HTTP_400_BAD_REQUEST, @@ -102,7 +99,7 @@ def post(self, request: Request, relocation_uuid: str) -> Response: ) err = validate_new_relocation_request( - request, owner.username, relocation.want_org_slugs, fileobj.size + request, owner.username, relocation.want_org_slugs, relocation_file.file.size ) or validate_relocation_uniqueness(owner) if err is not None: return err @@ -126,10 +123,15 @@ def post(self, request: Request, relocation_uuid: str) -> Response: new_relocation_uuid=new_relocation.uuid, sender=self.__class__, ) + # Create a new RelocationFile. Initially the bucket_path + # will point to the original relocation. During preprocessing_transfer + # we will copy the raw data into `runs/{uuid}/in` so it can be copied + # into cloudbuild. RelocationFile.objects.create( relocation=new_relocation, - file=file, kind=RelocationFile.Kind.RAW_USER_DATA.value, + bucket_path=relocation_file.bucket_path, + file=relocation_file.file, ) uploading_start.delay(str(new_relocation.uuid), None, None) diff --git a/src/sentry/relocation/services/relocation_export/impl.py b/src/sentry/relocation/services/relocation_export/impl.py index 2a6447e62f4905..c376263b600346 100644 --- a/src/sentry/relocation/services/relocation_export/impl.py +++ b/src/sentry/relocation/services/relocation_export/impl.py @@ -6,7 +6,7 @@ import base64 import logging from datetime import UTC, datetime -from io import BytesIO +from uuid import UUID from django.db import router from django.db.utils import IntegrityError @@ -14,7 +14,6 @@ from sentry_sdk import capture_exception from sentry.models.files.file import File -from sentry.models.files.utils import get_relocation_storage from sentry.relocation.models.relocation import Relocation, RelocationFile from sentry.relocation.models.relocationtransfer import ( RETRY_BACKOFF, @@ -25,7 +24,11 @@ CellRelocationExportService, ControlRelocationExportService, ) -from sentry.relocation.utils import RELOCATION_BLOB_SIZE, RELOCATION_FILE_TYPE +from sentry.relocation.utils import ( + RELOCATION_FILE_TYPE, + get_relocation_storage, + relocation_raw_data_path, +) from sentry.utils.db import atomic_transaction logger = logging.getLogger("sentry.relocation") @@ -77,8 +80,7 @@ def reply_with_export( requesting_region_name: str, replying_region_name: str, org_slug: str, - encrypted_bytes: list[int], - # TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`. + encrypted_bytes: list[int] | None = None, encrypted_contents: bytes | None = None, ) -> None: from sentry.relocation.tasks.process import uploading_complete @@ -95,23 +97,24 @@ def reply_with_export( "requesting_region_name": requesting_region_name, "replying_region_name": replying_region_name, "org_slug": org_slug, - # TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`. - "encrypted_bytes_size": len(encrypted_bytes or []), } logger.info("SaaS -> SaaS reply received in triggering region", extra=logger_data) + uuid = UUID(relocation_uuid) try: - relocation: Relocation = Relocation.objects.get(uuid=relocation_uuid) + relocation: Relocation = Relocation.objects.get(uuid=uuid) except Relocation.DoesNotExist as e: logger.exception("Could not locate Relocation model by UUID: %s", relocation_uuid) capture_exception(e) return - # TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`. - fp = BytesIO(bytes(encrypted_bytes or [])) - file = File.objects.create(name="raw-relocation-data.tar", type=RELOCATION_FILE_TYPE) - file.putfile(fp, blob_size=RELOCATION_BLOB_SIZE, logger=logger) - logger.info("SaaS -> SaaS relocation underlying File created", extra=logger_data) + # Now that we have confirmation that the export file + # was stored in the shared bucket, create a RelocationFile record + # so that the import process can begin. + relocation_storage = get_relocation_storage() + blobsize = relocation_storage.size(relocation_raw_data_path(uuid)) + # TODO(cells) Remove this once RelocationFile.file is optional. + file = File.objects.create(name="stub", type=RELOCATION_FILE_TYPE, size=blobsize) # This write ensures that the entire chain triggered by `uploading_start` remains # idempotent, since only one (relocation_uuid, relocation_file_kind) pairing can exist @@ -122,11 +125,11 @@ def reply_with_export( relocation=relocation, file=file, kind=RelocationFile.Kind.RAW_USER_DATA.value, + bucket_path=relocation_raw_data_path(uuid), ) except IntegrityError: # We already have the file, we can proceed. pass - logger.info("SaaS -> SaaS relocation RelocationFile saved", extra=logger_data) uploading_complete.apply_async(args=[str(relocation.uuid)]) @@ -173,8 +176,7 @@ def reply_with_export( requesting_region_name: str, replying_region_name: str, org_slug: str, - encrypted_bytes: list[int], - # TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`. + encrypted_bytes: list[int] | None = None, encrypted_contents: bytes | None = None, ) -> None: from sentry.relocation.tasks.transfer import process_relocation_transfer_control @@ -184,20 +186,9 @@ def reply_with_export( "requesting_region_name": requesting_region_name, "replying_region_name": replying_region_name, "org_slug": org_slug, - # TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`. - "encrypted_bytes_size": len(encrypted_bytes or []), } logger.info("SaaS -> SaaS reply received on proxy", extra=logger_data) - # Save the payload into the control silo's "relocation" GCS bucket. This bucket is only used - # for temporary storage of `encrypted_bytes` being shuffled between cells like this. - path = f"runs/{relocation_uuid}/saas_to_saas_export/{org_slug}.tar" - relocation_storage = get_relocation_storage() - # TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`. - fp = BytesIO(bytes(encrypted_bytes or [])) - relocation_storage.save(path, fp) - logger.info("SaaS -> SaaS export contents retrieved", extra=logger_data) - # Save transfer record so we can push state to the requesting cell transfer = ControlRelocationTransfer.objects.create( relocation_uuid=relocation_uuid, diff --git a/src/sentry/relocation/services/relocation_export/service.py b/src/sentry/relocation/services/relocation_export/service.py index a7854fa3e7d991..a1478f5724774f 100644 --- a/src/sentry/relocation/services/relocation_export/service.py +++ b/src/sentry/relocation/services/relocation_export/service.py @@ -73,8 +73,8 @@ def reply_with_export( requesting_region_name: str, replying_region_name: str, org_slug: str, - encrypted_bytes: list[int], - # TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`. + # TODO(cells) bytes/contents are deprecated + encrypted_bytes: list[int] | None = None, encrypted_contents: bytes | None = None, ) -> None: """ @@ -132,8 +132,8 @@ def reply_with_export( requesting_region_name: str, replying_region_name: str, org_slug: str, - encrypted_bytes: list[int], - # TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`. + # TODO(cells) bytes/contents are deprecated + encrypted_bytes: list[int] | None = None, encrypted_contents: bytes | None = None, ) -> None: """ diff --git a/src/sentry/relocation/tasks/process.py b/src/sentry/relocation/tasks/process.py index f229c1a482e191..4e0a060752cfdf 100644 --- a/src/sentry/relocation/tasks/process.py +++ b/src/sentry/relocation/tasks/process.py @@ -20,7 +20,7 @@ from google.cloud.devtools.cloudbuild_v1 import Build from google.cloud.devtools.cloudbuild_v1 import CloudBuildClient as CloudBuildClient from sentry_sdk import capture_exception -from taskbroker_client.retry import LastAction, Retry +from taskbroker_client.retry import LastAction, Retry, retry_task from taskbroker_client.task import Task from sentry import analytics @@ -45,7 +45,6 @@ ) from sentry.backup.helpers import ImportFlags from sentry.backup.imports import import_in_organization_scope -from sentry.models.files.file import File from sentry.models.files.utils import get_relocation_storage from sentry.models.importchunk import ControlImportChunkReplica, RegionImportChunk from sentry.models.organization import Organization, OrganizationStatus @@ -74,6 +73,7 @@ create_cloudbuild_yaml, fail_relocation, get_relocations_bucket_name, + relocation_raw_data_path, retry_task_or_fail_relocation, send_relocation_update_email, start_relocation_task, @@ -364,7 +364,6 @@ def fulfill_cross_region_export_request( log_gcp_credentials_details(logger) uuid = UUID(uuid_str) - path = f"runs/{uuid}/saas_to_saas_export/{org_slug}.tar" relocation_storage = get_relocation_storage() fp = BytesIO() logger.info( @@ -393,9 +392,11 @@ def fulfill_cross_region_export_request( extra=logger_data, ) + # Rewind the file pointer and write to storage. fp.seek(0) + path = relocation_raw_data_path(uuid) relocation_storage.save(path, fp) - logger_data["encrypted_contents_size"] = fp.tell() + logger.info( "fulfill_cross_region_export_request: saved", extra=logger_data, @@ -492,10 +493,11 @@ def uploading_complete(uuid: str) -> None: if relocation is None: return - # Pull down the `RelocationFile` associated with this `Relocation`. Fallibility is expected - # here: we're pushing a potentially very large file with many blobs to a cloud store, so it is - # possible (likely, even) that not all of the blobs are yet available. If this segment fails, - # we'll just allow the Exception to bubble up and retry the task if possible. + # Ensure that the `RelocationFile` associated with this `Relocation` exists. + # Fallibility is expected here: we're pushing a potentially very large file with many blobs + # to a cloud store, so it is possible (likely, even) that not all of the blobs are yet + # available. If this segment fails, we'll just allow the Exception to bubble up and retry + # the task if possible. with retry_task_or_fail_relocation( relocation, OrderedTask.UPLOADING_COMPLETE, @@ -510,9 +512,15 @@ def uploading_complete(uuid: str) -> None: .select_related("file") .get() ) - fp = raw_relocation_file.file.getfile() - with fp: - preprocessing_scan.apply_async(args=[uuid]) + relocation_storage = get_relocation_storage() + if not relocation_storage.exists(raw_relocation_file.bucket_path): + retry_task() + + logger.info( + "uploading_complete.relocation_file_present", + extra={"uuid": uuid, "id": raw_relocation_file.id}, + ) + preprocessing_scan.apply_async(args=[uuid]) @instrumented_task( @@ -564,7 +572,8 @@ def preprocessing_scan(uuid: str) -> None: .select_related("file") .get() ) - fp = raw_relocation_file.file.getfile() + relocation_storage = get_relocation_storage() + fp = relocation_storage.open(raw_relocation_file.bucket_path) with fp: try: @@ -747,12 +756,11 @@ def preprocessing_transfer(uuid: str) -> None: kms_config_bytes = json.dumps(get_default_crypto_key_version()).encode("utf-8") relocation_storage.save(f"runs/{uuid}/in/kms-config.json", BytesIO(kms_config_bytes)) - # Now, upload the relocation data proper. - kind = RelocationFile.Kind.RAW_USER_DATA + # Ensure that the RelocationFile exists. raw_relocation_file = ( RelocationFile.objects.filter( relocation=relocation, - kind=kind.value, + kind=RelocationFile.Kind.RAW_USER_DATA.value, ) .select_related("file") .prefetch_related("file__blobs") @@ -761,14 +769,14 @@ def preprocessing_transfer(uuid: str) -> None: if raw_relocation_file is None: raise FileNotFoundError("User-supplied relocation data not found.") - file: File = raw_relocation_file.file - path = f"runs/{uuid}/in/{kind.to_filename('tar')}" - - # Copy all of the files from Django's abstract filestore into an isolated, - # backend-specific filestore for relocation operations only. - fp = file.getfile() - fp.seek(0) - relocation_storage.save(path, fp) + # If the currrent UUID is not in the bucket_path, we are processing + # a retry and need to copy the file. + if raw_relocation_file.bucket_path and uuid not in raw_relocation_file.bucket_path: + new_path = relocation_raw_data_path(UUID(uuid)) + with relocation_storage.open(raw_relocation_file.bucket_path) as f: + relocation_storage.save(new_path, f) + raw_relocation_file.bucket_path = new_path + raw_relocation_file.save() preprocessing_baseline_config.apply_async(args=[uuid]) @@ -1401,7 +1409,9 @@ def importing(uuid: str) -> None: .select_related("file") .get() ) - relocation_data_fp = raw_relocation_file.file.getfile() + relocation_storage = get_relocation_storage() + relocation_data_fp = relocation_storage.open(raw_relocation_file.bucket_path) + log_gcp_credentials_details(logger) kms_config_fp = BytesIO(json.dumps(get_default_crypto_key_version()).encode("utf-8")) diff --git a/src/sentry/relocation/tasks/transfer.py b/src/sentry/relocation/tasks/transfer.py index 6e6b314b4e3e4e..78cf30ddda22f0 100644 --- a/src/sentry/relocation/tasks/transfer.py +++ b/src/sentry/relocation/tasks/transfer.py @@ -5,7 +5,6 @@ from sentry_sdk import capture_exception from taskbroker_client.task import Task -from sentry.models.files.utils import get_relocation_storage from sentry.relocation.models.relocationtransfer import ( MAX_AGE, RETRY_BACKOFF, @@ -130,40 +129,19 @@ def process_relocation_transfer_control(transfer_id: int) -> None: capture_exception(err) elif transfer.state == RelocationTransferState.Reply: # We expect the `ProxyRelocationExportService::reply_with_export` implementation to have - # written the export data to the control silo's local relocation-specific GCS bucket. Here, - # we just read it into memory and attempt the RPC back to the requesting cell. - uuid = transfer.relocation_uuid - slug = transfer.org_slug - - relocation_storage = get_relocation_storage() - path = f"runs/{uuid}/saas_to_saas_export/{slug}.tar" + # written the export data to the shared relocation bucket. Now, we forward the reply request + # to the requesting cell. try: - encrypted_bytes = relocation_storage.open(path) - except Exception as err: - logger.warning( - "relocation.failed_open_reply", - extra={ - **log_context, - "error": str(err), - }, + cell_relocation_export_service.reply_with_export( + relocation_uuid=str(transfer.relocation_uuid), + requesting_region_name=transfer.requesting_cell, + replying_region_name=transfer.exporting_cell, + org_slug=transfer.org_slug, + encrypted_contents=None, + encrypted_bytes=None, ) - capture_exception(err) - return - - try: - with encrypted_bytes: - # Move encrypted bytes to the requesting cell. - cell_relocation_export_service.reply_with_export( - relocation_uuid=str(transfer.relocation_uuid), - requesting_region_name=transfer.requesting_cell, - replying_region_name=transfer.exporting_cell, - org_slug=slug, - # TODO(mark): finish transfer from `encrypted_contents` -> `encrypted_bytes`. - encrypted_contents=None, - encrypted_bytes=[int(byte) for byte in encrypted_bytes.read()], - ) - # We are done with this stage of the transfer - transfer.delete() + # We are done with this stage of the transfer + transfer.delete() except Exception as err: logger.warning( "relocation.failed_rpc_reply", @@ -200,31 +178,14 @@ def process_relocation_transfer_region(transfer_id: int) -> None: logger.info("relocation.transfer.processing", extra=log_context) if transfer.state == RelocationTransferState.Reply: - relocation_storage = get_relocation_storage() - path = f"runs/{uuid}/saas_to_saas_export/{slug}.tar" - try: - encrypted_bytes = relocation_storage.open(path) - except Exception as err: - logger.warning( - "relocation.failed_open.export", - extra={ - **log_context, - "error": str(err), - }, - ) - capture_exception(err) - return - - with encrypted_bytes: - control_relocation_export_service.reply_with_export( - relocation_uuid=uuid, - requesting_region_name=transfer.requesting_cell, - replying_region_name=transfer.exporting_cell, - org_slug=slug, - # TODO(mark): finish transfer from `encrypted_contents` -> `encrypted_bytes`. - encrypted_contents=None, - encrypted_bytes=[int(byte) for byte in encrypted_bytes.read()], - ) + control_relocation_export_service.reply_with_export( + relocation_uuid=uuid, + requesting_region_name=transfer.requesting_cell, + replying_region_name=transfer.exporting_cell, + org_slug=slug, + encrypted_contents=None, + encrypted_bytes=None, + ) # Remove the transfer once the reply is sent. transfer.delete() else: diff --git a/src/sentry/relocation/utils.py b/src/sentry/relocation/utils.py index 0b42c92dd7b882..5a6a1e82399452 100644 --- a/src/sentry/relocation/utils.py +++ b/src/sentry/relocation/utils.py @@ -694,6 +694,12 @@ def get_relocations_bucket_name(): return "default" +def relocation_raw_data_path(relocation_uuid: UUID) -> str: + """Generate a file path for relocation/import raw data""" + filename = RelocationFile.Kind.RAW_USER_DATA.to_filename("tar") + return f"runs/{relocation_uuid}/in/{filename}" + + def create_cloudbuild_yaml(relocation: Relocation) -> bytes: bucket_root = f"gs://{get_relocations_bucket_name()}" filter_org_slugs_args = ["--filter-org-slugs", ",".join(relocation.want_org_slugs)] diff --git a/tests/sentry/relocation/api/endpoints/test_index.py b/tests/sentry/relocation/api/endpoints/test_index.py index 8381636df2a6b2..af095e27f475da 100644 --- a/tests/sentry/relocation/api/endpoints/test_index.py +++ b/tests/sentry/relocation/api/endpoints/test_index.py @@ -328,7 +328,6 @@ def test_good_simple( ) -> None: self.login_as(user=self.owner, superuser=False) relocation_count = Relocation.objects.count() - relocation_file_count = RelocationFile.objects.count() with tempfile.TemporaryDirectory() as tmp_dir: (_, tmp_pub_key_path) = self.tmp_keys(tmp_dir) @@ -358,11 +357,13 @@ def test_good_simple( assert response.data["owner"]["email"] == str(self.owner.email) assert response.data["owner"]["username"] == str(self.owner.username) - relocation: Relocation = Relocation.objects.get(owner_id=self.owner.id) + relocation = Relocation.objects.get(owner_id=self.owner.id) assert str(relocation.uuid) == response.data["uuid"] assert relocation.want_org_slugs == ["testing"] assert Relocation.objects.count() == relocation_count + 1 - assert RelocationFile.objects.count() == relocation_file_count + 1 + assert RelocationFile.objects.count() == 1 + relocation_file = RelocationFile.objects.get(relocation=relocation) + assert relocation_file.bucket_path == f"runs/{relocation.uuid}/in/raw-relocation-data.tar" assert uploading_start_mock.call_count == 1 uploading_start_mock.assert_called_with(args=[response.data["uuid"], None, None]) diff --git a/tests/sentry/relocation/api/endpoints/test_retry.py b/tests/sentry/relocation/api/endpoints/test_retry.py index 7a80086b014672..0318ae58646ca2 100644 --- a/tests/sentry/relocation/api/endpoints/test_retry.py +++ b/tests/sentry/relocation/api/endpoints/test_retry.py @@ -17,7 +17,7 @@ ERR_OWNER_NO_LONGER_EXISTS, ) from sentry.relocation.models.relocation import Relocation, RelocationFile -from sentry.relocation.utils import RELOCATION_FILE_TYPE, OrderedTask +from sentry.relocation.utils import RELOCATION_FILE_TYPE, OrderedTask, relocation_raw_data_path from sentry.silo.base import SiloMode from sentry.testutils.cases import APITestCase from sentry.testutils.factories import get_fixture_path @@ -69,19 +69,16 @@ def setUp(self) -> None: ) # Make two files - one to be referenced by our existing `Relocation`, the other not. - self.file: File = File.objects.create( - name="raw-relocation-data.tar", type=RELOCATION_FILE_TYPE - ) + self.file = File.objects.create(name="raw-relocation-data.tar", type=RELOCATION_FILE_TYPE) self.file.putfile(get_test_tarball()) - other_file: File = File.objects.create( - name="raw-relocation-data.tar", type=RELOCATION_FILE_TYPE - ) + other_file = File.objects.create(name="raw-relocation-data.tar", type=RELOCATION_FILE_TYPE) other_file.putfile(get_test_tarball()) self.relocation_file = RelocationFile.objects.create( relocation=self.relocation, file=self.file, kind=RelocationFile.Kind.RAW_USER_DATA.value, + bucket_path=relocation_raw_data_path(self.relocation.uuid), ) @override_options( @@ -116,14 +113,19 @@ def test_good_simple(self, uploading_start_mock: Mock, analytics_record_mock: Mo assert response.data["importedUserIds"] == [] assert response.data["importedOrgIds"] == [] - assert ( + new_relocation = ( Relocation.objects.filter(owner_id=self.owner.id) .exclude(uuid=self.relocation.uuid) - .exists() + .first() ) + assert new_relocation assert Relocation.objects.count() == relocation_count + 1 assert RelocationFile.objects.count() == relocation_file_count + 1 assert File.objects.count() == file_count + new_relocation_file = RelocationFile.objects.get(relocation=new_relocation) + # The new RelocationFile should reference the old bucket path. + + assert new_relocation_file.bucket_path == relocation_raw_data_path(self.relocation.uuid) assert uploading_start_mock.call_count == 1 diff --git a/tests/sentry/relocation/tasks/test_process.py b/tests/sentry/relocation/tasks/test_process.py index 56354dc3767ebf..8aa995b0fdd69d 100644 --- a/tests/sentry/relocation/tasks/test_process.py +++ b/tests/sentry/relocation/tasks/test_process.py @@ -1,5 +1,4 @@ from datetime import timedelta -from functools import cached_property from io import BytesIO from pathlib import Path from tempfile import TemporaryDirectory @@ -83,10 +82,10 @@ validating_start, ) from sentry.relocation.utils import ( - RELOCATION_BLOB_SIZE, RELOCATION_FILE_TYPE, OrderedTask, StorageBackedCheckpointExporter, + relocation_raw_data_path, ) from sentry.silo.base import SiloMode from sentry.testutils.cases import TestCase, TransactionTestCase @@ -137,45 +136,52 @@ def setUp(self) -> None: ) self.relocation_file = RelocationFile.objects.create( relocation=self.relocation, - file=self.file, + file=File.objects.create(name="export.tar", type=RELOCATION_FILE_TYPE), kind=RelocationFile.Kind.RAW_USER_DATA.value, + bucket_path=relocation_raw_data_path(self.relocation.uuid), ) + self.create_import_tarball() self.uuid = str(self.relocation.uuid) - @cached_property - def file(self): + def create_import_tarball(self): with TemporaryDirectory() as tmp_dir: - (priv_key_pem, pub_key_pem) = generate_rsa_key_pair() + self.priv_key_pem, self.pub_key_pem = generate_rsa_key_pair() tmp_priv_key_path = Path(tmp_dir).joinpath("key") - self.priv_key_pem = priv_key_pem with open(tmp_priv_key_path, "wb") as f: - f.write(priv_key_pem) + f.write(self.priv_key_pem) tmp_pub_key_path = Path(tmp_dir).joinpath("key.pub") - self.pub_key_pem = pub_key_pem with open(tmp_pub_key_path, "wb") as f: - f.write(pub_key_pem) + f.write(self.pub_key_pem) with open(IMPORT_JSON_FILE_PATH, "rb") as f: data = json.load(f) with open(tmp_pub_key_path, "rb") as p: - file = File.objects.create(name="export.tar", type=RELOCATION_FILE_TYPE) self.tarball = create_encrypted_export_tarball( data, LocalFileEncryptor(p) ).getvalue() - file.putfile(BytesIO(self.tarball)) - return file + path = relocation_raw_data_path(self.relocation.uuid) + relocation_storage = get_relocation_storage() + relocation_storage.save(path, BytesIO(self.tarball)) def swap_relocation_file_with_data_from_fixture( - self, file: File, fixture_name: str, blob_size: int = RELOCATION_BLOB_SIZE + self, + relocation_file: RelocationFile, + fixture_name: str, ) -> None: with open(get_fixture_path("backup", fixture_name), "rb") as fp: - return self.swap_relocation_file(file, BytesIO(fp.read()), blob_size) + return self.swap_relocation_file(relocation_file, BytesIO(fp.read())) def swap_relocation_file( - self, file: File, contents: BytesIO, blob_size: int = RELOCATION_BLOB_SIZE + self, + relocation_file: RelocationFile, + contents: BytesIO, ) -> None: + assert relocation_file.bucket_path + relocation_storage = get_relocation_storage() + relocation_storage.delete(relocation_file.bucket_path) + with TemporaryDirectory() as tmp_dir: tmp_priv_key_path = Path(tmp_dir).joinpath("key") tmp_pub_key_path = Path(tmp_dir).joinpath("key.pub") @@ -189,11 +195,11 @@ def swap_relocation_file( self.tarball = create_encrypted_export_tarball( data, LocalFileEncryptor(p) ).getvalue() - file.putfile(BytesIO(self.tarball), blob_size=blob_size) + relocation_storage.save(relocation_file.bucket_path, BytesIO(self.tarball)) def mock_kms_client(self, fake_kms_client: Mock): if not hasattr(self, "tarball"): - _ = self.file + self.create_import_tarball() unwrapped = unwrap_encrypted_export_tarball(BytesIO(self.tarball)) plaintext_dek = LocalFileDecryptor.from_bytes( @@ -661,9 +667,14 @@ def test_fail_if_no_attempts_left( def test_fail_invalid_tarball( self, preprocessing_transfer_mock: Mock, fake_message_builder: Mock, fake_kms_client: Mock ): - file = RelocationFile.objects.get(relocation=self.relocation).file - corrupted_tarball_bytes = bytearray(file.getfile().read())[9:] - file.putfile(BytesIO(bytes(corrupted_tarball_bytes))) + relocation_storage = get_relocation_storage() + assert self.relocation_file.bucket_path + with relocation_storage.open(self.relocation_file.bucket_path) as f: + blob = f.read() + corrupted_tarball_bytes = blob[9:] + relocation_storage.delete(self.relocation_file.bucket_path) + relocation_storage.save(self.relocation_file.bucket_path, BytesIO(corrupted_tarball_bytes)) + self.mock_message_builder(fake_message_builder) self.mock_kms_client(fake_kms_client) @@ -716,8 +727,7 @@ def test_fail_decryption_failure( def test_fail_invalid_json( self, preprocessing_transfer_mock: Mock, fake_message_builder: Mock, fake_kms_client: Mock ): - file = RelocationFile.objects.get(relocation=self.relocation).file - self.swap_relocation_file_with_data_from_fixture(file, "invalid-user.json") + self.swap_relocation_file_with_data_from_fixture(self.relocation_file, "invalid-user.json") self.mock_message_builder(fake_message_builder) self.mock_kms_client(fake_kms_client) @@ -739,8 +749,7 @@ def test_fail_invalid_json( def test_fail_no_users( self, preprocessing_transfer_mock: Mock, fake_message_builder: Mock, fake_kms_client: Mock ): - file = RelocationFile.objects.get(relocation=self.relocation).file - self.swap_relocation_file_with_data_from_fixture(file, "single-option.json") + self.swap_relocation_file_with_data_from_fixture(self.relocation_file, "single-option.json") self.mock_message_builder(fake_message_builder) self.mock_kms_client(fake_kms_client) @@ -784,8 +793,9 @@ def test_fail_too_many_users( def test_fail_no_orgs( self, preprocessing_transfer_mock: Mock, fake_message_builder: Mock, fake_kms_client: Mock ): - file = RelocationFile.objects.get(relocation=self.relocation).file - self.swap_relocation_file_with_data_from_fixture(file, "user-with-minimum-privileges.json") + self.swap_relocation_file_with_data_from_fixture( + self.relocation_file, "user-with-minimum-privileges.json" + ) self.mock_message_builder(fake_message_builder) self.mock_kms_client(fake_kms_client) @@ -895,6 +905,43 @@ def setUp(self) -> None: self.create_user("importing") self.relocation_storage = get_relocation_storage() + def test_copy_file_for_retry( + self, + preprocessing_baseline_config_mock: Mock, + fake_message_builder: Mock, + ): + self.mock_message_builder(fake_message_builder) + + # Simulate a retry relocation which references another + # relocation's bucket_path + retry_relocation = Relocation.objects.create( + creator_id=self.staff_user.id, + owner_id=self.owner.id, + want_org_slugs=[self.requested_org_slug], + want_usernames=[], + step=Relocation.Step.UPLOADING.value, + latest_task=OrderedTask.PREPROCESSING_SCAN.name, + ) + retry_file = RelocationFile.objects.create( + relocation=retry_relocation, + file=File.objects.create(name="export.tar", type=RELOCATION_FILE_TYPE), + kind=RelocationFile.Kind.RAW_USER_DATA.value, + bucket_path=self.relocation_file.bucket_path, + ) + + preprocessing_transfer(retry_relocation.uuid) + + assert fake_message_builder.call_count == 0 + assert preprocessing_baseline_config_mock.call_count == 1 + + reload_file = RelocationFile.objects.get(id=retry_file.id) + assert reload_file.bucket_path + assert self.relocation_file.bucket_path + + assert reload_file.bucket_path != self.relocation_file.bucket_path + assert str(self.relocation.uuid) not in reload_file.bucket_path + assert str(retry_relocation.uuid) in reload_file.bucket_path + def test_retry_if_attempts_left( self, preprocessing_baseline_config_mock: Mock, @@ -971,8 +1018,9 @@ def test_success( assert preprocessing_colliding_users_mock.call_count == 1 (_, files) = self.relocation_storage.listdir(f"runs/{self.uuid}/in") - assert len(files) == 1 + assert len(files) == 2 assert "baseline-config.tar" in files + assert "raw-relocation-data.tar" in files with self.relocation_storage.open(f"runs/{self.uuid}/in/baseline-config.tar") as fp: json_models = json.loads( @@ -1077,8 +1125,9 @@ def test_success( assert preprocessing_complete_mock.call_count == 1 (_, files) = self.relocation_storage.listdir(f"runs/{self.uuid}/in") - assert len(files) == 1 + assert len(files) == 2 assert "colliding-users.tar" in files + assert "raw-relocation-data.tar" in files with self.relocation_storage.open(f"runs/{self.uuid}/in/colliding-users.tar") as fp: json_models = json.loads( @@ -1910,13 +1959,17 @@ def test_success_saas_to_saas(self, postprocessing_mock: Mock, fake_kms_client: assert export_contents.getvalue() == reexport_contents.getvalue() export_contents.seek(0) - # Convert this into a `SAAS_TO_SAAS` relocation, and use the data we just exported as the - # import blob. - file = RelocationFile.objects.get(relocation=self.relocation).file + # Save the export into the expected RelocationFile path so it can be imported. + relocation_file = RelocationFile.objects.get(relocation=self.relocation) self.tarball = create_encrypted_export_tarball( json.load(export_contents), encryptor ).getvalue() - file.putfile(BytesIO(self.tarball), blob_size=RELOCATION_BLOB_SIZE) + + assert relocation_file.bucket_path + relocation_storage = get_relocation_storage() + relocation_storage.delete(relocation_file.bucket_path) + relocation_storage.save(relocation_file.bucket_path, BytesIO(self.tarball)) + self.mock_kms_client(fake_kms_client) self.relocation.provenance = Relocation.Provenance.SAAS_TO_SAAS self.relocation.save() diff --git a/tests/sentry/relocation/tasks/test_transfer.py b/tests/sentry/relocation/tasks/test_transfer.py index 77fe9dd6835df9..c804667734f78e 100644 --- a/tests/sentry/relocation/tasks/test_transfer.py +++ b/tests/sentry/relocation/tasks/test_transfer.py @@ -19,6 +19,7 @@ process_relocation_transfer_control, process_relocation_transfer_region, ) +from sentry.relocation.utils import relocation_raw_data_path from sentry.silo.base import SiloMode from sentry.testutils.cases import TestCase from sentry.testutils.silo import ( @@ -165,11 +166,9 @@ def test_transfer_reply_state(self, mock_uploading_complete: MagicMock) -> None: state=RelocationTransferState.Reply, public_key=b"public_key_data", ) + # Save a file as we use it to get size metadata relocation_storage = get_relocation_storage() - relocation_storage.save( - f"runs/{relocation.uuid}/saas_to_saas_export/{organization.slug}.tar", - BytesIO(b"export data"), - ) + relocation_storage.save(relocation_raw_data_path(relocation.uuid), BytesIO(b"some bytes")) process_relocation_transfer_control(transfer_id=transfer.id) @@ -178,7 +177,10 @@ def test_transfer_reply_state(self, mock_uploading_complete: MagicMock) -> None: assert not ControlRelocationTransfer.objects.filter(id=transfer.id).exists() # the relocation RPC call should create a file on the cell with assume_test_silo_mode(SiloMode.CELL): - assert RelocationFile.objects.filter(relocation=relocation).exists() + relocation_file = RelocationFile.objects.get(relocation=relocation) + assert relocation_file + assert relocation_file.file + assert relocation_file.file.size == len("some bytes") @cell_silo_test(cells=TEST_REGIONS) @@ -209,12 +211,6 @@ def test_transfer_reply_state(self) -> None: relocation_uuid=relocation.uuid, state=RelocationTransferState.Reply, ) - relocation_storage = get_relocation_storage() - relocation_storage.save( - f"runs/{relocation.uuid}/saas_to_saas_export/{organization.slug}.tar", - BytesIO(b"export data"), - ) - process_relocation_transfer_region(transfer_id=transfer.id) # Should be removed on completion.