Skip to content

Commit aa56dfb

Browse files
committed
KAFKA-2686: Reset needsPartitionAssignment in SubscriptionState.assign()
Author: Guozhang Wang <[email protected]> Reviewers: Jason Gustafson, Jun Rao Closes apache#352 from guozhangwang/K2686
1 parent bf292a6 commit aa56dfb

File tree

9 files changed

+142
-90
lines changed

9 files changed

+142
-90
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,9 @@ public Set<String> subscription() {
629629
* assign partitions. Topic subscriptions are not incremental. This list will replace the current
630630
* assignment (if there is one). Note that it is not possible to combine topic subscription with group management
631631
* with manual partition assignment through {@link #assign(List)}.
632+
*
633+
* If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
634+
*
632635
* <p>
633636
* As part of group management, the consumer will keep track of the list of consumers that belong to a particular
634637
* group and will trigger a rebalance operation if one of the following events trigger -
@@ -653,9 +656,14 @@ public Set<String> subscription() {
653656
public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
654657
acquire();
655658
try {
656-
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
657-
this.subscriptions.subscribe(topics, listener);
658-
metadata.setTopics(subscriptions.groupSubscription());
659+
if (topics.isEmpty()) {
660+
// treat subscribing to empty topic list as the same as unsubscribing
661+
this.unsubscribe();
662+
} else {
663+
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
664+
this.subscriptions.subscribe(topics, listener);
665+
metadata.setTopics(subscriptions.groupSubscription());
666+
}
659667
} finally {
660668
release();
661669
}
@@ -666,6 +674,9 @@ public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
666674
* assign partitions. Topic subscriptions are not incremental. This list will replace the current
667675
* assignment (if there is one). It is not possible to combine topic subscription with group management
668676
* with manual partition assignment through {@link #assign(List)}.
677+
*
678+
* If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
679+
*
669680
* <p>
670681
* This is a short-hand for {@link #subscribe(List, ConsumerRebalanceListener)}, which
671682
* uses a noop listener. If you need the ability to either seek to particular offsets, you should prefer
@@ -715,6 +726,7 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
715726
public void unsubscribe() {
716727
acquire();
717728
try {
729+
log.debug("Unsubscribed all topics or patterns and assigned partitions");
718730
this.subscriptions.unsubscribe();
719731
this.coordinator.resetGeneration();
720732
this.metadata.needMetadataForAllTopics(false);
@@ -739,7 +751,7 @@ public void assign(List<TopicPartition> partitions) {
739751
acquire();
740752
try {
741753
log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
742-
this.subscriptions.assign(partitions);
754+
this.subscriptions.assignFromUser(partitions);
743755
Set<String> topics = new HashSet<>();
744756
for (TopicPartition tp : partitions)
745757
topics.add(tp.topic());

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public Set<TopicPartition> assignment() {
7676
public void rebalance(Collection<TopicPartition> newAssignment) {
7777
// TODO: Rebalance callbacks
7878
this.records.clear();
79-
this.subscriptions.changePartitionAssignment(newAssignment);
79+
this.subscriptions.assignFromSubscribed(newAssignment);
8080
}
8181

8282
@Override
@@ -112,7 +112,7 @@ public void subscribe(List<String> topics, final ConsumerRebalanceListener liste
112112
@Override
113113
public void assign(List<TopicPartition> partitions) {
114114
ensureNotClosed();
115-
this.subscriptions.assign(partitions);
115+
this.subscriptions.assignFromUser(partitions);
116116
}
117117

118118
@Override

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ protected void onJoin(int generation,
169169
subscriptions.needRefreshCommits();
170170

171171
// update partition assignment
172-
subscriptions.changePartitionAssignment(assignment.partitions());
172+
subscriptions.assignFromSubscribed(assignment.partitions());
173173

174174
// give the assignor a chance to update internal state based on the received assignment
175175
assignor.onAssignment(assignment);

clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java

+24-14
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828

2929
/**
3030
* A class for tracking the topics, partitions, and offsets for the consumer. A partition
31-
* is "assigned" either directly with {@link #assign(List)} (manual assignment)
32-
* or with {@link #changePartitionAssignment(Collection)} (automatic assignment).
31+
* is "assigned" either directly with {@link #assignFromUser(Collection)} (manual assignment)
32+
* or with {@link #assignFromSubscribed(Collection)} (automatic assignment from subscription).
3333
*
3434
* Once assigned, the partition is not considered "fetchable" until its initial position has
3535
* been set with {@link #seek(TopicPartition, long)}. Fetchable partitions track a fetch
@@ -129,12 +129,16 @@ public void groupSubscribe(Collection<String> topics) {
129129
}
130130

131131
public void needReassignment() {
132-
//
133132
this.groupSubscription.retainAll(subscription);
134133
this.needsPartitionAssignment = true;
135134
}
136135

137-
public void assign(List<TopicPartition> partitions) {
136+
/**
137+
* Change the assignment to the specified partitions provided by the user,
138+
* note this is different from {@link #assignFromSubscribed(Collection)}
139+
* whose input partitions are provided from the subscribed topics.
140+
*/
141+
public void assignFromUser(Collection<TopicPartition> partitions) {
138142
if (!this.subscription.isEmpty() || this.subscribedPattern != null)
139143
throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
140144

@@ -146,6 +150,22 @@ public void assign(List<TopicPartition> partitions) {
146150
addAssignedPartition(partition);
147151

148152
this.assignment.keySet().retainAll(this.userAssignment);
153+
154+
this.needsPartitionAssignment = false;
155+
}
156+
157+
/**
158+
* Change the assignment to the specified partitions returned from the coordinator,
159+
* note this is different from {@link #assignFromUser(Collection)} which directly set the assignment from user inputs
160+
*/
161+
public void assignFromSubscribed(Collection<TopicPartition> assignments) {
162+
for (TopicPartition tp : assignments)
163+
if (!this.subscription.contains(tp.topic()))
164+
throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
165+
this.assignment.clear();
166+
for (TopicPartition tp: assignments)
167+
addAssignedPartition(tp);
168+
this.needsPartitionAssignment = false;
149169
}
150170

151171
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
@@ -306,16 +326,6 @@ public boolean partitionAssignmentNeeded() {
306326
return this.needsPartitionAssignment;
307327
}
308328

309-
public void changePartitionAssignment(Collection<TopicPartition> assignments) {
310-
for (TopicPartition tp : assignments)
311-
if (!this.subscription.contains(tp.topic()))
312-
throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
313-
this.assignment.clear();
314-
for (TopicPartition tp: assignments)
315-
addAssignedPartition(tp);
316-
this.needsPartitionAssignment = false;
317-
}
318-
319329
public boolean isAssigned(TopicPartition tp) {
320330
return assignment.containsKey(tp);
321331
}

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

+32
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,20 @@
1717
package org.apache.kafka.clients.consumer;
1818

1919
import org.apache.kafka.common.KafkaException;
20+
import org.apache.kafka.common.TopicPartition;
2021
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
2122
import org.apache.kafka.test.MockMetricsReporter;
2223
import org.junit.Assert;
2324
import org.junit.Test;
2425

26+
import java.util.Collections;
2527
import java.util.Properties;
2628

2729
public class KafkaConsumerTest {
2830

31+
private final String topic = "test";
32+
private final TopicPartition tp0 = new TopicPartition("test", 0);
33+
2934
@Test
3035
public void testConstructorClose() throws Exception {
3136
Properties props = new Properties();
@@ -46,4 +51,31 @@ public void testConstructorClose() throws Exception {
4651
}
4752
Assert.fail("should have caught an exception and returned");
4853
}
54+
55+
@Test
56+
public void testSubscription() {
57+
Properties props = new Properties();
58+
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSubscription");
59+
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
60+
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
61+
62+
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
63+
props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
64+
65+
consumer.subscribe(Collections.singletonList(topic));
66+
Assert.assertEquals(Collections.singleton(topic), consumer.subscription());
67+
Assert.assertTrue(consumer.assignment().isEmpty());
68+
69+
consumer.subscribe(Collections.<String>emptyList());
70+
Assert.assertTrue(consumer.subscription().isEmpty());
71+
Assert.assertTrue(consumer.assignment().isEmpty());
72+
73+
consumer.assign(Collections.singletonList(tp0));
74+
Assert.assertTrue(consumer.subscription().isEmpty());
75+
Assert.assertEquals(Collections.singleton(tp0), consumer.assignment());
76+
77+
consumer.unsubscribe();
78+
Assert.assertTrue(consumer.subscription().isEmpty());
79+
Assert.assertTrue(consumer.assignment().isEmpty());
80+
}
4981
}

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ public void testIllegalGeneration() {
193193

194194
// illegal_generation will cause re-partition
195195
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
196-
subscriptions.changePartitionAssignment(Collections.singletonList(tp));
196+
subscriptions.assignFromSubscribed(Collections.singletonList(tp));
197197

198198
time.sleep(sessionTimeoutMs);
199199
RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
@@ -217,7 +217,7 @@ public void testUnknownConsumerId() {
217217

218218
// illegal_generation will cause re-partition
219219
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
220-
subscriptions.changePartitionAssignment(Collections.singletonList(tp));
220+
subscriptions.assignFromSubscribed(Collections.singletonList(tp));
221221

222222
time.sleep(sessionTimeoutMs);
223223
RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
@@ -413,7 +413,7 @@ public void testInvalidSessionTimeout() {
413413

414414
@Test
415415
public void testCommitOffsetOnly() {
416-
subscriptions.assign(Arrays.asList(tp));
416+
subscriptions.assignFromUser(Arrays.asList(tp));
417417

418418
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
419419
coordinator.ensureCoordinatorKnown();
@@ -430,7 +430,7 @@ public void testCommitOffsetOnly() {
430430

431431
@Test
432432
public void testCommitOffsetMetadata() {
433-
subscriptions.assign(Arrays.asList(tp));
433+
subscriptions.assignFromUser(Arrays.asList(tp));
434434

435435
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
436436
coordinator.ensureCoordinatorKnown();
@@ -473,7 +473,7 @@ public void testResetGeneration() {
473473
// now switch to manual assignment
474474
subscriptions.unsubscribe();
475475
coordinator.resetGeneration();
476-
subscriptions.assign(Arrays.asList(tp));
476+
subscriptions.assignFromUser(Arrays.asList(tp));
477477

478478
// the client should not reuse generation/memberId from auto-subscribed generation
479479
client.prepareResponse(new MockClient.RequestMatcher() {
@@ -612,7 +612,7 @@ public void testRefreshOffset() {
612612
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
613613
coordinator.ensureCoordinatorKnown();
614614

615-
subscriptions.assign(Arrays.asList(tp));
615+
subscriptions.assignFromUser(Arrays.asList(tp));
616616
subscriptions.needRefreshCommits();
617617
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
618618
coordinator.refreshCommittedOffsetsIfNeeded();
@@ -625,7 +625,7 @@ public void testRefreshOffsetLoadInProgress() {
625625
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
626626
coordinator.ensureCoordinatorKnown();
627627

628-
subscriptions.assign(Arrays.asList(tp));
628+
subscriptions.assignFromUser(Arrays.asList(tp));
629629
subscriptions.needRefreshCommits();
630630
client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
631631
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
@@ -639,7 +639,7 @@ public void testRefreshOffsetNotCoordinatorForConsumer() {
639639
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
640640
coordinator.ensureCoordinatorKnown();
641641

642-
subscriptions.assign(Arrays.asList(tp));
642+
subscriptions.assignFromUser(Arrays.asList(tp));
643643
subscriptions.needRefreshCommits();
644644
client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code(), "", 100L));
645645
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
@@ -654,7 +654,7 @@ public void testRefreshOffsetWithNoFetchableOffsets() {
654654
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
655655
coordinator.ensureCoordinatorKnown();
656656

657-
subscriptions.assign(Arrays.asList(tp));
657+
subscriptions.assignFromUser(Arrays.asList(tp));
658658
subscriptions.needRefreshCommits();
659659
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
660660
coordinator.refreshCommittedOffsetsIfNeeded();

0 commit comments

Comments
 (0)