From eca63fafc99e33e4ea1778b5d38d41c80cfa6243 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 17 Jun 2026 15:45:42 +0200 Subject: [PATCH 1/3] ref(profiles): Remove the ingest-profiles Arroyo consumer The profiles consumer has been replaced by taskbroker passthrough mode (STREAM-1041), which consumes the profiles topic directly and dispatches process_profile_from_kafka tasks. This removes the now-dead Arroyo consumer: - Drop the ingest-profiles entry from KAFKA_CONSUMERS - Remove ProcessProfileStrategyFactory and the process_message Arroyo wrapper, keeping the shared passthrough logic (renamed _process_profile_message -> process_profile_message) and simplifying header handling to the dict-only path the passthrough uses - Update tests to exercise process_profile_message directly The profiling.killswitch.ingest-profiles option is retained since it is still enforced in the passthrough path. ref STREAM-1194 --- src/sentry/consumers/__init__.py | 4 - src/sentry/killswitches.py | 2 +- .../profiles/consumers/process/factory.py | 57 ++---------- src/sentry/profiles/task.py | 4 +- src/sentry/runner/commands/run.py | 4 +- tests/sentry/consumers/test_run.py | 1 - .../processing/backpressure/test_checking.py | 88 +++---------------- .../sentry/profiles/consumers/test_process.py | 56 ++---------- 8 files changed, 31 insertions(+), 185 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index d36e738abf7585..bb3e9cf3c009f7 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -303,10 +303,6 @@ def ingest_events_options() -> list[click.Option]: # consumer name -> consumer definition KAFKA_CONSUMERS: Mapping[str, ConsumerDefinition] = { - "ingest-profiles": { - "topic": Topic.PROFILES, - "strategy_factory": "sentry.profiles.consumers.process.factory.ProcessProfileStrategyFactory", - }, "ingest-replay-recordings": { "topic": Topic.INGEST_REPLAYS_RECORDINGS, "strategy_factory": "sentry.replays.consumers.recording.ProcessReplayRecordingStrategyFactory", diff --git a/src/sentry/killswitches.py b/src/sentry/killswitches.py index bf71fcb38e5b33..d0df756c3db8c4 100644 --- a/src/sentry/killswitches.py +++ b/src/sentry/killswitches.py @@ -230,7 +230,7 @@ class KillswitchInfo: ), "profiling.killswitch.ingest-profiles": KillswitchInfo( description=""" - Drop profiles in the ingest-profiles consumer. + Drop profiles during ingest-profiles taskbroker passthrough. This happens after relay produces profiles to the topic but before a task is started to process/ingest to profile. diff --git a/src/sentry/profiles/consumers/process/factory.py b/src/sentry/profiles/consumers/process/factory.py index 9fd1e5f510a573..be6744b69592bd 100644 --- a/src/sentry/profiles/consumers/process/factory.py +++ b/src/sentry/profiles/consumers/process/factory.py @@ -1,26 +1,17 @@ -from collections.abc import Iterable, Mapping - -from arroyo.backends.kafka.consumer import KafkaPayload -from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory -from arroyo.processing.strategies.commit import CommitOffsets -from arroyo.processing.strategies.run_task import RunTask -from arroyo.types import Commit, Message, Partition - from sentry import options from sentry.killswitches import killswitch_matches_context -from sentry.processing.backpressure.arroyo import HealthChecker, create_backpressure_step from sentry.profiles.task import process_profile_task -# Headers from consumer are Iterable[tuple[str, str | bytes]], from taskbroker are dict[str, str] -Headers = Iterable[tuple[str, str | bytes]] | dict[str, str] +# Headers from taskbroker passthrough are dict[str, str] +Headers = dict[str, str] -def _process_profile_message( +def process_profile_message( message_bytes: bytes, headers: Headers, inline: bool = False, ) -> None: - """Process a profile message from Kafka. Used by both consumer and taskbroker passthrough.""" + """Process a profile message from Kafka (taskbroker passthrough).""" if should_drop(headers): return @@ -35,48 +26,12 @@ def _process_profile_message( process_profile_task.delay(payload=message_bytes, sampled=sampled) -def process_message(message: Message[KafkaPayload]) -> None: - _process_profile_message(message.payload.value, message.payload.headers) - - -class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): - def __init__(self) -> None: - super().__init__() - self.health_checker = HealthChecker("profiles") - - def create_with_partitions( - self, - commit: Commit, - partitions: Mapping[Partition, int], - ) -> ProcessingStrategy[KafkaPayload]: - next_step = RunTask( - function=process_message, - next_step=CommitOffsets(commit), - ) - return create_backpressure_step( - health_checker=self.health_checker, - next_step=next_step, - ) - - def is_sampled(headers: Headers) -> bool: - if isinstance(headers, dict): - return headers.get("sampled", "true") == "true" - for k, v in headers: - if k == "sampled": - if isinstance(v, bytes): - return v.decode("utf-8") == "true" - return True + return headers.get("sampled", "true") == "true" def should_drop(headers: Headers) -> bool: - if isinstance(headers, dict): - context = {"project_id": headers["project_id"]} if "project_id" in headers else {} - else: - context = {} - for k, v in headers: - if k == "project_id" and isinstance(v, bytes): - context[k] = v.decode("utf-8") + context = {"project_id": headers["project_id"]} if "project_id" in headers else {} if "project_id" in context and killswitch_matches_context( "profiling.killswitch.ingest-profiles", context diff --git a/src/sentry/profiles/task.py b/src/sentry/profiles/task.py index 6c164c24270d14..f71980724963e4 100644 --- a/src/sentry/profiles/task.py +++ b/src/sentry/profiles/task.py @@ -160,9 +160,9 @@ def process_profile_from_kafka( headers: dict[str, str], ) -> None: """Process a profile from raw Kafka message bytes (taskbroker passthrough mode).""" - from sentry.profiles.consumers.process.factory import _process_profile_message + from sentry.profiles.consumers.process.factory import process_profile_message - _process_profile_message(message_bytes, headers, inline=True) + process_profile_message(message_bytes, headers, inline=True) @instrumented_task( diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index 289c3fc8f12b07..e7deb384c8cf14 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -544,9 +544,9 @@ def basic_consumer( Example: - sentry run consumer ingest-profiles --consumer-group ingest-profiles + sentry run consumer ingest-occurrences --consumer-group ingest-occurrences - runs the ingest-profiles consumer with the consumer group ingest-profiles. + runs the ingest-occurrences consumer with the consumer group ingest-occurrences. Consumers are defined in 'sentry.consumers'. Each consumer can take additional CLI options. Those can be passed after '--': diff --git a/tests/sentry/consumers/test_run.py b/tests/sentry/consumers/test_run.py index 143b7fef5dcfeb..30142df976a908 100644 --- a/tests/sentry/consumers/test_run.py +++ b/tests/sentry/consumers/test_run.py @@ -47,7 +47,6 @@ def test_dlq(consumer_def) -> None: "metrics-last-seen-updater", "generic-metrics-last-seen-updater", "billing-metrics-consumer", - "ingest-profiles", "ingest-occurrences", "ingest-replay-recordings", "ingest-replay-recordings-two-step", diff --git a/tests/sentry/processing/backpressure/test_checking.py b/tests/sentry/processing/backpressure/test_checking.py index eaaa7a10e62a61..c9ecd1045e0d32 100644 --- a/tests/sentry/processing/backpressure/test_checking.py +++ b/tests/sentry/processing/backpressure/test_checking.py @@ -11,11 +11,9 @@ from sentry.ingest.consumer.factory import IngestStrategyFactory from sentry.ingest.types import ConsumerType from sentry.processing.backpressure.health import record_consumer_health -from sentry.profiles.consumers.process.factory import ProcessProfileStrategyFactory from sentry.testutils.helpers.options import override_options from sentry.utils import json -PROFILES_MSG = json.dumps({"platform": "android", "profile": ""}) EVENTS_MSG = json.dumps( { "message": "test-event", @@ -24,45 +22,6 @@ ) -@override_options( - { - "backpressure.checking.enabled": True, - "backpressure.checking.interval": 5, - "backpressure.monitoring.enabled": False, - "backpressure.status_ttl": 60, - "profiling.process.raw_bytes_payload.enabled": False, - } -) -def test_bad_config() -> None: - with raises(MessageRejected): - process_one_message(consumer_type="profiles", topic="profiles", payload=PROFILES_MSG) - - -@patch("sentry.profiles.consumers.process.factory.process_profile_task.delay") -@override_options( - { - "backpressure.checking.enabled": True, - "backpressure.checking.interval": 5, - "backpressure.monitoring.enabled": True, - "backpressure.status_ttl": 60, - "profiling.process.raw_bytes_payload.enabled": False, - } -) -def test_backpressure_healthy_profiles(process_profile_task: MagicMock) -> None: - record_consumer_health( - { - "attachments-store": [], - "processing-store": [], - "processing-store-transactions": [], - "processing-locks": [], - "post-process-locks": [], - } - ) - process_one_message(consumer_type="profiles", topic="profiles", payload=PROFILES_MSG) - - process_profile_task.assert_called_once() - - @override_options( { "backpressure.checking.enabled": True, @@ -82,7 +41,7 @@ def test_backpressure_unhealthy_events() -> None: } ) with raises(MessageRejected): - process_one_message(consumer_type="ingest", topic="ingest-events", payload=EVENTS_MSG) + process_one_message(payload=EVENTS_MSG) @patch("sentry.ingest.consumer.factory.maybe_multiprocess_step") @@ -104,41 +63,22 @@ def test_backpressure_healthy_events(preprocess_event: MagicMock) -> None: "post-process-locks": [], } ) - process_one_message(consumer_type="ingest", topic="ingest-events", payload=EVENTS_MSG) + process_one_message(payload=EVENTS_MSG) preprocess_event.assert_called_once() -@patch("sentry.profiles.consumers.process.factory.process_profile_task.delay") -@override_options( - { - "backpressure.checking.enabled": False, - "backpressure.checking.interval": 5, - "profiling.process.raw_bytes_payload.enabled": False, - } -) -def test_backpressure_not_enabled(process_profile_task: MagicMock) -> None: - process_one_message(consumer_type="profiles", topic="profiles", payload=PROFILES_MSG) - - process_profile_task.assert_called_once() - - -def process_one_message(consumer_type: str, topic: str, payload: str) -> None: - if consumer_type == "profiles": - processing_strategy = ProcessProfileStrategyFactory().create_with_partitions( - commit=Mock(), partitions={} - ) - elif consumer_type == "ingest": - processing_strategy = IngestStrategyFactory( - consumer_type=ConsumerType.Events, - reprocess_only_stuck_events=False, - stop_at_timestamp=None, - num_processes=1, - max_batch_size=10, - max_batch_time=10, - input_block_size=None, - output_block_size=None, - ).create_with_partitions(commit=Mock(), partitions={}) +def process_one_message(payload: str) -> None: + processing_strategy = IngestStrategyFactory( + consumer_type=ConsumerType.Events, + reprocess_only_stuck_events=False, + stop_at_timestamp=None, + num_processes=1, + max_batch_size=10, + max_batch_time=10, + input_block_size=None, + output_block_size=None, + ).create_with_partitions(commit=Mock(), partitions={}) message_dict = { "organization_id": 1, "project_id": 1, @@ -156,7 +96,7 @@ def process_one_message(consumer_type: str, topic: str, payload: str) -> None: msgpack_payload, [], ), - Partition(Topic(topic), 1), + Partition(Topic("ingest-events"), 1), 1, datetime.now(), ) diff --git a/tests/sentry/profiles/consumers/test_process.py b/tests/sentry/profiles/consumers/test_process.py index 22bffaf1e0a0a2..7634e9fa802d89 100644 --- a/tests/sentry/profiles/consumers/test_process.py +++ b/tests/sentry/profiles/consumers/test_process.py @@ -1,17 +1,13 @@ from __future__ import annotations -from collections.abc import MutableSequence -from datetime import datetime from typing import Any -from unittest.mock import MagicMock, Mock, patch +from unittest.mock import MagicMock, patch import msgpack import pytest -from arroyo.backends.kafka import KafkaPayload -from arroyo.types import BrokerValue, Message, Partition, Topic from django.utils import timezone -from sentry.profiles.consumers.process.factory import ProcessProfileStrategyFactory +from sentry.profiles.consumers.process.factory import process_profile_message from sentry.profiles.task import _prepare_frames_from_profile from sentry.testutils.helpers.options import override_options from sentry.testutils.pytest.fixtures import django_db_all @@ -19,15 +15,10 @@ @override_options({"profiling.killswitch.ingest-profiles": [{"project_id": "2"}]}) -@pytest.mark.parametrize("headers", [[], [("project_id", b"1")]]) +@pytest.mark.parametrize("headers", [{}, {"project_id": "1"}]) @patch("sentry.profiles.consumers.process.factory.process_profile_task.delay") @django_db_all -def test_basic_profile_to_task( - process_profile_task: MagicMock, headers: MutableSequence[tuple[str, bytes]] -) -> None: - processing_strategy = ProcessProfileStrategyFactory().create_with_partitions( - commit=Mock(), partitions={} - ) +def test_basic_profile_to_task(process_profile_task: MagicMock, headers: dict[str, str]) -> None: message_dict = { "organization_id": 1, "project_id": 1, @@ -37,23 +28,7 @@ def test_basic_profile_to_task( } payload = msgpack.packb(message_dict) - processing_strategy.submit( - Message( - BrokerValue( - KafkaPayload( - b"key", - payload, - headers, - ), - Partition(Topic("profiles"), 1), - 1, - datetime.now(), - ) - ) - ) - processing_strategy.poll() - processing_strategy.join(1) - processing_strategy.terminate() + process_profile_message(payload, headers) process_profile_task.assert_called_with( payload=payload, @@ -65,9 +40,6 @@ def test_basic_profile_to_task( @override_options({"profiling.killswitch.ingest-profiles": [{"project_id": "1"}]}) @django_db_all def test_killswitch_project(process_profile_task: MagicMock) -> None: - processing_strategy = ProcessProfileStrategyFactory().create_with_partitions( - commit=Mock(), partitions={} - ) message_dict = { "organization_id": 1, "project_id": 1, @@ -77,23 +49,7 @@ def test_killswitch_project(process_profile_task: MagicMock) -> None: } payload = msgpack.packb(message_dict) - processing_strategy.submit( - Message( - BrokerValue( - KafkaPayload( - b"key", - payload, - [("project_id", b"1")], - ), - Partition(Topic("profiles"), 1), - 1, - datetime.now(), - ) - ) - ) - processing_strategy.poll() - processing_strategy.join(1) - processing_strategy.terminate() + process_profile_message(payload, {"project_id": "1"}) process_profile_task.assert_not_called() From 2bbf25bf5c7fe0b7fe08b36ac3a4c571c1b36124 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 17 Jun 2026 15:56:20 +0200 Subject: [PATCH 2/3] ref(profiles): Inline factory and demote process_profile_task With the consumer gone, the profiles/consumers package had only the taskbroker-passthrough glue left. Fold it into the task module: - Delete the sentry.profiles.consumers package; move the killswitch and sampling helpers (_should_drop / _is_sampled) into task.py, inlined into process_profile_from_kafka (always inline, no .delay()). - Demote process_profile_task from an @instrumented_task to a plain function. It is no longer dispatched as its own task (only called inline from the passthrough task), so the decorator and the ingest_profiling_tasks namespace are no longer needed. The namespace definition is retained for now since removing it requires coordinated getsentry worker-config changes. - Move the relocated tests into tests/sentry/profiles/test_task.py. ref STREAM-1194 --- src/sentry/killswitches.py | 2 +- src/sentry/profiles/consumers/__init__.py | 0 .../profiles/consumers/process/__init__.py | 0 .../profiles/consumers/process/factory.py | 41 ------- src/sentry/profiles/task.py | 33 ++++-- .../sentry/profiles/consumers/test_process.py | 104 ------------------ tests/sentry/profiles/test_task.py | 95 ++++++++++++++++ 7 files changed, 118 insertions(+), 157 deletions(-) delete mode 100644 src/sentry/profiles/consumers/__init__.py delete mode 100644 src/sentry/profiles/consumers/process/__init__.py delete mode 100644 src/sentry/profiles/consumers/process/factory.py delete mode 100644 tests/sentry/profiles/consumers/test_process.py diff --git a/src/sentry/killswitches.py b/src/sentry/killswitches.py index d0df756c3db8c4..a707cca8aeb4d1 100644 --- a/src/sentry/killswitches.py +++ b/src/sentry/killswitches.py @@ -230,7 +230,7 @@ class KillswitchInfo: ), "profiling.killswitch.ingest-profiles": KillswitchInfo( description=""" - Drop profiles during ingest-profiles taskbroker passthrough. + Drop profiles in the sentry.profiles.task.process_profile_from_kafka task. This happens after relay produces profiles to the topic but before a task is started to process/ingest to profile. diff --git a/src/sentry/profiles/consumers/__init__.py b/src/sentry/profiles/consumers/__init__.py deleted file mode 100644 index e69de29bb2d1d6..00000000000000 diff --git a/src/sentry/profiles/consumers/process/__init__.py b/src/sentry/profiles/consumers/process/__init__.py deleted file mode 100644 index e69de29bb2d1d6..00000000000000 diff --git a/src/sentry/profiles/consumers/process/factory.py b/src/sentry/profiles/consumers/process/factory.py deleted file mode 100644 index be6744b69592bd..00000000000000 --- a/src/sentry/profiles/consumers/process/factory.py +++ /dev/null @@ -1,41 +0,0 @@ -from sentry import options -from sentry.killswitches import killswitch_matches_context -from sentry.profiles.task import process_profile_task - -# Headers from taskbroker passthrough are dict[str, str] -Headers = dict[str, str] - - -def process_profile_message( - message_bytes: bytes, - headers: Headers, - inline: bool = False, -) -> None: - """Process a profile message from Kafka (taskbroker passthrough).""" - if should_drop(headers): - return - - sampled = is_sampled(headers) - - if not sampled and not options.get("profiling.profile_metrics.unsampled_profiles.enabled"): - return - - if inline: - process_profile_task(payload=message_bytes, sampled=sampled) - else: - process_profile_task.delay(payload=message_bytes, sampled=sampled) - - -def is_sampled(headers: Headers) -> bool: - return headers.get("sampled", "true") == "true" - - -def should_drop(headers: Headers) -> bool: - context = {"project_id": headers["project_id"]} if "project_id" in headers else {} - - if "project_id" in context and killswitch_matches_context( - "profiling.killswitch.ingest-profiles", context - ): - return True - - return False diff --git a/src/sentry/profiles/task.py b/src/sentry/profiles/task.py index f71980724963e4..efe7df1cf70b96 100644 --- a/src/sentry/profiles/task.py +++ b/src/sentry/profiles/task.py @@ -28,6 +28,7 @@ from sentry import features, options, quotas from sentry.conf.types.kafka_definition import Topic from sentry.constants import DataCategory +from sentry.killswitches import killswitch_matches_context from sentry.lang.javascript.processing import _handles_frame as is_valid_javascript_frame from sentry.lang.native.processing import _merge_image from sentry.lang.native.symbolicator import ( @@ -63,7 +64,7 @@ from sentry.signals import first_profile_received from sentry.silo.base import SiloMode from sentry.tasks.base import instrumented_task -from sentry.taskworker.namespaces import ingest_profiling_passthrough_tasks, ingest_profiling_tasks +from sentry.taskworker.namespaces import ingest_profiling_passthrough_tasks from sentry.taskworker.producer import get_task_producer from sentry.utils import json, metrics from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer @@ -160,19 +161,29 @@ def process_profile_from_kafka( headers: dict[str, str], ) -> None: """Process a profile from raw Kafka message bytes (taskbroker passthrough mode).""" - from sentry.profiles.consumers.process.factory import process_profile_message + if _should_drop(headers): + return - process_profile_message(message_bytes, headers, inline=True) + sampled = _is_sampled(headers) + + if not sampled and not options.get("profiling.profile_metrics.unsampled_profiles.enabled"): + return + + process_profile_task(payload=message_bytes, sampled=sampled) + + +def _is_sampled(headers: dict[str, str]) -> bool: + return headers.get("sampled", "true") == "true" + + +def _should_drop(headers: dict[str, str]) -> bool: + context = {"project_id": headers["project_id"]} if "project_id" in headers else {} + + return bool(context) and killswitch_matches_context( + "profiling.killswitch.ingest-profiles", context + ) -@instrumented_task( - name="sentry.profiles.task.process_profile", - namespace=ingest_profiling_tasks, - processing_deadline_duration=60, - retry=Retry(times=2, delay=5), - compression_type=CompressionType.ZSTD, - silo_mode=SiloMode.CELL, -) def process_profile_task( profile: Profile | None = None, payload: bytes | str | None = None, diff --git a/tests/sentry/profiles/consumers/test_process.py b/tests/sentry/profiles/consumers/test_process.py deleted file mode 100644 index 7634e9fa802d89..00000000000000 --- a/tests/sentry/profiles/consumers/test_process.py +++ /dev/null @@ -1,104 +0,0 @@ -from __future__ import annotations - -from typing import Any -from unittest.mock import MagicMock, patch - -import msgpack -import pytest -from django.utils import timezone - -from sentry.profiles.consumers.process.factory import process_profile_message -from sentry.profiles.task import _prepare_frames_from_profile -from sentry.testutils.helpers.options import override_options -from sentry.testutils.pytest.fixtures import django_db_all -from sentry.utils import json - - -@override_options({"profiling.killswitch.ingest-profiles": [{"project_id": "2"}]}) -@pytest.mark.parametrize("headers", [{}, {"project_id": "1"}]) -@patch("sentry.profiles.consumers.process.factory.process_profile_task.delay") -@django_db_all -def test_basic_profile_to_task(process_profile_task: MagicMock, headers: dict[str, str]) -> None: - message_dict = { - "organization_id": 1, - "project_id": 1, - "key_id": 1, - "received": int(timezone.now().timestamp()), - "payload": json.dumps({"platform": "android", "profile": ""}), - } - payload = msgpack.packb(message_dict) - - process_profile_message(payload, headers) - - process_profile_task.assert_called_with( - payload=payload, - sampled=True, - ) - - -@patch("sentry.profiles.consumers.process.factory.process_profile_task.delay") -@override_options({"profiling.killswitch.ingest-profiles": [{"project_id": "1"}]}) -@django_db_all -def test_killswitch_project(process_profile_task: MagicMock) -> None: - message_dict = { - "organization_id": 1, - "project_id": 1, - "key_id": 1, - "received": int(timezone.now().timestamp()), - "payload": json.dumps({"platform": "android", "profile": ""}), - } - payload = msgpack.packb(message_dict) - - process_profile_message(payload, {"project_id": "1"}) - - process_profile_task.assert_not_called() - - -def test_adjust_instruction_addr_sample_format() -> None: - original_frames = [ - {"instruction_addr": "0xdeadbeef"}, - {"instruction_addr": "0xbeefdead"}, - {"instruction_addr": "0xfeedface"}, - ] - profile: dict[str, Any] = { - "version": "1", - "platform": "cocoa", - "profile": { - "frames": original_frames.copy(), - "stacks": [[1, 0], [0, 1, 2]], - }, - "debug_meta": {"images": []}, - } - - _, stacktraces, _ = _prepare_frames_from_profile(profile, profile["platform"]) - assert profile["profile"]["stacks"] == [[3, 0], [4, 1, 2]] - frames = stacktraces[0]["frames"] - - for i in range(3): - assert frames[i] == original_frames[i] - - assert frames[3] == {"instruction_addr": "0xbeefdead", "adjust_instruction_addr": False} - assert frames[4] == {"instruction_addr": "0xdeadbeef", "adjust_instruction_addr": False} - - -def test_adjust_instruction_addr_original_format() -> None: - profile = { - "platform": "cocoa", - "sampled_profile": { - "samples": [ - { - "frames": [ - {"instruction_addr": "0xdeadbeef", "platform": "native"}, - {"instruction_addr": "0xbeefdead", "platform": "native"}, - ], - } - ] - }, - "debug_meta": {"images": []}, - } - - _, stacktraces, _ = _prepare_frames_from_profile(profile, str(profile["platform"])) - frames = stacktraces[0]["frames"] - - assert not frames[0]["adjust_instruction_addr"] - assert "adjust_instruction_addr" not in frames[1] diff --git a/tests/sentry/profiles/test_task.py b/tests/sentry/profiles/test_task.py index 481cf399341667..5f3abdab85f30c 100644 --- a/tests/sentry/profiles/test_task.py +++ b/tests/sentry/profiles/test_task.py @@ -25,9 +25,11 @@ _deobfuscate, _deobfuscate_using_symbolicator, _normalize, + _prepare_frames_from_profile, _process_symbolicator_results_for_sample, _set_frames_platform, _symbolicate_profile, + process_profile_from_kafka, process_profile_task, ) from sentry.profiles.utils import Profile @@ -1222,3 +1224,96 @@ def test_process_profile_task_should_flip_project_flag( ) project.refresh_from_db() assert project.flags.has_profiles + + +@override_options({"profiling.killswitch.ingest-profiles": [{"project_id": "2"}]}) +@pytest.mark.parametrize("headers", [{}, {"project_id": "1"}]) +@patch("sentry.profiles.task.process_profile_task") +@django_db_all +def test_process_profile_from_kafka( + mock_process_profile_task: mock.MagicMock, headers: dict[str, str] +) -> None: + payload = msgpack.packb( + { + "organization_id": 1, + "project_id": 1, + "key_id": 1, + "received": 1700000000, + "payload": json.dumps({"platform": "android", "profile": ""}), + } + ) + + process_profile_from_kafka(payload, headers) + + mock_process_profile_task.assert_called_with(payload=payload, sampled=True) + + +@patch("sentry.profiles.task.process_profile_task") +@override_options({"profiling.killswitch.ingest-profiles": [{"project_id": "1"}]}) +@django_db_all +def test_process_profile_from_kafka_killswitch( + mock_process_profile_task: mock.MagicMock, +) -> None: + payload = msgpack.packb( + { + "organization_id": 1, + "project_id": 1, + "key_id": 1, + "received": 1700000000, + "payload": json.dumps({"platform": "android", "profile": ""}), + } + ) + + process_profile_from_kafka(payload, {"project_id": "1"}) + + mock_process_profile_task.assert_not_called() + + +def test_adjust_instruction_addr_sample_format() -> None: + original_frames = [ + {"instruction_addr": "0xdeadbeef"}, + {"instruction_addr": "0xbeefdead"}, + {"instruction_addr": "0xfeedface"}, + ] + profile: dict[str, Any] = { + "version": "1", + "platform": "cocoa", + "profile": { + "frames": original_frames.copy(), + "stacks": [[1, 0], [0, 1, 2]], + }, + "debug_meta": {"images": []}, + } + + _, stacktraces, _ = _prepare_frames_from_profile(profile, profile["platform"]) + assert profile["profile"]["stacks"] == [[3, 0], [4, 1, 2]] + frames = stacktraces[0]["frames"] + + for i in range(3): + assert frames[i] == original_frames[i] + + assert frames[3] == {"instruction_addr": "0xbeefdead", "adjust_instruction_addr": False} + assert frames[4] == {"instruction_addr": "0xdeadbeef", "adjust_instruction_addr": False} + + +def test_adjust_instruction_addr_original_format() -> None: + profile = { + "platform": "cocoa", + "sampled_profile": { + "samples": [ + { + "frames": [ + {"instruction_addr": "0xdeadbeef", "platform": "native"}, + {"instruction_addr": "0xbeefdead", "platform": "native"}, + ], + } + ] + }, + "debug_meta": {"images": []}, + } + + _, stacktraces, _ = _prepare_frames_from_profile(profile, str(profile["platform"])) + frames = stacktraces[0]["frames"] + + assert not frames[0]["adjust_instruction_addr"] + assert "adjust_instruction_addr" not in frames[1] From 77d7792c71d42d7ce2199ab644a7d8ef603f3030 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 17 Jun 2026 16:00:38 +0200 Subject: [PATCH 3/3] ref(profiles): Remove the now-empty ingest.profiling namespace With process_profile_task demoted to a plain function, no tasks remain in the ingest.profiling namespace (the live passthrough task uses ingest.profiling.passthrough). Nothing else references it, so remove it. ref STREAM-1194 --- src/sentry/taskworker/namespaces.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/sentry/taskworker/namespaces.py b/src/sentry/taskworker/namespaces.py index 55c276cbe23a7e..b3459a9c8ab0c7 100644 --- a/src/sentry/taskworker/namespaces.py +++ b/src/sentry/taskworker/namespaces.py @@ -82,11 +82,6 @@ app_feature="hybrid_cloud", ) -ingest_profiling_tasks = app.taskregistry.create_namespace( - "ingest.profiling", - app_feature="profiles", -) - ingest_profiling_passthrough_tasks = app.taskregistry.create_namespace( "ingest.profiling.passthrough", app_feature="profiles",