Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quixstreams/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ def add_source(self, source: BaseSource, topic: Optional[Topic] = None) -> Topic
self._source_manager.register(
source,
topic,
self._get_internal_producer(transactional=False),
self._get_internal_producer(),
self._get_internal_consumer(
extra_config_overrides=consumer_extra_config_overrides
),
Expand Down
31 changes: 31 additions & 0 deletions tests/test_quixstreams/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2354,6 +2354,37 @@ def _run_app(source, done):
)
_run_app(source, finished)

@pytest.mark.parametrize("processing_guarantee", ["at-least-once", "exactly-once"])
def test_source_producer_transactional_setting(
self, app_factory, processing_guarantee
):
"""
Test that source producers respect the processing_guarantee setting.

When processing_guarantee="exactly-once", the source producer should be transactional.
When processing_guarantee="at-least-once", the source producer should not be transactional.
"""
app = app_factory(processing_guarantee=processing_guarantee)

# Create a dummy source
source = DummySource(values=[{"test": "data"}])

# Add the source to the app - this should create a producer with the correct transactional setting
app.add_source(source)

# Get the source process from the source manager
source_process = app._source_manager.processes[0]

# Verify the producer's transactional setting matches the processing guarantee
expected_transactional = processing_guarantee == "exactly-once"
assert (
source_process._producer._producer.transactional == expected_transactional
), (
f"Expected producer.transactional={expected_transactional} for "
f"processing_guarantee='{processing_guarantee}', but got "
f"producer.transactional={source_process._producer._producer.transactional}"
)


class TestApplicationMultipleSdf:
def test_multiple_sdfs(
Expand Down