KREST-4067 Introduce import control to Kafka REST. (confluentinc#1022)
This change enables Checkstyle's ImportControl module so that
it's easier to detect when a non-public Kafka API is being
used. This is needed because non-public APIs provide far fewer
guarantees, at least in terms of API stability, and can be a
source of breakages or subtle behavior changes.

For that purpose, Checkstyle has also been enabled on test
sources and a main Checkstyle config has been added.
- To minimize the scope of this change, however, all Checkstyle
  rules other than the ImportControl rules have been disabled
  for test sources.

All pre-existing usages of non-public APIs have been
grandfathered (i.e. allowlisted in the ImportControl ruleset).
From here on, new usages of non-public APIs will be much
easier to spot.
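
For illustration, a class like the following (hypothetical, not part of
this change) would now fail the Checkstyle run, because
org.apache.kafka.common.utils.Utils is an internal Kafka class that is
not on the ImportControl allowlist:

    package io.confluent.kafkarest;

    // Hypothetical example: this import would be reported as an
    // ImportControl violation, since only Time and AppInfoParser are
    // allowlisted from org.apache.kafka.common.utils.
    import org.apache.kafka.common.utils.Utils;

    public final class Example {}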
dimitarndimitrov authored Jun 7, 2022
1 parent dc77403 commit 5ab4ff0
Showing 49 changed files with 652 additions and 158 deletions.
12 changes: 12 additions & 0 deletions checkstyle/checkstyle.xml
@@ -0,0 +1,12 @@
<?xml version="1.0"?>
<!DOCTYPE module PUBLIC
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">

<module name="Checker">
  <module name="TreeWalker">
    <module name="ImportControl">
      <property name="file" value="checkstyle/import_control.xml"/>
    </module>
  </module>
</module>
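
On its own, this config only registers the ImportControl check and points
it at the ruleset added below; the build still has to run Checkstyle
against it. A minimal sketch of the usual maven-checkstyle-plugin wiring
follows; the plugin settings here are assumptions, as the actual build
integration is not part of this diff:

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-checkstyle-plugin</artifactId>
      <configuration>
        <configLocation>checkstyle/checkstyle.xml</configLocation>
        <!-- Without this, Checkstyle skips test sources entirely. -->
        <includeTestSourceDirectory>true</includeTestSourceDirectory>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>check</goal>
          </goals>
        </execution>
      </executions>
    </plugin>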
149 changes: 149 additions & 0 deletions checkstyle/import_control.xml
@@ -0,0 +1,149 @@
<?xml version="1.0"?>
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.4//EN"
"http://www.puppycrawl.com/dtds/import_control_1_4.dtd">

<import-control pkg="io.confluent.kafkarest">

<!-- Apache Kafka 3.2.0 public API packages per https://kafka.apache.org/32/javadoc/index.html -->
<allow pkg="org.apache.kafka.clients.admin" exact-match="true" />
<allow pkg="org.apache.kafka.clients.consumer" exact-match="true" />
<allow pkg="org.apache.kafka.clients.producer" exact-match="true" />
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.acl" exact-match="true" />
<allow pkg="org.apache.kafka.common.annotation" exact-match="true" />
<allow pkg="org.apache.kafka.common.config" exact-match="true" />
<allow pkg="org.apache.kafka.common.config.provider" exact-match="true" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<allow pkg="org.apache.kafka.common.header" exact-match="true" />
<allow pkg="org.apache.kafka.common.metrics" exact-match="true" />
<allow pkg="org.apache.kafka.common.metrics.stats" exact-match="true" />
<allow pkg="org.apache.kafka.common.quota" exact-match="true" />
<allow pkg="org.apache.kafka.common.resource" exact-match="true" />
<allow pkg="org.apache.kafka.common.security.auth" exact-match="true" />
<allow pkg="org.apache.kafka.common.security.oauthbearer" exact-match="true" />
<allow pkg="org.apache.kafka.common.security.oauthbearer.secured" exact-match="true" />
<allow pkg="org.apache.kafka.common.security.plain" exact-match="true" />
<allow pkg="org.apache.kafka.common.security.scram" exact-match="true" />
<allow pkg="org.apache.kafka.common.security.token.delegation" exact-match="true" />
<allow pkg="org.apache.kafka.common.serialization" exact-match="true" />
<allow pkg="org.apache.kafka.connect.components" exact-match="true" />
<allow pkg="org.apache.kafka.connect.connector" exact-match="true" />
<allow pkg="org.apache.kafka.connect.connector.policy" exact-match="true" />
<allow pkg="org.apache.kafka.connect.data" exact-match="true" />
<allow pkg="org.apache.kafka.connect.errors" exact-match="true" />
<allow pkg="org.apache.kafka.connect.header" exact-match="true" />
<allow pkg="org.apache.kafka.connect.health" exact-match="true" />
<allow pkg="org.apache.kafka.connect.mirror" exact-match="true" />
<allow pkg="org.apache.kafka.connect.rest" exact-match="true" />
<allow pkg="org.apache.kafka.connect.sink" exact-match="true" />
<allow pkg="org.apache.kafka.connect.source" exact-match="true" />
<allow pkg="org.apache.kafka.connect.storage" exact-match="true" />
<allow pkg="org.apache.kafka.connect.transforms" exact-match="true" />
<allow pkg="org.apache.kafka.connect.transforms.predicates" exact-match="true" />
<allow pkg="org.apache.kafka.connect.util" exact-match="true" />
<allow pkg="org.apache.kafka.server.authorizer" exact-match="true" />
<allow pkg="org.apache.kafka.server.log.remote.storage" exact-match="true" />
<allow pkg="org.apache.kafka.server.policy" exact-match="true" />
<allow pkg="org.apache.kafka.server.quota" exact-match="true" />
<allow pkg="org.apache.kafka.streams" exact-match="true" />
<allow pkg="org.apache.kafka.streams.errors" exact-match="true" />
<allow pkg="org.apache.kafka.streams.kstream" exact-match="true" />
<allow pkg="org.apache.kafka.streams.processor" exact-match="true" />
<allow pkg="org.apache.kafka.streams.processor.api" exact-match="true" />
<allow pkg="org.apache.kafka.streams.query" exact-match="true" />
<allow pkg="org.apache.kafka.streams.state" exact-match="true" />
<allow pkg="org.apache.kafka.streams.test" exact-match="true" />
<!-- A ctor for a public API type (Metrics) takes a parameter of this type, so it is essentially
a public API as well... -->
<allow class="org.apache.kafka.common.utils.Time" />
<!-- Nested classes, not handled by allow package rules with exact-match="true" -->
<allow class="org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult" />
<allow class="org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults" />
<allow class="org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo" />
<allow class="org.apache.kafka.common.config.ConfigDef.Importance" />
<allow class="org.apache.kafka.common.config.ConfigDef.Range" />
<allow class="org.apache.kafka.common.config.ConfigDef.Type" />
<allow class="org.apache.kafka.common.config.ConfigResource.Type" />
<!-- Deprecated, previously part of the public API -->
<allow class="org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo" />
<allow class="org.apache.kafka.common.requests.DescribeLogDirsResponse.ReplicaInfo" />
<!-- Non-public APIs that are better imported than reimplemented -->
<allow class="org.apache.kafka.common.utils.AppInfoParser" />
<allow class="org.apache.kafka.common.internals.KafkaFutureImpl" />
<allow class="org.apache.kafka.common.header.internals.RecordHeader" />

<!-- Various Kafka serialization classes -->
<allow class="io\.confluent\.kafka\.serializers\..*Config" regex="true" />
<allow class="io\.confluent\.kafka\.serializers\..*Deserializer" regex="true" />
<allow class="io\.confluent\.kafka\.serializers\..*Serializer" regex="true" />
<allow class="io\.confluent\.kafka\.serializers.subject\..*Strategy" regex="true" />

<!-- common external library dependencies -->
<allow pkg="com.fasterxml.jackson" />
<allow pkg="com.google.auto.value" />
<allow pkg="com.google.common" />
<allow pkg="com.google.protobuf" />
<allow pkg="io.github.resilience4j" />
<allow pkg="java" />
<allow pkg="javax.annotation" />
<allow pkg="javax.inject" />
<allow pkg="javax.management" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.validation" />
<allow pkg="javax.ws.rs" />
<allow pkg="org.apache.avro" />
<allow pkg="org.easymock" />
<allow pkg="org.eclipse.jetty" />
<allow pkg="org.glassfish.hk2" />
<allow pkg="org.glassfish.jersey" />
<allow pkg="org.hamcrest" />
<allow pkg="org.junit" />
<allow pkg="org.slf4j" />

<!-- Kafka REST's own base package -->
<allow pkg="io.confluent.kafkarest" />
<!-- common's base package -->
<allow pkg="io.confluent.common" />
<!-- rest-utils' base package -->
<allow pkg="io.confluent.rest" />
<!-- schema-registry's core base package -->
<allow pkg="io.confluent.kafka.schemaregistry" />

<!-- Various one-off classes -->
<allow class="edu.umd.cs.findbugs.annotations.SuppressFBWarnings" />
<allow class="javax.xml.bind.DatatypeConverter" />
<allow class="org.everit.json.schema.ValidationException" />
<allow class="org.hibernate.validator.constraints.URL" />

<!-- Static field imports -->
<allow class="io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS" />
<allow class="io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT" />
<allow class="io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG" />
<allow class="io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION" />
<allow class="org.apache.kafka.clients.CommonClientConfigs.METRICS_CONTEXT_PREFIX" />

<!-- Test-specific imports -->

<!-- a. One-off classes from common external dependencies -->
<allow class="javax.security.auth.login.Configuration" />
<allow class="scala.Option" />
<allow class="scala.collection.JavaConverters" />
<!-- b. Non-public Apache Kafka APIs -->
<allow class="org.apache.kafka.clients.CommonClientConfigs" />
<allow class="org.apache.kafka.common.config.types.Password" />
<allow class="org.apache.kafka.common.network.ListenerName" />
<allow class="org.apache.kafka.common.protocol.Errors" />
<allow class="org.apache.kafka.common.security.JaasUtils" />
<allow class="org.apache.kafka.test.TestSslUtils" />
<!-- c. Non-public Confluent Community Kafka APIs -->
<allow class="kafka.admin.AclCommand" />
<allow class="kafka.security.authorizer.AclAuthorizer" />
<allow class="kafka.server.KafkaConfig" />
<allow class="kafka.server.KafkaServer" />
<allow class="kafka.utils.CoreUtils" />
<allow class="kafka.utils.MockTime" />
<allow class="kafka.utils.TestUtils" />
<allow class="kafka.zk.EmbeddedZookeeper" />

</import-control>
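
Two details of the ruleset above are easy to miss. First,
exact-match="true" on an <allow pkg=.../> rule permits classes directly in
that package but nothing from its subpackages, which is how internal
packages such as org.apache.kafka.clients.admin.internals remain
disallowed. Second, as the inline comment notes, nested classes are not
covered by package rules at all and need dedicated <allow class=.../>
entries. A hypothetical illustration (the specific imports are examples,
not code from this commit):

    // Allowed: class sits directly in an allowlisted exact-match package.
    import org.apache.kafka.clients.admin.Admin;
    // Rejected: subpackages are not covered by exact-match package rules.
    import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
    // Allowed only because of its dedicated <allow class=.../> rule above.
    import org.apache.kafka.common.config.ConfigDef.Range;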
46 changes: 42 additions & 4 deletions checkstyle/suppressions.xml
@@ -10,19 +10,57 @@

<!-- This class is a legacy from the V2 API. It currently has too many responsibilities, and needs
to be broken down. -->
-<suppress checks="ClassDataAbstractionCoupling" files="KafkaConsumerManager.java" />
+<suppress checks="ClassDataAbstractionCoupling" files="KafkaConsumerManager" />

<!-- This class is the main point of configuration for the Kafka REST application. It needs to
reference multiple parts of the codebase by nature. Maybe some of the configuration can be
broken out. -->
-<suppress checks="ClassDataAbstractionCoupling" files="KafkaRestApplication.java" />
+<suppress checks="ClassDataAbstractionCoupling" files="KafkaRestApplication" />

<!-- Setting up the metrics involves a lot of intermediate collaborators. Possibly solvable by
moving some of the logic to ProducerMetricsFactory. -->
-<suppress checks="ClassDataAbstractionCoupling" files="ProducerMetrics.java" />
+<suppress checks="ClassDataAbstractionCoupling" files="ProducerMetrics" />

<!-- KafkaRestConfig#baseKafkaRestConfigDef contains the definition of all Kafka REST configs.
Maybe it can be split in multiple methods. -->
-<suppress checks="MethodLength" files="KafkaRestConfig.java" />
+<suppress checks="MethodLength" files="KafkaRestConfig" />

<!-- With KREST-4067, ImportControl checks will be applied to both the main sources and the test
sources, which requires also applying Checkstyle to the test sources. To maintain the
pre-existing behavior without additional changes, all other (non-ImportControl) checks
should be explicitly suppressed. -->

<!-- That check is not great for test files - e.g. in many cases the declared variable takes the
result of an action, to be asserted against later. We'll ignore it for all tests. -->
<suppress checks="VariableDeclarationUsageDistance" files="[A-Za-z]*Test" />

<!-- That check flags names that are better kept as-is - e.g. mBeanServer. -->
<suppress checks="LocalVariableName" files="TestUtils|ProducerMetricsTest|ProduceActionTest" />

<!-- That check is not great - it complains about RecordType as a type parameter name. -->
<suppress checks="MethodTypeParameterName" files="AbstractConsumerTest" />

<!-- The tests below are fairly big and complex, so they go over the NCSS threshold. -->
<suppress checks="JavaNCSS" files="ProtobufConverterTest|ProduceActionTest" />

<!-- TestUtils#encodeComparable contains a complex, switch-like if-else structure. -->
<suppress checks="CyclomaticComplexity" files="TestUtils" />

<suppress
checks="ClassDataAbstractionCoupling"
files="ClusterTestHarness|LoadTest|SchemaRegistryFixture|RecordSerializerFacadeTest|SchemaManagerImplTest|Produce[A-Za-z]*Test" />

<!-- The tests below contain methods that use builders to create fairly complex result objects -->
<suppress
checks="MethodLength"
files="ConsumerAssignmentsResourceIntegrationTest|TopicsResourceIntegrationTest|[A-Za-z]*ConfigsResourceIntegrationTest" />

<!-- The tests below contain lines for which there's no good way of splitting in two. -->
<suppress
checks="LineLength"
files="UrlFactoryImplTest|PartitionsResourceTest|ConsumerGroupsResourceTest" />

<!-- AutoValue-generated classes can contain various violations which should be ignored. -->
<suppress checks="[A-za-z]*" files="AutoValue[A-Za-z_]*" />

</suppressions>
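
One property of these suppressions worth noting: the files attribute is an
unanchored regular expression matched against the file path, not an exact
file name. That is why the .java suffixes above could be dropped:
"KafkaConsumerManager" matches KafkaConsumerManager.java just as well. A
hypothetical entry in the same style:

    <!-- Hypothetical example: matches FooTest.java, FooBarTest.java, and so on. -->
    <suppress checks="LineLength" files="Foo[A-Za-z]*Test" />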
@@ -1,3 +1,18 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.kafkarest;

import static io.confluent.kafkarest.KafkaRestConfig.METRICS_REPORTER_CLASSES_CONFIG;
@@ -1,3 +1,18 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.kafkarest;

import static java.util.Objects.requireNonNull;
11 changes: 8 additions & 3 deletions kafka-rest/src/test/java/io/confluent/kafkarest/TestUtils.java
@@ -12,6 +12,7 @@
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.kafkarest;

import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -140,7 +141,8 @@ public static JsonNode jsonTree(String jsonData) {
public static void assertPartitionsEqual(List<PartitionOffset> a, List<PartitionOffset> b) {
assertEquals(a.size(), b.size());
for (int i = 0; i < a.size(); i++) {
-PartitionOffset aOffset = a.get(i), bOffset = b.get(i);
+PartitionOffset aOffset = a.get(i);
+PartitionOffset bOffset = b.get(i);
assertEquals(aOffset.getPartition(), bOffset.getPartition());
}
}
@@ -150,7 +152,8 @@ public static void assertPartitionOffsetsEqual(List<PartitionOffset> a, List<PartitionOffset> b) {
// exception vs. non-exception responses match up
assertEquals(a.size(), b.size());
for (int i = 0; i < a.size(); i++) {
-PartitionOffset aOffset = a.get(i), bOffset = b.get(i);
+PartitionOffset aOffset = a.get(i);
+PartitionOffset bOffset = b.get(i);
assertEquals(aOffset.getError() != null, bOffset.getError() != null);
assertEquals(aOffset.getOffset() != null, bOffset.getOffset() != null);
}
@@ -393,7 +396,9 @@ private static <V, K> Map<Object, Integer> topicCounts(
if (partition == null || record.partition() == partition) {
Object msg = encodeComparable(record.value());
msgCounts.put(msg, (msgCounts.get(msg) == null ? 0 : msgCounts.get(msg)) + 1);
-if (counter.incrementAndGet() == records.size()) return true;
+if (counter.incrementAndGet() == records.size()) {
+  return true;
+}
}
}

@@ -27,7 +27,11 @@

import io.confluent.kafkarest.entities.Acl;
import io.confluent.kafkarest.entities.Cluster;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.NotFoundException;
import org.apache.kafka.clients.admin.Admin;
@@ -121,7 +121,8 @@ public void noSchemaRegistryClientConfigured() {
/* isKey= */ true));

assertEquals(
-"Error serializing message. Schema Registry not defined, no Schema Registry client available to serialize message.",
+"Error serializing message. Schema Registry not defined, "
+    + "no Schema Registry client available to serialize message.",
rcve.getMessage());
assertEquals(42207, rcve.getErrorCode());
}
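
The long expected-message literal above is split purely to satisfy the
line-length limit: adjacent string literals joined with + are folded into
a single constant at compile time, so the message the assertion checks is
byte-for-byte unchanged. A small self-contained demonstration
(hypothetical, not part of this commit):

    public class SplitLiteralsDemo {
      public static void main(String[] args) {
        String oneLine = "Error serializing message. Schema Registry not defined, no Schema Registry client available to serialize message.";
        String split =
            "Error serializing message. Schema Registry not defined, "
                + "no Schema Registry client available to serialize message.";
        // Constant expressions are folded by javac (JLS 15.28), so this prints: true
        System.out.println(oneLine.equals(split));
      }
    }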
@@ -432,7 +432,8 @@ public void getSchema_avro_schemaId_schemaIdNotInSubject() throws Exception {
/* rawSchema= */ Optional.empty(),
/* isKey= */ true));
assertEquals(
-"Error serializing message. Error when fetching schema version. subject = topic-1-key, schema = \"int\"\n"
+"Error serializing message. Error when fetching schema version. "
+    + "subject = topic-1-key, schema = \"int\"\n"
+ "Subject Not Found; error code: 40401",
rcve.getMessage());
assertEquals(42207, rcve.getErrorCode());
@@ -517,7 +518,8 @@ public void getSchema_avro_latestSchema_noSchema() {
/* rawSchema= */ Optional.empty(),
/* isKey= */ true));
assertEquals(
-"Error serializing message. Error when fetching latest schema version. subject = topic-1-key\n"
+"Error serializing message. Error when fetching latest schema version. "
+    + "subject = topic-1-key\n"
+ "Subject Not Found; error code: 40401",
rcve.getMessage());
assertEquals(42207, rcve.getErrorCode());
@@ -570,7 +572,9 @@ public void getSchema_avro_schemaVersion_subjectNameStrategy_strategyReturnsNull
assertTrue(
rcve.getMessage()
.startsWith(
-"Cannot use schema_subject_strategy=io.confluent.kafkarest.controllers.SchemaManagerImplTest$NullReturningSubjectNameStrategy@"));
+"Cannot use schema_subject_strategy="
+    + "io.confluent.kafkarest.controllers.SchemaManagerImplTest"
+    + "$NullReturningSubjectNameStrategy@"));
assertTrue(rcve.getMessage().endsWith(" without schema_id or schema."));
}

@@ -766,7 +770,8 @@ public void errorRawSchemaCantParseSchema() {
/* rawSchema= */ Optional.of(TextNode.valueOf("rawSchema").toString()),
/* isKey= */ true));
assertEquals(
-"Raw schema not supported with format = EasyMock for class io.confluent.kafkarest.entities.EmbeddedFormat",
+"Raw schema not supported with format = "
+    + "EasyMock for class io.confluent.kafkarest.entities.EmbeddedFormat",
rcve.getMessage());
assertEquals(400, rcve.getCode());
}
@@ -801,7 +806,8 @@ public void errorRawSchemaNotSupportedWithSchema() {
/* rawSchema= */ Optional.of(TextNode.valueOf("rawSchema").toString()),
/* isKey= */ true));
assertEquals(
-"Raw schema not supported with format = EasyMock for class io.confluent.kafkarest.entities.EmbeddedFormat",
+"Raw schema not supported with format = "
+    + "EasyMock for class io.confluent.kafkarest.entities.EmbeddedFormat",
bre.getMessage());
assertEquals(400, bre.getCode());
}
@@ -844,7 +850,10 @@ public void errorRegisteringSchema() throws RestClientException, IOException {
/* rawSchema= */ Optional.of(TextNode.valueOf("rawString").toString()),
/* isKey= */ true));
assertEquals(
-"Error serializing message. Error when registering schema. format = EasyMock for class io.confluent.kafkarest.entities.EmbeddedFormat, subject = subject1, schema = null\n"
+"Error serializing message. Error when registering schema. "
+    + "format = EasyMock for class io.confluent.kafkarest.entities.EmbeddedFormat, "
+    + "subject = subject1, "
+    + "schema = null\n"
+ "Can't register Schema",
rcve.getMessage());
assertEquals(42207, rcve.getErrorCode());