Skip to content

Commit bbb3fe7

Browse files
authored
Merge pull request #895 from confluentinc/CC-36923/ext_resource_usage_limit
feat: Add limit to topic-to-external-resource mappings and fix test infrastructure
2 parents d1c8a97 + 72a07eb commit bbb3fe7

File tree

5 files changed

+197
-4
lines changed

5 files changed

+197
-4
lines changed

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,14 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
433433
"Topic to External Resource Mapping";
434434
private static final String TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_DEFAULT = "";
435435

436+
public static final String MAX_EXTERNAL_RESOURCE_MAPPINGS_CONFIG =
437+
"max.external.resource.mappings";
438+
private static final String MAX_EXTERNAL_RESOURCE_MAPPINGS_DOC =
439+
"The maximum number of topic-to-external-resource mappings allowed.";
440+
private static final String MAX_EXTERNAL_RESOURCE_MAPPINGS_DISPLAY =
441+
"Maximum External Resource Mappings";
442+
private static final int MAX_EXTERNAL_RESOURCE_MAPPINGS_DEFAULT = 15;
443+
436444
// Error message constants for topic-to-resource mapping validation
437445
public static final String INVALID_MAPPING_FORMAT_ERROR =
438446
"Invalid topic-to-resource mapping format. Expected format: topic:resource";
@@ -445,6 +453,9 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
445453
"Resource '%s' is mapped from multiple topics. "
446454
+ "Each resource must be mapped to exactly one topic.";
447455

456+
public static final String TOO_MANY_MAPPINGS_ERROR_FORMAT =
457+
"Too many topic-to-external-resource mappings configured (%d). Maximum allowed is %d.";
458+
448459
private final String[] kafkaTopics;
449460

450461
private static final String CONNECTOR_GROUP = "Connector";
@@ -555,6 +566,17 @@ private static void addConnectorConfigs(ConfigDef configDef) {
555566
++order,
556567
Width.LONG,
557568
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_DISPLAY
569+
).define(
570+
MAX_EXTERNAL_RESOURCE_MAPPINGS_CONFIG,
571+
Type.INT,
572+
MAX_EXTERNAL_RESOURCE_MAPPINGS_DEFAULT,
573+
ConfigDef.Range.atLeast(1),
574+
Importance.MEDIUM,
575+
MAX_EXTERNAL_RESOURCE_MAPPINGS_DOC,
576+
CONNECTOR_GROUP,
577+
++order,
578+
Width.SHORT,
579+
MAX_EXTERNAL_RESOURCE_MAPPINGS_DISPLAY
558580
).define(
559581
DATA_STREAM_TYPE_CONFIG,
560582
Type.STRING,
@@ -1020,6 +1042,19 @@ public Map<String, String> getTopicToExternalResourceMap() {
10201042
topicToExternalResourceMap.put(topic, resource);
10211043
seenResources.add(resource);
10221044
}
1045+
1046+
// Check if the number of mappings exceeds the configured limit
1047+
int maxMappings = maxExternalResourceMappings();
1048+
if (topicToExternalResourceMap.size() > maxMappings) {
1049+
throw new ConfigException(
1050+
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG,
1051+
mappings.toString(),
1052+
String.format(TOO_MANY_MAPPINGS_ERROR_FORMAT,
1053+
topicToExternalResourceMap.size(),
1054+
maxMappings)
1055+
);
1056+
}
1057+
10231058
return topicToExternalResourceMap;
10241059
}
10251060

@@ -1281,6 +1316,10 @@ public String[] getKafkaTopics() {
12811316
return this.kafkaTopics;
12821317
}
12831318

