Skip to content

Commit da1cbba

Browse files
authored
Merge pull request #19 from tchiotludo/fix/collection
Fix connector tasks & plugins query
2 parents cb78656 + 908f160 commit da1cbba

File tree

6 files changed

+130
-4
lines changed

6 files changed

+130
-4
lines changed

src/main/java/org/sourcelab/kafka/connect/apiclient/request/dto/Task.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public String toString() {
4545
/**
4646
* Defines a Task Id.
4747
*/
48-
private static class TaskId {
48+
public static class TaskId {
4949
private String connector;
5050
private int task;
5151

src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectorPlugins.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPlugin;
2222

2323
import java.io.IOException;
24+
import java.util.Arrays;
2425
import java.util.Collection;
2526

2627
/**
@@ -40,6 +41,6 @@ public Object getRequestBody() {
4041

4142
@Override
4243
public Collection<ConnectorPlugin> parseResponse(final String responseStr) throws IOException {
43-
return JacksonFactory.newInstance().readValue(responseStr, Collection.class);
44+
return Arrays.asList(JacksonFactory.newInstance().readValue(responseStr, ConnectorPlugin[].class));
4445
}
4546
}

src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectorTasks.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.sourcelab.kafka.connect.apiclient.request.dto.Task;
2222

2323
import java.io.IOException;
24+
import java.util.Arrays;
2425
import java.util.Collection;
2526
import java.util.Objects;
2627

@@ -54,6 +55,6 @@ public Object getRequestBody() {
5455

5556
@Override
5657
public Collection<Task> parseResponse(final String responseStr) throws IOException {
57-
return JacksonFactory.newInstance().readValue(responseStr, Collection.class);
58+
return Arrays.asList(JacksonFactory.newInstance().readValue(responseStr, Task[].class));
5859
}
5960
}

src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectors.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.sourcelab.kafka.connect.apiclient.request.JacksonFactory;
2121

2222
import java.io.IOException;
23+
import java.util.Arrays;
2324
import java.util.Collection;
2425

2526
/**
@@ -38,6 +39,6 @@ public Object getRequestBody() {
3839

3940
@Override
4041
public Collection<String> parseResponse(final String responseStr) throws IOException {
41-
return JacksonFactory.newInstance().readValue(responseStr, Collection.class);
42+
return Arrays.asList(JacksonFactory.newInstance().readValue(responseStr, String[].class));
4243
}
4344
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
7+
* persons to whom the Software is furnished to do so, subject to the following conditions:
8+
*
9+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10+
* Software.
11+
*
12+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16+
*/
17+
18+
package org.sourcelab.kafka.connect.apiclient.request.get.connector;
19+
20+
import org.junit.Test;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
import org.sourcelab.kafka.connect.apiclient.request.AbstractRequestTest;
24+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPlugin;
25+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorPlugins;
26+
27+
import java.io.IOException;
28+
import java.util.ArrayList;
29+
import java.util.Collection;
30+
import java.util.List;
31+
32+
import static org.junit.Assert.*;
33+
34+
public class GetConnectorPluginsTest extends AbstractRequestTest {
35+
private static final Logger logger = LoggerFactory.getLogger(GetConnectorPluginsTest.class);
36+
37+
/**
38+
* Test Parsing GET /connectors response.
39+
*/
40+
@Test
41+
public void testParseResponse() throws IOException {
42+
final String mockResponse = readFile("getConnectorPlugins.json");
43+
final List<ConnectorPlugin> result = new ArrayList<>(new GetConnectorPlugins().parseResponse(mockResponse));
44+
45+
// Validate
46+
assertNotNull("Should not be null", result);
47+
assertEquals("Should have two entries", 2, result.size());
48+
49+
assertEquals("Should have connector", result.get(0).getClassName(), "org.apache.kafka.connect.file.FileStreamSinkConnector");
50+
assertEquals("Should have type", result.get(0).getType(), "sink");
51+
assertEquals("Should have version", result.get(0).getVersion(), "1.0.0-cp1");
52+
53+
assertEquals("Should have connector", result.get(1).getClassName(), "org.apache.kafka.connect.file.FileStreamSourceConnector");
54+
assertEquals("Should have type", result.get(1).getType(), "source");
55+
assertEquals("Should have version", result.get(1).getVersion(), "1.0.0-cp1");
56+
}
57+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
7+
* persons to whom the Software is furnished to do so, subject to the following conditions:
8+
*
9+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10+
* Software.
11+
*
12+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16+
*/
17+
18+
package org.sourcelab.kafka.connect.apiclient.request.get.connector;
19+
20+
import com.google.common.collect.ImmutableMap;
21+
import org.junit.Test;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
import org.sourcelab.kafka.connect.apiclient.request.AbstractRequestTest;
25+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPlugin;
26+
import org.sourcelab.kafka.connect.apiclient.request.dto.Task;
27+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorPlugins;
28+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorTasks;
29+
30+
import java.io.IOException;
31+
import java.util.ArrayList;
32+
import java.util.HashMap;
33+
import java.util.List;
34+
35+
import static org.junit.Assert.assertEquals;
36+
import static org.junit.Assert.assertNotNull;
37+
38+
public class GetConnectorTasksTest extends AbstractRequestTest {
39+
private static final Logger logger = LoggerFactory.getLogger(GetConnectorTasksTest.class);
40+
41+
/**
42+
* Test Parsing GET /connectors response.
43+
*/
44+
@Test
45+
public void testParseResponse() throws IOException {
46+
final String mockResponse = readFile("getConnectorTasks.json");
47+
final List<Task> result = new ArrayList<>(new GetConnectorTasks("MyTestConnector").parseResponse(mockResponse));
48+
49+
// Validate
50+
assertNotNull("Should not be null", result);
51+
assertEquals("Should have one entry", 1, result.size());
52+
53+
assertEquals("Should have connector", result.get(0).getId().getConnector(), "MyTestConnector");
54+
assertEquals("Should have task id", result.get(0).getId().getTask(), 0);
55+
56+
assertEquals("Should have configs", result.get(0).getConfig(), ImmutableMap.<String, String>builder()
57+
.put("connector.class", "org.apache.kafka.connect.tools.VerifiableSourceConnector")
58+
.put("task.class", "org.apache.kafka.connect.tools.VerifiableSourceTask")
59+
.put("tasks.max", "1")
60+
.put("topics", "test-topic")
61+
.put("name", "MyTestConnector")
62+
.put("id", "0")
63+
.build()
64+
);
65+
}
66+
}

0 commit comments

Comments
 (0)