Skip to content

Commit ce97b1d

Browse files
KAFKA-16894: Exploit share feature [3/N] (#19542)
This PR uses the v1 of the ShareVersion feature to enable share groups for KIP-932. Previously, there were two potential configs which could be used - `group.share.enable=true` and including "share" in `group.coordinator.rebalance.protocols`. After this PR, the first of these is retained, but the second is not. Instead, the preferred switch is the ShareVersion feature. The `group.share.enable` config is temporarily retained for testing and situations in which it is inconvenient to set the feature, but it should really not be necessary, especially when we get to AK 4.2. The aim is to remove this internal config at that point. No tests should be setting `group.share.enable` any more, because they can use the feature (which is enabled in test environments by default because that's how features work). For tests which need to disable share groups, they now set the share feature to v0. The majority of the code changes were related to correct initialisation of the metadata cache in tests now that a feature is used. Reviewers: Apoorv Mittal <[email protected]>
1 parent 08f6042 commit ce97b1d

File tree

18 files changed

+315
-611
lines changed

18 files changed

+315
-611
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import org.apache.kafka.common.test.api.ClusterConfigProperty;
5656
import org.apache.kafka.common.test.api.ClusterTest;
5757
import org.apache.kafka.common.test.api.ClusterTestDefaults;
58-
import org.apache.kafka.common.test.api.Type;
5958
import org.apache.kafka.common.utils.Utils;
6059
import org.apache.kafka.coordinator.group.GroupConfig;
6160
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
@@ -113,8 +112,6 @@
113112
@ClusterTestDefaults(
114113
serverProperties = {
115114
@ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
116-
@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
117-
@ClusterConfigProperty(key = "group.share.enable", value = "true"),
118115
@ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
119116
@ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
120117
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
@@ -123,8 +120,7 @@
123120
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"),
124121
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
125122
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1")
126-
},
127-
types = {Type.KRAFT}
123+
}
128124
)
129125
public class ShareConsumerTest {
130126
private final ClusterInstance cluster;
@@ -1851,8 +1847,6 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
18511847
brokers = 3,
18521848
serverProperties = {
18531849
@ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
1854-
@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
1855-
@ClusterConfigProperty(key = "group.share.enable", value = "true"),
18561850
@ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
18571851
@ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
18581852
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@@ -2002,8 +1996,6 @@ public void testShareConsumerAfterCoordinatorMovement() throws Exception {
20021996
brokers = 3,
20031997
serverProperties = {
20041998
@ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
2005-
@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
2006-
@ClusterConfigProperty(key = "group.share.enable", value = "true"),
20071999
@ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
20082000
@ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
20092001
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),

core/src/main/java/kafka/server/builders/KafkaApisBuilder.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public class KafkaApisBuilder {
6969
private DelegationTokenManager tokenManager = null;
7070
private ApiVersionManager apiVersionManager = null;
7171
private ClientMetricsManager clientMetricsManager = null;
72-
private Optional<ShareCoordinator> shareCoordinator = Optional.empty();
72+
private ShareCoordinator shareCoordinator = null;
7373
private GroupConfigManager groupConfigManager = null;
7474

7575
public KafkaApisBuilder setRequestChannel(RequestChannel requestChannel) {
@@ -97,7 +97,7 @@ public KafkaApisBuilder setTxnCoordinator(TransactionCoordinator txnCoordinator)
9797
return this;
9898
}
9999

100-
public KafkaApisBuilder setShareCoordinator(Optional<ShareCoordinator> shareCoordinator) {
100+
public KafkaApisBuilder setShareCoordinator(ShareCoordinator shareCoordinator) {
101101
this.shareCoordinator = shareCoordinator;
102102
return this;
103103
}
@@ -194,8 +194,8 @@ public KafkaApis build() {
194194
if (replicaManager == null) throw new RuntimeException("You must set replicaManager");
195195
if (groupCoordinator == null) throw new RuntimeException("You must set groupCoordinator");
196196
if (txnCoordinator == null) throw new RuntimeException("You must set txnCoordinator");
197-
if (autoTopicCreationManager == null)
198-
throw new RuntimeException("You must set autoTopicCreationManager");
197+
if (shareCoordinator == null) throw new RuntimeException("You must set shareCoordinator");
198+
if (autoTopicCreationManager == null) throw new RuntimeException("You must set autoTopicCreationManager");
199199
if (config == null) config = new KafkaConfig(Map.of());
200200
if (configRepository == null) throw new RuntimeException("You must set configRepository");
201201
if (metadataCache == null) throw new RuntimeException("You must set metadataCache");
@@ -213,7 +213,7 @@ public KafkaApis build() {
213213
replicaManager,
214214
groupCoordinator,
215215
txnCoordinator,
216-
OptionConverters.toScala(shareCoordinator),
216+
shareCoordinator,
217217
autoTopicCreationManager,
218218
brokerId,
219219
config,

core/src/main/scala/kafka/server/AutoTopicCreationManager.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class DefaultAutoTopicCreationManager(
5959
channelManager: NodeToControllerChannelManager,
6060
groupCoordinator: GroupCoordinator,
6161
txnCoordinator: TransactionCoordinator,
62-
shareCoordinator: Option[ShareCoordinator]
62+
shareCoordinator: ShareCoordinator
6363
) extends AutoTopicCreationManager with Logging {
6464

6565
private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
@@ -198,15 +198,11 @@ class DefaultAutoTopicCreationManager(
198198
.setConfigs(convertToTopicConfigCollections(
199199
txnCoordinator.transactionTopicConfigs))
200200
case SHARE_GROUP_STATE_TOPIC_NAME =>
201-
val props = shareCoordinator match {
202-
case Some(coordinator) => coordinator.shareGroupStateTopicConfigs()
203-
case None => new Properties()
204-
}
205201
new CreatableTopic()
206202
.setName(topic)
207203
.setNumPartitions(config.shareCoordinatorConfig.shareCoordinatorStateTopicNumPartitions())
208204
.setReplicationFactor(config.shareCoordinatorConfig.shareCoordinatorStateTopicReplicationFactor())
209-
.setConfigs(convertToTopicConfigCollections(props))
205+
.setConfigs(convertToTopicConfigCollections(shareCoordinator.shareGroupStateTopicConfigs()))
210206
case topicName =>
211207
new CreatableTopic()
212208
.setName(topicName)

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class BrokerServer(
126126

127127
var transactionCoordinator: TransactionCoordinator = _
128128

129-
var shareCoordinator: Option[ShareCoordinator] = None
129+
var shareCoordinator: ShareCoordinator = _
130130

131131
var clientToControllerChannelManager: NodeToControllerChannelManager = _
132132

@@ -629,49 +629,43 @@ class BrokerServer(
629629
.build()
630630
}
631631

632-
private def createShareCoordinator(): Option[ShareCoordinator] = {
633-
if (config.shareGroupConfig.isShareGroupEnabled &&
634-
config.shareGroupConfig.shareGroupPersisterClassName().nonEmpty) {
635-
val time = Time.SYSTEM
636-
val timer = new SystemTimerReaper(
637-
"share-coordinator-reaper",
638-
new SystemTimer("share-coordinator")
639-
)
632+
private def createShareCoordinator(): ShareCoordinator = {
633+
val time = Time.SYSTEM
634+
val timer = new SystemTimerReaper(
635+
"share-coordinator-reaper",
636+
new SystemTimer("share-coordinator")
637+
)
640638

641-
val serde = new ShareCoordinatorRecordSerde
642-
val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
643-
time,
644-
replicaManager,
645-
serde,
646-
config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
647-
)
648-
val writer = new CoordinatorPartitionWriter(
649-
replicaManager
650-
)
651-
Some(new ShareCoordinatorService.Builder(config.brokerId, config.shareCoordinatorConfig)
652-
.withTimer(timer)
653-
.withTime(time)
654-
.withLoader(loader)
655-
.withWriter(writer)
656-
.withCoordinatorRuntimeMetrics(new ShareCoordinatorRuntimeMetrics(metrics))
657-
.withCoordinatorMetrics(new ShareCoordinatorMetrics(metrics))
658-
.build())
659-
} else {
660-
None
661-
}
639+
val serde = new ShareCoordinatorRecordSerde
640+
val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
641+
time,
642+
replicaManager,
643+
serde,
644+
config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
645+
)
646+
val writer = new CoordinatorPartitionWriter(
647+
replicaManager
648+
)
649+
new ShareCoordinatorService.Builder(config.brokerId, config.shareCoordinatorConfig)
650+
.withTimer(timer)
651+
.withTime(time)
652+
.withLoader(loader)
653+
.withWriter(writer)
654+
.withCoordinatorRuntimeMetrics(new ShareCoordinatorRuntimeMetrics(metrics))
655+
.withCoordinatorMetrics(new ShareCoordinatorMetrics(metrics))
656+
.build()
662657
}
663658

664659
private def createShareStatePersister(): Persister = {
665-
if (config.shareGroupConfig.isShareGroupEnabled &&
666-
config.shareGroupConfig.shareGroupPersisterClassName.nonEmpty) {
660+
if (config.shareGroupConfig.shareGroupPersisterClassName.nonEmpty) {
667661
val klass = Utils.loadClass(config.shareGroupConfig.shareGroupPersisterClassName, classOf[Object]).asInstanceOf[Class[Persister]]
668662

669663
if (klass.getName.equals(classOf[DefaultStatePersister].getName)) {
670664
klass.getConstructor(classOf[PersisterStateManager])
671665
.newInstance(
672666
new PersisterStateManager(
673667
NetworkUtils.buildNetworkClient("Persister", config, metrics, Time.SYSTEM, new LogContext(s"[Persister broker=${config.brokerId}]")),
674-
new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => shareCoordinator.get.partitionFor(key), config.interBrokerListenerName),
668+
new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => shareCoordinator.partitionFor(key), config.interBrokerListenerName),
675669
Time.SYSTEM,
676670
new SystemTimerReaper(
677671
"persister-state-manager-reaper",
@@ -782,8 +776,8 @@ class BrokerServer(
782776
CoreUtils.swallow(groupConfigManager.close(), this)
783777
if (groupCoordinator != null)
784778
CoreUtils.swallow(groupCoordinator.shutdown(), this)
785-
if (shareCoordinator.isDefined)
786-
CoreUtils.swallow(shareCoordinator.get.shutdown(), this)
779+
if (shareCoordinator != null)
780+
CoreUtils.swallow(shareCoordinator.shutdown(), this)
787781

788782
if (assignmentsManager != null)
789783
CoreUtils.swallow(assignmentsManager.close(), this)

0 commit comments

Comments
 (0)