Skip to content

Commit 46744ec

Browse files
committed
Merge remote-tracking branch 'origin/15.0.x' into pr_merge_from_14_1_x_to_15_0_x to avoid duplicate changes as new feature already exists in 15.0.x branch
2 parents 642aea7 + 101a5bf commit 46744ec

File tree

6 files changed

+260
-138
lines changed

6 files changed

+260
-138
lines changed

pom.xml

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
<parent>
66
<groupId>io.confluent</groupId>
77
<artifactId>common</artifactId>
8-
<version>7.0.12</version>
8+
<version>[7.9.0, 7.9.1)</version>
99
</parent>
1010

1111
<artifactId>kafka-connect-elasticsearch</artifactId>
12-
<version>14.1.5-SNAPSHOT</version>
12+
<version>15.0.2-SNAPSHOT</version>
1313
<packaging>jar</packaging>
1414
<name>kafka-connect-elasticsearch</name>
1515
<organization>
@@ -33,13 +33,12 @@
3333
<connection>scm:git:git://github.com/confluentinc/kafka-connect-elasticsearch.git</connection>
3434
<developerConnection>scm:git:[email protected]:confluentinc/kafka-connect-elasticsearch.git</developerConnection>
3535
<url>https://github.com/confluentinc/kafka-connect-elasticsearch</url>
36-
<tag>14.1.x</tag>
36+
<tag>15.0.x</tag>
3737
</scm>
3838

3939
<properties>
4040
<es.version>7.17.24</es.version>
4141
<hamcrest.version>1.3</hamcrest.version>
42-
<mockito.version>2.28.2</mockito.version>
4342
<gson.version>2.9.0</gson.version>
4443
<test.containers.version>1.16.3</test.containers.version>
4544
<kafka.connect.maven.plugin.version>0.11.1</kafka.connect.maven.plugin.version>
@@ -54,7 +53,7 @@
5453
<dependency.check.version>6.1.6</dependency.check.version>
5554
<confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
5655
<commons.codec.version>1.15</commons.codec.version>
57-
<confluent.version>${io.confluent.common.version}</confluent.version>
56+
<confluent.version>[7.9.0,7.9.1)</confluent.version>
5857
<jackson.version>2.16.0</jackson.version>
5958
<dependency.check.skip>true</dependency.check.skip>
6059
</properties>
@@ -182,7 +181,15 @@
182181
<dependency>
183182
<groupId>org.apache.kafka</groupId>
184183
<artifactId>kafka-clients</artifactId>
185-
<version>${kafka.version}</version>
184+
<version>7.8.0-ccs</version>
185+
<classifier>test</classifier>
186+
<type>test-jar</type>
187+
<scope>test</scope>
188+
</dependency>
189+
<dependency>
190+
<groupId>org.apache.kafka</groupId>
191+
<artifactId>kafka-server-common</artifactId>
192+
<version>7.8.0-ccs</version>
186193
<classifier>test</classifier>
187194
<type>test-jar</type>
188195
<scope>test</scope>
@@ -312,6 +319,19 @@
312319
</dependencies>
313320
</dependencyManagement>
314321

322+
<distributionManagement>
323+
<repository>
324+
<id>aws-release</id>
325+
<name>AWS Release Repository</name>
326+
<url>${confluent.release.repo}</url>
327+
</repository>
328+
<snapshotRepository>
329+
<id>aws-snapshot</id>
330+
<name>AWS Snapshot Repository</name>
331+
<url>${confluent.snapshot.repo}</url>
332+
</snapshotRepository>
333+
</distributionManagement>
334+
315335
<build>
316336
<plugins>
317337
<plugin>

