From 1551d10f83258681f8fd5c96cb65ce9ebcb0f3e6 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Mon, 15 Jun 2026 11:31:39 -0400 Subject: [PATCH] ref(producers): clean up issues TaskProducers --- src/sentry/issues/attributes.py | 33 ++++++++----------- src/sentry/issues/producer.py | 25 +++++++------- src/sentry/monitors/tasks/clock_pulse.py | 11 ++----- src/sentry/options/defaults.py | 24 -------------- .../sentry/monitors/tasks/test_clock_pulse.py | 33 ------------------- 5 files changed, 28 insertions(+), 98 deletions(-) diff --git a/src/sentry/issues/attributes.py b/src/sentry/issues/attributes.py index dcf6db513ea70a..2c2f26f2b19041 100644 --- a/src/sentry/issues/attributes.py +++ b/src/sentry/issues/attributes.py @@ -19,7 +19,6 @@ from sentry.models.group import Group from sentry.models.groupassignee import GroupAssignee from sentry.models.groupowner import GroupOwner, GroupOwnerType -from sentry.options.rollout import in_random_rollout from sentry.signals import issue_assigned, issue_deleted, issue_unassigned, post_update from sentry.taskworker.producer import get_task_producer from sentry.utils import json, metrics, snuba @@ -56,13 +55,17 @@ def _get_attribute_snapshot_producer(name: str = "sentry.issues.attributes") -> ) -_attribute_snapshot_producer = SingletonProducer( - _get_attribute_snapshot_producer, max_futures=settings.SENTRY_GROUP_ATTRIBUTES_FUTURES_MAX_LIMIT -) _task_producer_name = "sentry.tasks.issues.attributes" -_attribute_snapshot_task_producer = get_task_producer( - producer_name=_task_producer_name, - producer_factory=partial(_get_attribute_snapshot_producer, name=_task_producer_name), +_attribute_snapshot_producer = ( + get_task_producer( + producer_name=_task_producer_name, + producer_factory=partial(_get_attribute_snapshot_producer, name=_task_producer_name), + ) + if settings.TASKWORKER_USE_TASK_PRODUCER + else SingletonProducer( + _get_attribute_snapshot_producer, + max_futures=settings.SENTRY_GROUP_ATTRIBUTES_FUTURES_MAX_LIMIT, + ) ) @@ -131,18 +134,10 @@ def produce_snapshot_to_kafka(snapshot: GroupAttributesSnapshot) -> None: raise snuba.SnubaError(err) else: payload = KafkaPayload(None, json.dumps(snapshot).encode("utf-8"), []) - if settings.TASKWORKER_USE_TASK_PRODUCER and in_random_rollout( - "tasks.producer.snapshots.rollout" - ): - _attribute_snapshot_task_producer.produce( - ArroyoTopic(get_topic_definition(Topic.GROUP_ATTRIBUTES)["real_topic_name"]), - payload, - ) - else: - _attribute_snapshot_producer.produce( - ArroyoTopic(get_topic_definition(Topic.GROUP_ATTRIBUTES)["real_topic_name"]), - payload, - ) + _attribute_snapshot_producer.produce( + ArroyoTopic(get_topic_definition(Topic.GROUP_ATTRIBUTES)["real_topic_name"]), + payload, + ) def _bulk_retrieve_group_values(group_ids: list[int]) -> list[GroupValues]: diff --git a/src/sentry/issues/producer.py b/src/sentry/issues/producer.py index d625a8d32fe271..fcd6b68a2c7e4e 100644 --- a/src/sentry/issues/producer.py +++ b/src/sentry/issues/producer.py @@ -17,7 +17,6 @@ from sentry.issues.issue_occurrence import IssueOccurrence from sentry.issues.run import process_message from sentry.issues.status_change_message import StatusChangeMessage -from sentry.options.rollout import in_random_rollout from sentry.taskworker.producer import get_task_producer from sentry.utils import json from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer @@ -47,13 +46,16 @@ def _get_occurrence_producer(name: str = "sentry.issues.producer") -> KafkaProdu ) -_occurrence_producer = SingletonProducer( - _get_occurrence_producer, max_futures=settings.SENTRY_ISSUE_PLATFORM_FUTURES_MAX_LIMIT -) - -_occurrence_task_producer = get_task_producer( - producer_name="sentry.issues.tasks.producer", - producer_factory=partial(_get_occurrence_producer, name="sentry.issues.tasks.producer"), +_task_producer_name = "sentry.issues.tasks.producer" +_occurrence_producer = ( + get_task_producer( + producer_name=_task_producer_name, + producer_factory=partial(_get_occurrence_producer, name=_task_producer_name), + ) + if settings.TASKWORKER_USE_TASK_PRODUCER + else SingletonProducer( + _get_occurrence_producer, max_futures=settings.SENTRY_ISSUE_PLATFORM_FUTURES_MAX_LIMIT + ) ) @@ -88,12 +90,7 @@ def produce_occurrence_to_kafka( try: topic = get_topic_definition(Topic.INGEST_OCCURRENCES)["real_topic_name"] - if settings.TASKWORKER_USE_TASK_PRODUCER and in_random_rollout( - "tasks.producer.occurrences.rollout" - ): - _occurrence_task_producer.produce(ArroyoTopic(topic), payload) - else: - _occurrence_producer.produce(ArroyoTopic(topic), payload) + _occurrence_producer.produce(ArroyoTopic(topic), payload) except KafkaException: logger.exception( "Failed to send occurrence to issue platform", diff --git a/src/sentry/monitors/tasks/clock_pulse.py b/src/sentry/monitors/tasks/clock_pulse.py index ab4ad2f61d5096..000be34d6b2059 100644 --- a/src/sentry/monitors/tasks/clock_pulse.py +++ b/src/sentry/monitors/tasks/clock_pulse.py @@ -18,12 +18,11 @@ from sentry.conf.types.kafka_definition import Topic, get_topic_codec from sentry.monitors.clock_dispatch import try_monitor_clock_tick -from sentry.options.rollout import in_random_rollout from sentry.silo.base import SiloMode from sentry.tasks.base import instrumented_task from sentry.taskworker.namespaces import crons_tasks from sentry.taskworker.producer import get_task_producer -from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer +from sentry.utils.arroyo_producer import get_arroyo_producer from sentry.utils.kafka_config import get_kafka_admin_cluster_options, get_topic_definition logger = logging.getLogger("sentry") @@ -39,8 +38,7 @@ def _get_producer(): ) -_checkin_producer = SingletonProducer(_get_producer) -_checkin_task_producer = get_task_producer( +_checkin_producer = get_task_producer( producer_name="sentry.monitors.tasks.clock_pulse", producer_factory=_get_producer, ) @@ -92,7 +90,4 @@ def clock_pulse(current_datetime=None): topic = ArroyoTopic(get_topic_definition(Topic.INGEST_MONITORS)["real_topic_name"]) for partition in _get_partitions().values(): dest = Partition(topic, partition.id) - if in_random_rollout("tasks.producer.clock-pulse.rollout"): - _checkin_task_producer.produce(dest, payload) - else: - _checkin_producer.produce(dest, payload) + _checkin_producer.produce(dest, payload) diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 0202306afe1a78..1cd66f05646ce0 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3787,30 +3787,6 @@ flags=FLAG_MODIFIABLE_BOOL | FLAG_AUTOMATOR_MODIFIABLE, ) -# Rolls out the new TaskProducer to calls of produce_occurrence_to_kafka() from within taskworkers -register( - "tasks.producer.occurrences.rollout", - type=Float, - default=0.0, - flags=FLAG_AUTOMATOR_MODIFIABLE, -) - -# Rolls out the new TaskProducer to the clock_pulse task -register( - "tasks.producer.clock-pulse.rollout", - type=Float, - default=0.0, - flags=FLAG_AUTOMATOR_MODIFIABLE, -) - -# Rolls out the new TaskProducer to calls of produce_snapshot_to_kafka from within taskworkers -register( - "tasks.producer.snapshots.rollout", - type=Float, - default=0.0, - flags=FLAG_AUTOMATOR_MODIFIABLE, -) - # Rolls out the new TaskProducer to profiling tasks register( "tasks.producer.profiles.rollout", diff --git a/tests/sentry/monitors/tasks/test_clock_pulse.py b/tests/sentry/monitors/tasks/test_clock_pulse.py index bab659522f2efe..47b84d1a844479 100644 --- a/tests/sentry/monitors/tasks/test_clock_pulse.py +++ b/tests/sentry/monitors/tasks/test_clock_pulse.py @@ -7,10 +7,8 @@ from django.test import override_settings from sentry.monitors.tasks.clock_pulse import MONITOR_CODEC, clock_pulse -from sentry.testutils.helpers.options import override_options -@override_options({"tasks.producer.clock-pulse.rollout": 0.0}) @override_settings( KAFKA_TOPIC_OVERRIDES={"ingest-monitors": "monitors-test-topic"}, KAFKA_TOPIC_TO_CLUSTER={"monitors-test-topic": "default"}, @@ -39,34 +37,3 @@ def test_clock_pulse(checkin_producer_mock: mock.MagicMock) -> None: [], ), ) - - -@override_options({"tasks.producer.clock-pulse.rollout": 1.0}) -@override_settings( - KAFKA_TOPIC_OVERRIDES={"ingest-monitors": "monitors-test-topic"}, - KAFKA_TOPIC_TO_CLUSTER={"monitors-test-topic": "default"}, -) -@override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream") -@mock.patch("sentry.monitors.tasks.clock_pulse._checkin_task_producer") -def test_clock_pulse_task_producer(checkin_task_producer_mock: mock.MagicMock) -> None: - partition_count = 2 - - mock_partitions: MutableMapping[int, PartitionMetadata] = {} - for idx in range(partition_count): - mock_partitions[idx] = PartitionMetadata() - mock_partitions[idx].id = idx - - with mock.patch("sentry.monitors.tasks.clock_pulse._get_partitions", lambda: mock_partitions): - clock_pulse() - - # One clock pulse per partition - assert checkin_task_producer_mock.produce.call_count == len(mock_partitions.items()) - for idx in range(partition_count): - assert checkin_task_producer_mock.produce.mock_calls[idx] == mock.call( - Partition(Topic("monitors-test-topic"), idx), - KafkaPayload( - None, - MONITOR_CODEC.encode({"message_type": "clock_pulse"}), - [], - ), - )