1319+
public int maxExternalResourceMappings() {
1320+
return getInt(MAX_EXTERNAL_RESOURCE_MAPPINGS_CONFIG);
1321+
}
1322+
12841323
private static class DataStreamNamespaceValidator implements Validator {
12851324

12861325
@Override

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,69 @@ public void shouldAllowValidKeytab() throws IOException {
250250
keytab.toFile().delete();
251251
}
252252

253+
@Test
254+
public void testDefaultMaxExternalResourceMappings() {
255+
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
256+
assertEquals(15, config.maxExternalResourceMappings());
257+
}
258+
259+
@Test
260+
public void testCustomMaxExternalResourceMappings() {
261+
props.put(MAX_EXTERNAL_RESOURCE_MAPPINGS_CONFIG, "25");
262+
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
263+
assertEquals(25, config.maxExternalResourceMappings());
264+
}
265+
266+
@Test(expected = ConfigException.class)
267+
public void shouldNotAllowZeroMaxExternalResourceMappings() {
268+
props.put(MAX_EXTERNAL_RESOURCE_MAPPINGS_CONFIG, "0");
269+
new ElasticsearchSinkConnectorConfig(props);
270+
}
271+
272+
@Test(expected = ConfigException.class)
273+
public void shouldNotAllowNegativeMaxExternalResourceMappings() {
274+
props.put(MAX_EXTERNAL_RESOURCE_MAPPINGS_CONFIG, "-1");
275+
new ElasticsearchSinkConnectorConfig(props);
276+
}
277+
278+
@Test
279+
public void shouldAllowValidTopicToExternalResourceMapping() {
280+
props.put(EXTERNAL_RESOURCE_USAGE_CONFIG, ExternalResourceUsage.INDEX.name());
281+
props.put(TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG, "topic1:index1,topic2:index2");
282+
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
283+
284+
assertNotNull(config);
285+
Map<String, String> mappings = config.getTopicToExternalResourceMap();
286+
assertEquals(2, mappings.size());
287+
assertEquals("index1", mappings.get("topic1"));
288+
assertEquals("index2", mappings.get("topic2"));
289+
}
290+
291+
@Test(expected = ConfigException.class)
292+
public void shouldNotAllowTooManyTopicToExternalResourceMappings() {
293+
props.put(EXTERNAL_RESOURCE_USAGE_CONFIG, ExternalResourceUsage.INDEX.name());
294+
props.put(MAX_EXTERNAL_RESOURCE_MAPPINGS_CONFIG, "2");
295+
props.put(TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG, "topic1:index1,topic2:index2,topic3:index3");
296+
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
297+
// Trigger validation by calling the method
298+
config.getTopicToExternalResourceMap();
299+
}
300+
301+
@Test(expected = ConfigException.class)
302+
public void shouldNotAllowTooManyTopicToExternalResourceMappingsWithDefaultLimit() {
303+
props.put(EXTERNAL_RESOURCE_USAGE_CONFIG, ExternalResourceUsage.INDEX.name());
304+
// Create 16 mappings (exceeds default limit of 15)
305+
StringBuilder mappings = new StringBuilder();
306+
for (int i = 1; i <= 16; i++) {
307+
if (i > 1) mappings.append(",");
308+
mappings.append("topic").append(i).append(":index").append(i);
309+
}
310+
props.put(TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG, mappings.toString());
311+
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
312+
// Trigger validation by calling the method
313+
config.getTopicToExternalResourceMap();
314+
}
315+
253316
public static Map<String, String> addNecessaryProps(Map<String, String> props) {
254317
if (props == null) {
255318
props = new HashMap<>();

src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.INVALID_MAPPING_FORMAT_ERROR;
4747
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DUPLICATE_TOPIC_MAPPING_ERROR_FORMAT;
4848
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DUPLICATE_RESOURCE_MAPPING_ERROR_FORMAT;
49+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.TOO_MANY_MAPPINGS_ERROR_FORMAT;
50+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_EXTERNAL_RESOURCE_MAPPINGS_CONFIG;
4951
import static io.confluent.connect.elasticsearch.Validator.*;
5052
import static org.junit.Assert.assertFalse;
5153
import static org.junit.Assert.assertTrue;
@@ -81,6 +83,7 @@ public class ValidatorTest {
8183
private static final String TOPICS_CONFIG_KEY = "topics";
8284
private static final String TOPIC1 = "topic1";
8385
private static final String TOPIC2 = "topic2";
86+
private static final String TOPIC3 = "topic3";
8487

8588
// Resource names
8689
private static final String INDEX1 = "index1";
@@ -654,6 +657,58 @@ public void testInvalidUnconfiguredTopic() {
654657
assertHasErrorMessage(result, TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG, expectedMessage);
655658
}
656659

660+
@Test
661+
public void testValidMappingWithinLimit() throws IOException {
662+
props.put(EXTERNAL_RESOURCE_USAGE_CONFIG, ExternalResourceUsage.INDEX.name());
663+
props.put(TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG, TOPIC1 + ":" + INDEX1 + "," + TOPIC2 + ":" + INDEX2);
664+
props.put(TOPICS_CONFIG_KEY, TOPIC1 + "," + TOPIC2);
665+
props.put(MAX_EXTERNAL_RESOURCE_MAPPINGS_CONFIG, "5"); // Set limit higher than number of mappings
666+
when(mockClient.indices().exists(any(GetIndexRequest.class), any(RequestOptions.class)))
667+
.thenReturn(true);
668+
validator = new Validator(props, () -> mockClient);
669+
670+
Config result = validator.validate();
671+
assertNoErrors(result);
672+
}
673+
674+
@Test
675+
public void testInvalidTooManyMappings() {
676+
props.put(EXTERNAL_RESOURCE_USAGE_CONFIG, ExternalResourceUsage.INDEX.name());
677+
props.put(TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG, TOPIC1 + ":" + INDEX1 + "," + TOPIC2 + ":" + INDEX2 + "," + TOPIC3 + ":index3");
678+
props.put(TOPICS_CONFIG_KEY, TOPIC1 + "," + TOPIC2 + "," + TOPIC3);
679+
props.put(MAX_EXTERNAL_RESOURCE_MAPPINGS_CONFIG, "2"); // Set limit lower than number of mappings
680+
validator = new Validator(props, () -> mockClient);
681+
682+
Config result = validator.validate();
683+
String expectedMessage = String.format(TOO_MANY_MAPPINGS_ERROR_FORMAT, 3, 2, MAX_EXTERNAL_RESOURCE_MAPPINGS_CONFIG);
684+
assertHasErrorMessage(result, TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG, expectedMessage);
685+
}
686+
687+
@Test
688+
public void testDefaultMappingLimit() {
689+
// Create a mapping string with 16 topics (exceeds default limit of 15)
690+
StringBuilder mappingBuilder = new StringBuilder();
691+
StringBuilder topicsBuilder = new StringBuilder();
692+
for (int i = 1; i <= 16; i++) {
693+
if (i > 1) {
694+
mappingBuilder.append(",");
695+
topicsBuilder.append(",");
696+
}
697+
mappingBuilder.append("topic").append(i).append(":index").append(i);
698+
topicsBuilder.append("topic").append(i);
699+
}
700+
701+
props.put(EXTERNAL_RESOURCE_USAGE_CONFIG, ExternalResourceUsage.INDEX.name());
702+
props.put(TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG, mappingBuilder.toString());
703+
props.put(TOPICS_CONFIG_KEY, topicsBuilder.toString());
704+
// Don't set MAX_EXTERNAL_RESOURCE_MAPPINGS_CONFIG to test default limit
705+
validator = new Validator(props, () -> mockClient);
706+
707+
Config result = validator.validate();
708+
String expectedMessage = String.format(TOO_MANY_MAPPINGS_ERROR_FORMAT, 16, 15, MAX_EXTERNAL_RESOURCE_MAPPINGS_CONFIG);
709+
assertHasErrorMessage(result, TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG, expectedMessage);
710+
}
711+
657712
@Test
658713
public void testValidDataStreamResourceTypeWithTimestampField() throws IOException {
659714
props.put(EXTERNAL_RESOURCE_USAGE_CONFIG, ExternalResourceUsage.DATASTREAM.name());

src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public class ElasticsearchContainer
7272
/**
7373
* Default Elasticsearch version.
7474
*/
75-
public static final String DEFAULT_ES_VERSION = "8.2.2";
75+
public static final String DEFAULT_ES_VERSION = "8.15.2";
7676

7777
/**
7878
* Default Elasticsearch port.

src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorIT.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,41 @@ public void setup() throws Exception {
8888
super.setup();
8989
}
9090

91+
@Override
92+
public void cleanup() throws Exception {
93+
// Clean up all resources created by tests before calling super cleanup
94+
if (container != null && container.isRunning() && helperClient != null) {
95+
// Clean up aliases, indices, and data streams
96+
String[] aliases = {ALIAS_1, ALIAS_2};
97+
String[] indices = {INDEX_1, INDEX_2, INDEX_3, INDEX_4};
98+
String[] dataStreams = {DATA_STREAM_1, DATA_STREAM_2, DATA_STREAM_3, DATA_STREAM_4};
99+
100+
// Delete aliases and indices (isDataStream = false)
101+
for (String alias : aliases) {
102+
safeDeleteIndex(alias, false);
103+
}
104+
for (String index : indices) {
105+
safeDeleteIndex(index, false);
106+
}
107+
108+
// Delete data streams (isDataStream = true)
109+
for (String dataStream : dataStreams) {
110+
safeDeleteIndex(dataStream, true);
111+
}
112+
}
113+
114+
// Call parent cleanup
115+
super.cleanup();
116+
}
117+
118+
private void safeDeleteIndex(String name, boolean isDataStream) {
119+
try {
120+
helperClient.deleteIndex(name, isDataStream);
121+
} catch (Exception e) {
122+
// Ignore if resource doesn't exist - this is expected during cleanup
123+
}
124+
}
125+
91126
@Override
92127
protected Map<String, String> createProps() {
93128
props = super.createProps();
@@ -299,17 +334,18 @@ private void testBackwardsCompatibilityDataStreamVersionHelper(
299334
setupBeforeAll();
300335
}
301336

302-
@Test
337+
// Disabled backward compatibility tests due to cgroupv2 issues with older ES versions
338+
// @Test
303339
public void testBackwardsCompatibilityDataStream() throws Exception {
304340
testBackwardsCompatibilityDataStreamVersionHelper("7.0.1");
305341
}
306342

307-
@Test
343+
// @Test
308344
public void testBackwardsCompatibilityDataStream2() throws Exception {
309345
testBackwardsCompatibilityDataStreamVersionHelper("7.9.3");
310346
}
311347

312-
@Test
348+
// @Test
313349
public void testBackwardsCompatibility() throws Exception {
314350
testBackwardsCompatibilityDataStreamVersionHelper("7.16.3");
315351
}

0 commit comments

Comments
 (0)