Skip to content

Commit 0c1737c

Browse files
committed
add support for new connector restart request parameters
1 parent f3ff154 commit 0c1737c

File tree

5 files changed

+160
-11
lines changed

5 files changed

+160
-11
lines changed

CHANGELOG.md

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,20 @@
22
The format is based on [Keep a Changelog](http://keepachangelog.com/)
33
and this project adheres to [Semantic Versioning](http://semver.org/).
44

5-
## 3.1.4 (12/10/2021)
6-
- Remove Guava as a dependency.
7-
- Update Log4j dependency from 2.14.1 to 2.15.0 for [CVE-2021-44228](https://nvd.nist.gov/vuln/detail/CVE-2021-44228).
5+
## 4.0.0 (12/10/2021)
86

7+
**Note:** Major release version updated purely out of abundance of caution.
8+
9+
Internal dependency on Google Guava was removed. Functionally nothing about this library's
10+
API has changed. If you were making use of Google Guava via a transitive dependency provided by this
11+
library, then you'll need to update your project to include it directly.
12+
13+
### New Features
14+
- Update [/connector/\<name\>/restart](https://docs.confluent.io/platform/current/connect/references/restapi.html#post--connectors-(string-name)-restart) request to support the `includeTasks` and `onlyFailed` parameters from Kafka-Connect 3.0.0.
15+
16+
#### Internal Dependency Updates
17+
- Removed Google Guava as a dependency.
18+
- com.fasterxml.jackson.core from 2.12.2 -> 2.13.0.
919

1020
## 3.1.3 (08/11/2021)
1121
- [Issue-55](https://github.com/SourceLabOrg/kafka-connect-client/issues/55) Create new HttpContext for every request.

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ This client library is released on Maven Central. Add a new dependency to your
1212
<dependency>
1313
<groupId>org.sourcelab</groupId>
1414
<artifactId>kafka-connect-client</artifactId>
15-
<version>3.1.3</version>
15+
<version>3.1.4</version>
1616
</dependency>
1717
```
1818

pom.xml

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>org.sourcelab</groupId>
88
<artifactId>kafka-connect-client</artifactId>
9-
<version>3.1.3</version>
9+
<version>4.0.0</version>
1010
<packaging>jar</packaging>
1111

1212
<!-- Require Maven 3.3.9 -->
@@ -46,14 +46,11 @@
4646
<properties>
4747
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
4848

49-
<!-- guava version -->
50-
<guava.version>30.1.1-jre</guava.version>
51-
5249
<!-- Http Components version -->
5350
<http-components.version>4.5.13</http-components.version>
5451

5552
<!-- Jackson version -->
56-
<jackson.version>2.12.2</jackson.version>
53+
<jackson.version>2.13.0</jackson.version>
5754

5855
<!-- Define which version of junit you'll be running -->
5956
<junit.version>4.13.1</junit.version>
@@ -64,7 +61,7 @@
6461
<checkstyle.version>8.32</checkstyle.version>
6562

6663
<!-- Log4J Version -->
67-
<log4j2.version>2.14.1</log4j2.version>
64+
<log4j2.version>2.15.0</log4j2.version>
6865
<slf4j.version>1.7.32</slf4j.version>
6966

7067
<!-- test toggling -->

src/main/java/org/sourcelab/kafka/connect/apiclient/request/post/PostConnectorRestart.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
import org.sourcelab.kafka.connect.apiclient.util.UrlEscapingUtil;
2121

2222
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
2327
import java.util.Objects;
2428

2529
/**
@@ -28,6 +32,8 @@
2832
public final class PostConnectorRestart implements PostRequest<Boolean> {
2933
private final String connectorName;
3034

35+
private Map<String, Boolean> options = new HashMap<>();
36+
3137
/**
3238
* Constructor.
3339
* @param connectorName Name of connector to restart
@@ -37,9 +43,46 @@ public PostConnectorRestart(final String connectorName) {
3743
this.connectorName = connectorName;
3844
}
3945

46+
/**
47+
* Only available from Kafka Connect version 3.0.0 and up.
48+
* @param includeTasks Specifies whether to restart the connector instance and task instances (includeTasks=true`) or just the connector instance (includeTasks=false).
49+
* @return self reference for method chaining.
50+
*/
51+
public PostConnectorRestart withIncludeTasks(final boolean includeTasks)
52+
{
53+
options.put("includeTasks", includeTasks);
54+
return this;
55+
}
56+
57+
/**
58+
* Only available from Kafka Connect version 3.0.0 and up.
59+
* @param onlyFailed specifies whether to restart just the instances with a FAILED status (onlyFailed=true) or all instances (onlyFailed=false).
60+
* @return self reference for method chaining.
61+
*/
62+
public PostConnectorRestart withOnlyFailed(final boolean onlyFailed)
63+
{
64+
options.put("onlyFailed", onlyFailed);
65+
return this;
66+
}
67+
4068
@Override
4169
public String getApiEndpoint() {
42-
return "/connectors/" + UrlEscapingUtil.escapePath(connectorName) + "/restart";
70+
// Define base URL
71+
String url = "/connectors/" + UrlEscapingUtil.escapePath(connectorName) + "/restart";
72+
73+
// Optionally add additional request parameters if explicitly defined.
74+
final List<String> params = new ArrayList<>();
75+
for (final Map.Entry<String, Boolean> option : options.entrySet()) {
76+
// skip null
77+
if (option.getValue() == null) {
78+
continue;
79+
}
80+
params.add(option.getKey() + "=" + (option.getValue() ? "true" : "false"));
81+
}
82+
if (params.size() > 0) {
83+
url = url + "?" + String.join("&", params);
84+
}
85+
return url;
4386
}
4487

4588
@Override
@@ -49,6 +92,7 @@ public Object getRequestBody() {
4992

5093
@Override
5194
public Boolean parseResponse(final String responseStr) throws IOException {
95+
// Note: this doesn't currently support 202 responses which would normally contain a response body :/
5296
return true;
5397
}
5498
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package org.sourcelab.kafka.connect.apiclient.request.post;
2+
3+
import org.junit.Test;
4+
import org.sourcelab.kafka.connect.apiclient.request.AbstractRequestTest;
5+
6+
import static org.junit.Assert.assertEquals;
7+
8+
/**
9+
*
10+
*/
11+
public class PostConnectorRestartTest extends AbstractRequestTest {
12+
@Override
13+
public void testParseResponse() throws Exception {
14+
15+
}
16+
17+
/**
18+
* With no optional parameters.
19+
*/
20+
@Override
21+
public void getApiEndpoint() {
22+
final String inputName = "My Test Connector";
23+
final String expectedUrl = "/connectors/My%20Test%20Connector/restart";
24+
final String result = new PostConnectorRestart(inputName).getApiEndpoint();
25+
assertEquals("Unexpected URL returned!", expectedUrl, result);
26+
}
27+
28+
/**
29+
* Verify with various optional parameters specified.
30+
*/
31+
@Test
32+
public void getApiEndpoint_includeTasks_true() {
33+
final String inputName = "My Test Connector";
34+
final String expectedUrl = "/connectors/My%20Test%20Connector/restart?includeTasks=true";
35+
final String result = new PostConnectorRestart(inputName)
36+
.withIncludeTasks(true)
37+
.getApiEndpoint();
38+
39+
assertEquals("Unexpected URL returned!", expectedUrl, result);
40+
}
41+
42+
/**
43+
* Verify with various optional parameters specified.
44+
*/
45+
@Test
46+
public void getApiEndpoint_includeTasks_false() {
47+
final String inputName = "My Test Connector";
48+
final String expectedUrl = "/connectors/My%20Test%20Connector/restart?includeTasks=false";
49+
final String result = new PostConnectorRestart(inputName)
50+
.withIncludeTasks(false)
51+
.getApiEndpoint();
52+
53+
assertEquals("Unexpected URL returned!", expectedUrl, result);
54+
}
55+
56+
/**
57+
* Verify with various optional parameters specified.
58+
*/
59+
@Test
60+
public void getApiEndpoint_onlyFailed_true() {
61+
final String inputName = "My Test Connector";
62+
final String expectedUrl = "/connectors/My%20Test%20Connector/restart?onlyFailed=true";
63+
final String result = new PostConnectorRestart(inputName)
64+
.withOnlyFailed(true)
65+
.getApiEndpoint();
66+
67+
assertEquals("Unexpected URL returned!", expectedUrl, result);
68+
}
69+
70+
/**
71+
* Verify with various optional parameters specified.
72+
*/
73+
@Test
74+
public void getApiEndpoint_onlyFailed_false() {
75+
final String inputName = "My Test Connector";
76+
final String expectedUrl = "/connectors/My%20Test%20Connector/restart?onlyFailed=false";
77+
final String result = new PostConnectorRestart(inputName)
78+
.withOnlyFailed(false)
79+
.getApiEndpoint();
80+
81+
assertEquals("Unexpected URL returned!", expectedUrl, result);
82+
}
83+
84+
/**
85+
* Verify with various optional parameters specified.
86+
*/
87+
@Test
88+
public void getApiEndpoint_onlyFailed_includeTasks_true() {
89+
final String inputName = "My Test Connector";
90+
final String expectedUrl = "/connectors/My%20Test%20Connector/restart?includeTasks=true&onlyFailed=true";
91+
final String result = new PostConnectorRestart(inputName)
92+
.withOnlyFailed(true)
93+
.withIncludeTasks(true)
94+
.getApiEndpoint();
95+
96+
assertEquals("Unexpected URL returned!", expectedUrl, result);
97+
}
98+
}

0 commit comments

Comments
 (0)