Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 14 additions & 19 deletions src/sentry/issues/attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from sentry.models.group import Group
from sentry.models.groupassignee import GroupAssignee
from sentry.models.groupowner import GroupOwner, GroupOwnerType
from sentry.options.rollout import in_random_rollout
from sentry.signals import issue_assigned, issue_deleted, issue_unassigned, post_update
from sentry.taskworker.producer import get_task_producer
from sentry.utils import json, metrics, snuba
Expand Down Expand Up @@ -56,13 +55,17 @@ def _get_attribute_snapshot_producer(name: str = "sentry.issues.attributes") ->
)


_attribute_snapshot_producer = SingletonProducer(
_get_attribute_snapshot_producer, max_futures=settings.SENTRY_GROUP_ATTRIBUTES_FUTURES_MAX_LIMIT
)
_task_producer_name = "sentry.tasks.issues.attributes"
_attribute_snapshot_task_producer = get_task_producer(
producer_name=_task_producer_name,
producer_factory=partial(_get_attribute_snapshot_producer, name=_task_producer_name),
_attribute_snapshot_producer = (
get_task_producer(
producer_name=_task_producer_name,
producer_factory=partial(_get_attribute_snapshot_producer, name=_task_producer_name),
)
if settings.TASKWORKER_USE_TASK_PRODUCER
else SingletonProducer(
_get_attribute_snapshot_producer,
max_futures=settings.SENTRY_GROUP_ATTRIBUTES_FUTURES_MAX_LIMIT,
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import-time task producer selection

High Severity

_attribute_snapshot_producer is chosen once at import using settings.TASKWORKER_USE_TASK_PRODUCER. sentry.issues loads this module in AppConfig.ready() during configure(), before taskworker bootstrap sets that flag to True, so taskworker processes can keep a SingletonProducer for snapshot Kafka writes even when they should use TaskProducer.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 33bdf82. Configure here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is valid, so I'll have to rethink how selecting the producer backend should work.
IMO ideally we'd just have a single producer we could use in both taskworker and non-taskworker processes.

)


Expand Down Expand Up @@ -131,18 +134,10 @@ def produce_snapshot_to_kafka(snapshot: GroupAttributesSnapshot) -> None:
raise snuba.SnubaError(err)
else:
payload = KafkaPayload(None, json.dumps(snapshot).encode("utf-8"), [])
if settings.TASKWORKER_USE_TASK_PRODUCER and in_random_rollout(
"tasks.producer.snapshots.rollout"
):
_attribute_snapshot_task_producer.produce(
ArroyoTopic(get_topic_definition(Topic.GROUP_ATTRIBUTES)["real_topic_name"]),
payload,
)
else:
_attribute_snapshot_producer.produce(
ArroyoTopic(get_topic_definition(Topic.GROUP_ATTRIBUTES)["real_topic_name"]),
payload,
)
_attribute_snapshot_producer.produce(
ArroyoTopic(get_topic_definition(Topic.GROUP_ATTRIBUTES)["real_topic_name"]),
payload,
)


def _bulk_retrieve_group_values(group_ids: list[int]) -> list[GroupValues]:
Expand Down
25 changes: 11 additions & 14 deletions src/sentry/issues/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.issues.run import process_message
from sentry.issues.status_change_message import StatusChangeMessage
from sentry.options.rollout import in_random_rollout
from sentry.taskworker.producer import get_task_producer
from sentry.utils import json
from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer
Expand Down Expand Up @@ -47,13 +46,16 @@ def _get_occurrence_producer(name: str = "sentry.issues.producer") -> KafkaProdu
)


_occurrence_producer = SingletonProducer(
_get_occurrence_producer, max_futures=settings.SENTRY_ISSUE_PLATFORM_FUTURES_MAX_LIMIT
)

_occurrence_task_producer = get_task_producer(
producer_name="sentry.issues.tasks.producer",
producer_factory=partial(_get_occurrence_producer, name="sentry.issues.tasks.producer"),
_task_producer_name = "sentry.issues.tasks.producer"
_occurrence_producer = (
get_task_producer(
producer_name=_task_producer_name,
producer_factory=partial(_get_occurrence_producer, name=_task_producer_name),
)
if settings.TASKWORKER_USE_TASK_PRODUCER
else SingletonProducer(
_get_occurrence_producer, max_futures=settings.SENTRY_ISSUE_PLATFORM_FUTURES_MAX_LIMIT
)
)


Expand Down Expand Up @@ -88,12 +90,7 @@ def produce_occurrence_to_kafka(

try:
topic = get_topic_definition(Topic.INGEST_OCCURRENCES)["real_topic_name"]
if settings.TASKWORKER_USE_TASK_PRODUCER and in_random_rollout(
"tasks.producer.occurrences.rollout"
):
_occurrence_task_producer.produce(ArroyoTopic(topic), payload)
else:
_occurrence_producer.produce(ArroyoTopic(topic), payload)
_occurrence_producer.produce(ArroyoTopic(topic), payload)
except KafkaException:
logger.exception(
"Failed to send occurrence to issue platform",
Expand Down
11 changes: 3 additions & 8 deletions src/sentry/monitors/tasks/clock_pulse.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@

from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.monitors.clock_dispatch import try_monitor_clock_tick
from sentry.options.rollout import in_random_rollout
from sentry.silo.base import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import crons_tasks
from sentry.taskworker.producer import get_task_producer
from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer
from sentry.utils.arroyo_producer import get_arroyo_producer
from sentry.utils.kafka_config import get_kafka_admin_cluster_options, get_topic_definition

logger = logging.getLogger("sentry")
Expand All @@ -39,8 +38,7 @@ def _get_producer():
)


_checkin_producer = SingletonProducer(_get_producer)
_checkin_task_producer = get_task_producer(
_checkin_producer = get_task_producer(
producer_name="sentry.monitors.tasks.clock_pulse",
producer_factory=_get_producer,
)
Comment thread
bmckerry marked this conversation as resolved.
Expand Down Expand Up @@ -92,7 +90,4 @@ def clock_pulse(current_datetime=None):
topic = ArroyoTopic(get_topic_definition(Topic.INGEST_MONITORS)["real_topic_name"])
for partition in _get_partitions().values():
dest = Partition(topic, partition.id)
if in_random_rollout("tasks.producer.clock-pulse.rollout"):
_checkin_task_producer.produce(dest, payload)
else:
_checkin_producer.produce(dest, payload)
_checkin_producer.produce(dest, payload)
24 changes: 0 additions & 24 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -3787,30 +3787,6 @@
flags=FLAG_MODIFIABLE_BOOL | FLAG_AUTOMATOR_MODIFIABLE,
)

# Rolls out the new TaskProducer to calls of produce_occurrence_to_kafka() from within taskworkers
register(
"tasks.producer.occurrences.rollout",
type=Float,
default=0.0,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
Comment thread
bmckerry marked this conversation as resolved.

# Rolls out the new TaskProducer to the clock_pulse task
register(
"tasks.producer.clock-pulse.rollout",
type=Float,
default=0.0,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)

# Rolls out the new TaskProducer to calls of produce_snapshot_to_kafka from within taskworkers
register(
"tasks.producer.snapshots.rollout",
type=Float,
default=0.0,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)

# Rolls out the new TaskProducer to profiling tasks
register(
"tasks.producer.profiles.rollout",
Expand Down
33 changes: 0 additions & 33 deletions tests/sentry/monitors/tasks/test_clock_pulse.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
from django.test import override_settings

from sentry.monitors.tasks.clock_pulse import MONITOR_CODEC, clock_pulse
from sentry.testutils.helpers.options import override_options


@override_options({"tasks.producer.clock-pulse.rollout": 0.0})
@override_settings(
KAFKA_TOPIC_OVERRIDES={"ingest-monitors": "monitors-test-topic"},
KAFKA_TOPIC_TO_CLUSTER={"monitors-test-topic": "default"},
Expand Down Expand Up @@ -39,34 +37,3 @@ def test_clock_pulse(checkin_producer_mock: mock.MagicMock) -> None:
[],
),
)


@override_options({"tasks.producer.clock-pulse.rollout": 1.0})
@override_settings(
KAFKA_TOPIC_OVERRIDES={"ingest-monitors": "monitors-test-topic"},
KAFKA_TOPIC_TO_CLUSTER={"monitors-test-topic": "default"},
)
@override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
@mock.patch("sentry.monitors.tasks.clock_pulse._checkin_task_producer")
def test_clock_pulse_task_producer(checkin_task_producer_mock: mock.MagicMock) -> None:
partition_count = 2

mock_partitions: MutableMapping[int, PartitionMetadata] = {}
for idx in range(partition_count):
mock_partitions[idx] = PartitionMetadata()
mock_partitions[idx].id = idx

with mock.patch("sentry.monitors.tasks.clock_pulse._get_partitions", lambda: mock_partitions):
clock_pulse()

# One clock pulse per partition
assert checkin_task_producer_mock.produce.call_count == len(mock_partitions.items())
for idx in range(partition_count):
assert checkin_task_producer_mock.produce.mock_calls[idx] == mock.call(
Partition(Topic("monitors-test-topic"), idx),
KafkaPayload(
None,
MONITOR_CODEC.encode({"message_type": "clock_pulse"}),
[],
),
)
Loading