Skip to content

Commit dd1a36b

Browse files
Committed (change requested by review).
Parent: 1ae852b · Commit: dd1a36b

File tree

2 files changed

+10
-6
lines changed

2 files changed

+10
-6
lines changed

quixstreams/dataframe/registry.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ def register_groupby(
 79   79       Register a "groupby" SDF, which is one generated with `SDF.group_by()`.
 80   80       :param source_sdf: the SDF used by `sdf.group_by()`
 81   81       :param new_sdf: the SDF generated by `sdf.group_by()`.
      82+      :param register_new_root: whether to register the new SDF as a root SDF.
 82   83       """
 83   84       if source_sdf.stream_id in self._repartition_origins:
 84   85           raise GroupByNestingLimit(
@@ -87,7 +88,7 @@ def register_groupby(
 87   88
 88   89       if new_sdf.stream_id in self._repartition_origins:
 89   90           raise GroupByDuplicate(
 90      -            "A `SDF.group_by()` operation appears to be the same as another, "
      91+             "An `SDF.group_by()` operation appears to be the same as another, "
 91   92              "either from using the same column or name parameter; "
 92   93              "adjust by setting a unique name with `SDF.group_by(name=<NAME>)` "
 93   94          )
@@ -99,7 +100,7 @@ def register_groupby(
 99  100          self.register_root(new_sdf)
100  101      except StreamingDataFrameDuplicate:
101  102          raise GroupByDuplicate(
102      -            "A `SDF.group_by()` operation appears to be the same as another, "
     103+             "An `SDF.group_by()` operation appears to be the same as another, "
103  104             "either from using the same column or name parameter; "
104  105             "adjust by setting a unique name with `SDF.group_by(name=<NAME>)` "
105  106         )

tests/test_quixstreams/test_dataframe/test_dataframe.py

+7-4
Original file line numberDiff line numberDiff line change
@@ -1682,10 +1682,11 @@ def test_group_by_column(
1682  1682      sdf = sdf.group_by(col)
1683  1683      sdf[col] = col_update
1684  1684
1685      -     groupby_topic = sdf.topics[0]
1686  1685      if num_partitions == 1:
      1686+         groupby_topic = topic
1687  1687          assert sdf_registry.consumer_topics == [topic]
1688  1688      else:
      1689+         groupby_topic = sdf.topics[0]
1689  1690          assert sdf_registry.consumer_topics == [topic, groupby_topic]
1690  1691          assert groupby_topic.name.startswith("repartition__")
1691  1692
@@ -1702,7 +1703,7 @@ def test_group_by_column(
1702  1703      if num_partitions == 1:
1703  1704          post_groupby_branch_result = pre_groupby_branch_result
1704  1705      else:
1705      -        with internal_producer_factory(auto_offset_reset="earliest") as consumer:
      1706+         with internal_consumer_factory(auto_offset_reset="earliest") as consumer:
1706  1707              consumer.subscribe([groupby_topic])
1707  1708              consumed_row = consumer.poll_row(timeout=5.0)
1708  1709
@@ -1773,10 +1774,11 @@ def test_group_by_column_with_name(
1773  1774      sdf = sdf.group_by(col, name=op_name)
1774  1775      sdf[col] = col_update
1775  1776
1776      -     groupby_topic = sdf.topics[0]
1777  1777      if num_partitions == 1:
      1778+         groupby_topic = topic
1778  1779          assert sdf_registry.consumer_topics == [topic]
1779  1780      else:
      1781+         groupby_topic = sdf.topics[0]
1780  1782          assert sdf_registry.consumer_topics == [topic, groupby_topic]
1781  1783          assert groupby_topic.name.startswith("repartition__")
1782  1784

@@ -1864,10 +1866,11 @@ def test_group_by_func(
1864  1866      sdf = sdf.group_by(lambda v: v[col], name=op_name)
1865  1867      sdf[col] = col_update
1866  1868
1867      -     groupby_topic = sdf.topics[0]
1868  1869      if num_partitions == 1:
      1870+         groupby_topic = topic
1869  1871          assert sdf_registry.consumer_topics == [topic]
1870  1872      else:
      1873+         groupby_topic = sdf.topics[0]
1871  1874          assert sdf_registry.consumer_topics == [topic, groupby_topic]
1872  1875          assert groupby_topic.name.startswith("repartition__")
1873  1876

0 commit comments

Comments (0)