src/main/java/io/confluent/connect/elasticsearch/ConfigCallbackHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
5555
import org.apache.http.nio.reactor.ConnectingIOReactor;
5656
import org.apache.http.nio.reactor.IOReactorException;
57-
import org.apache.kafka.common.network.Mode;
57+
import org.apache.kafka.common.network.ConnectionMode;
5858
import org.apache.kafka.common.security.ssl.SslFactory;
5959
import org.apache.kafka.connect.errors.ConnectException;
6060
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
@@ -258,7 +258,7 @@ private void configureSslContext(HttpAsyncClientBuilder builder) {
258258
* Gets the SslContext for the client.
259259
*/
260260
private SSLContext sslContext() {
261-
SslFactory sslFactory = new SslFactory(Mode.CLIENT, null, false);
261+
SslFactory sslFactory = new SslFactory(ConnectionMode.CLIENT, null, false);
262262
sslFactory.configure(config.sslConfigs());
263263

264264
try {
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2025 Confluent Inc.
3+
*
4+
* Licensed under the Confluent Community License (the "License"); you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* http://www.confluent.io/confluent-community-license
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
16+
package io.confluent.connect.elasticsearch;
17+
18+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG;
19+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
20+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG;
21+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG;
22+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX;
23+
import static org.junit.Assert.assertEquals;
24+
import static org.mockito.Mockito.mock;
25+
26+
import java.util.HashMap;
27+
28+
import org.apache.kafka.common.config.SslConfigs;
29+
import org.junit.Before;
30+
import org.junit.BeforeClass;
31+
import org.junit.Test;
32+
33+
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SecurityProtocol;
34+
import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer;
35+
import io.confluent.connect.elasticsearch.helper.ElasticsearchHelperClient;
36+
37+
public class ElasticsearchClientSslTest extends ElasticsearchClientTestBase {
38+
39+
private static final String ELASTIC_SUPERUSER_NAME = "elastic";
40+
private static final String ELASTIC_SUPERUSER_PASSWORD = "elastic";
41+
42+
@BeforeClass
43+
public static void setupBeforeAll() {
44+
container = ElasticsearchContainer.fromSystemProperties().withSslEnabled(true);
45+
container.start();
46+
}
47+
48+
@Override
49+
@Before
50+
public void setup() {
51+
index = TOPIC;
52+
props = ElasticsearchSinkConnectorConfigTest.addNecessaryProps(new HashMap<>());
53+
String address = container.getConnectionUrl(false);
54+
props.put(CONNECTION_URL_CONFIG, address);
55+
props.put(CONNECTION_USERNAME_CONFIG, ELASTIC_SUPERUSER_NAME);
56+
props.put(CONNECTION_PASSWORD_CONFIG, ELASTIC_SUPERUSER_PASSWORD);
57+
props.put(SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
58+
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, container.getKeystorePath());
59+
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, container.getKeystorePassword());
60+
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, container.getTruststorePath());
61+
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, container.getTruststorePassword());
62+
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEY_PASSWORD_CONFIG, container.getKeyPassword());
63+
config = new ElasticsearchSinkConnectorConfig(props);
64+
converter = new DataConverter(config);
65+
helperClient = new ElasticsearchHelperClient(address, config,
66+
container.shouldStartClientInCompatibilityMode());
67+
helperClient.waitForConnection(30000);
68+
offsetTracker = mock(OffsetTracker.class);
69+
}
70+
71+
@Test
72+
public void testSsl() throws Exception {
73+
ElasticsearchClient client = new ElasticsearchClient(config, null,
74+
() -> offsetTracker.updateOffsets(), 1, "elasticsearch-sink");
75+
client.createIndexOrDataStream(index);
76+
77+
writeRecord(sinkRecord(0), client);
78+
client.flush();
79+
80+
waitUntilRecordsInES(1);
81+
assertEquals(1, helperClient.getDocCount(index));
82+
client.close();
83+
}
84+
}

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchClientTest.java

Lines changed: 1 addition & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG;
1919
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG;
2020
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG;
21-
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG;
2221
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
23-
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG;
2422
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_DATASET_CONFIG;
2523
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TYPE_CONFIG;
2624
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG;
@@ -29,8 +27,6 @@
2927
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG;
3028
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG;
3129
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG;
32-
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG;
33-
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX;
3430
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG;
3531
import static org.junit.Assert.assertEquals;
3632
import static org.junit.Assert.assertFalse;
@@ -46,7 +42,6 @@
4642

4743
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc;
4844
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
49-
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SecurityProtocol;
5045
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod;
5146
import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer;
5247
import io.confluent.connect.elasticsearch.helper.ElasticsearchHelperClient;
@@ -59,55 +54,29 @@
5954
import java.util.Set;
6055
import java.util.concurrent.CompletableFuture;
6156
import java.util.concurrent.TimeUnit;
62-
import org.apache.kafka.common.config.SslConfigs;
63-
import org.apache.kafka.common.record.TimestampType;
6457
import org.apache.kafka.connect.data.Schema;
6558
import org.apache.kafka.connect.data.SchemaBuilder;
6659
import org.apache.kafka.connect.data.Struct;
6760
import org.apache.kafka.connect.errors.ConnectException;
6861
import org.apache.kafka.connect.sink.ErrantRecordReporter;
6962
import org.apache.kafka.connect.sink.SinkRecord;
70-
import org.apache.kafka.test.TestUtils;
71-
import org.elasticsearch.ElasticsearchStatusException;
7263
import org.elasticsearch.action.DocWriteRequest;
7364
import org.elasticsearch.action.bulk.BulkItemResponse;
7465
import org.elasticsearch.index.VersionType;
7566
import org.elasticsearch.search.SearchHit;
7667
import org.junit.After;
77-
import org.junit.AfterClass;
7868
import org.junit.Before;
7969
import org.junit.BeforeClass;
8070
import org.junit.Test;
8171

82-
public class ElasticsearchClientTest {
83-
84-
private static final String INDEX = "index";
85-
private static final String ELASTIC_SUPERUSER_NAME = "elastic";
86-
private static final String ELASTIC_SUPERUSER_PASSWORD = "elastic";
87-
private static final String TOPIC = "index";
88-
private static final String DATA_STREAM_TYPE = "logs";
89-
private static final String DATA_STREAM_DATASET = "dataset";
90-
91-
private static ElasticsearchContainer container;
92-
93-
private DataConverter converter;
94-
private ElasticsearchHelperClient helperClient;
95-
private ElasticsearchSinkConnectorConfig config;
96-
private Map<String, String> props;
97-
private String index;
98-
private OffsetTracker offsetTracker;
72+
public class ElasticsearchClientTest extends ElasticsearchClientTestBase {
9973

10074
@BeforeClass
10175
public static void setupBeforeAll() {
10276
container = ElasticsearchContainer.fromSystemProperties();
10377
container.start();
10478
}
10579

106-
@AfterClass
107-
public static void cleanupAfterAll() {
108-
container.close();
109-
}
110-
11180
@Before
11281
public void setup() {
11382
index = TOPIC;
@@ -748,43 +717,6 @@ public void testNoVersionConflict() throws Exception {
748717
client2.close();
749718
}
750719

751-
@Test
752-
public void testSsl() throws Exception {
753-
container.close();
754-
container = ElasticsearchContainer.fromSystemProperties().withSslEnabled(true);
755-
container.start();
756-
757-
String address = container.getConnectionUrl(false);
758-
props.put(CONNECTION_URL_CONFIG, address);
759-
props.put(CONNECTION_USERNAME_CONFIG, ELASTIC_SUPERUSER_NAME);
760-
props.put(CONNECTION_PASSWORD_CONFIG, ELASTIC_SUPERUSER_PASSWORD);
761-
props.put(SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
762-
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, container.getKeystorePath());
763-
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, container.getKeystorePassword());
764-
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, container.getTruststorePath());
765-
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, container.getTruststorePassword());
766-
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEY_PASSWORD_CONFIG, container.getKeyPassword());
767-
config = new ElasticsearchSinkConnectorConfig(props);
768-
converter = new DataConverter(config);
769-
770-
ElasticsearchClient client = new ElasticsearchClient(config, null, () -> offsetTracker.updateOffsets(), 1, "elasticsearch-sink");
771-
helperClient = new ElasticsearchHelperClient(address, config,
772-
container.shouldStartClientInCompatibilityMode());
773-
client.createIndexOrDataStream(index);
774-
775-
writeRecord(sinkRecord(0), client);
776-
client.flush();
777-
778-
waitUntilRecordsInES(1);
779-
assertEquals(1, helperClient.getDocCount(index));
780-
client.close();
781-
helperClient = null;
782-
783-
container.close();
784-
container = ElasticsearchContainer.fromSystemProperties();
785-
container.start();
786-
}
787-
788720
@Test
789721
public void testWriteDataStreamInjectTimestamp() throws Exception {
790722
props.put(DATA_STREAM_TYPE_CONFIG, DATA_STREAM_TYPE);
@@ -806,12 +738,6 @@ public void testWriteDataStreamInjectTimestamp() throws Exception {
806738
client.close();
807739
}
808740

809-
private String createIndexName(String name) {
810-
return config.isDataStream()
811-
? String.format("%s-%s-%s", DATA_STREAM_TYPE, DATA_STREAM_DATASET, name)
812-
: name;
813-
}
814-
815741
@Test
816742
public void testConnectionUrlExtraSlash() {
817743
props.put(CONNECTION_URL_CONFIG, container.getConnectionUrl() + "/");
@@ -862,59 +788,4 @@ public void testThreadNamingWithConnectorNameAndTaskId() throws Exception {
862788

863789
client.close();
864790
}
865-
866-
private static Schema schema() {
867-
return SchemaBuilder
868-
.struct()
869-
.name("record")
870-
.field("offset", SchemaBuilder.int32().defaultValue(0).build())
871-
.field("another", SchemaBuilder.int32().defaultValue(0).build())
872-
.build();
873-
}
874-
875-
private static SinkRecord sinkRecord(int offset) {
876-
return sinkRecord("key", offset);
877-
}
878-
879-
private static SinkRecord sinkRecord(String key, int offset) {
880-
Struct value = new Struct(schema()).put("offset", offset).put("another", offset + 1);
881-
return sinkRecord(key, schema(), value, offset);
882-
}
883-
884-
private static SinkRecord sinkRecord(String key, Schema schema, Struct value, int offset) {
885-
return new SinkRecord(
886-
TOPIC,
887-
0,
888-
Schema.STRING_SCHEMA,
889-
key,
890-
schema,
891-
value,
892-
offset,
893-
System.currentTimeMillis(),
894-
TimestampType.CREATE_TIME
895-
);
896-
}
897-
898-
private void waitUntilRecordsInES(int expectedRecords) throws InterruptedException {
899-
TestUtils.waitForCondition(
900-
() -> {
901-
try {
902-
return helperClient.getDocCount(index) == expectedRecords;
903-
} catch (ElasticsearchStatusException e) {
904-
if (e.getMessage().contains("index_not_found_exception")) {
905-
return false;
906-
}
907-
908-
throw e;
909-
}
910-
},
911-
TimeUnit.MINUTES.toMillis(1),
912-
String.format("Could not find expected documents (%d) in time.", expectedRecords)
913-
);
914-
}
915-
916-
private void writeRecord(SinkRecord record, ElasticsearchClient client) {
917-
client.index(record, converter.convertRecord(record, createIndexName(record.topic())),
918-
new AsyncOffsetTracker.AsyncOffsetState(record.kafkaOffset()));
919-
}
920791
}

0 commit comments

Comments
 (0)