Skip to content

Commit 7eb1b8e

Browse files
authored
Merge pull request #51 from AdaptiveScale/bugfix/PLUGIN-219
PLUGIN-219: Pipeline with HttpSink plugin fails to deploy if URL field is specified as macro.
2 parents 14a0a65 + 8479299 commit 7eb1b8e

File tree

3 files changed

+87
-5
lines changed

3 files changed

+87
-5
lines changed

src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -272,11 +272,13 @@ public Map<String, String> getRequestHeadersMap() {
272272
}
273273

274274
public void validate(FailureCollector collector) {
275-
try {
276-
new URL(url);
277-
} catch (MalformedURLException e) {
278-
collector.addFailure(String.format("URL '%s' is malformed: %s", url, e.getMessage()), null)
279-
.withConfigProperty(URL);
275+
if (!containsMacro(URL)) {
276+
try {
277+
new URL(url);
278+
} catch (MalformedURLException e) {
279+
collector.addFailure(String.format("URL '%s' is malformed: %s", url, e.getMessage()), null)
280+
.withConfigProperty(URL);
281+
}
280282
}
281283

282284
if (!containsMacro(CONNECTION_TIMEOUT) && Objects.nonNull(connectTimeout) && connectTimeout < 0) {

src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfigTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.cdap.plugin.http.sink.batch;
1818

1919
import io.cdap.cdap.etl.api.validation.CauseAttributes;
20+
import io.cdap.cdap.etl.api.validation.ValidationException;
2021
import io.cdap.cdap.etl.api.validation.ValidationFailure;
2122
import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
2223
import org.junit.Assert;
@@ -134,6 +135,17 @@ public void testInvalidMessageFormat() {
134135
assertPropertyValidationFailed(failureCollector, HTTPSinkConfig.MESSAGE_FORMAT);
135136
}
136137

138+
@Test(expected = ValidationException.class)
139+
public void testHTTPSinkWithEmptyUrl() {
140+
HTTPSinkConfig config = HTTPSinkConfig.newBuilder(VALID_CONFIG)
141+
.setUrl("")
142+
.build();
143+
144+
MockFailureCollector collector = new MockFailureCollector("httpsinkwithemptyurl");
145+
config.validate(collector);
146+
collector.getOrThrowException();
147+
}
148+
137149
public static void assertPropertyValidationFailed(MockFailureCollector failureCollector, String paramName) {
138150
List<ValidationFailure> failureList = failureCollector.getValidationFailures();
139151
Assert.assertEquals(1, failureList.size());

src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.cdap.cdap.api.data.format.StructuredRecord;
2424
import io.cdap.cdap.api.data.schema.Schema;
2525
import io.cdap.cdap.api.dataset.table.Table;
26+
import io.cdap.cdap.common.utils.Tasks;
2627
import io.cdap.cdap.datapipeline.DataPipelineApp;
2728
import io.cdap.cdap.datapipeline.SmartWorkflow;
2829
import io.cdap.cdap.etl.api.batch.BatchSink;
@@ -58,7 +59,9 @@
5859
import java.util.ArrayList;
5960
import java.util.List;
6061
import java.util.Map;
62+
import java.util.concurrent.ExecutionException;
6163
import java.util.concurrent.TimeUnit;
64+
import java.util.concurrent.TimeoutException;
6265
import javax.ws.rs.HttpMethod;
6366

6467
/**
@@ -174,4 +177,69 @@ private int getFeeds() throws IOException {
174177
urlConn.disconnect();
175178
return responseCode;
176179
}
180+
181+
@Test
182+
public void testHTTPSinkMacroUrl() throws Exception {
183+
String inputDatasetName = "input-http-sink-with-macro";
184+
ETLStage source = new ETLStage("source", MockSource.getPlugin(inputDatasetName));
185+
Map<String, String> properties = new ImmutableMap.Builder<String, String>()
186+
.put("url", "${url}")
187+
.put("method", "PUT")
188+
.put("messageFormat", "Custom")
189+
.put("charset", "UTF-8")
190+
.put("body", "cask cdap, hydrator tracker, ui cli")
191+
.put("batchSize", "1")
192+
.put("referenceName", "HTTPSinkReference")
193+
.put("delimiterForMessages", "\n")
194+
.put("numRetries", "3")
195+
.put("followRedirects", "true")
196+
.put("disableSSLValidation", "true")
197+
.put("connectTimeout", "60000")
198+
.put("readTimeout", "60000")
199+
.put("failOnNon200Response", "true")
200+
.build();
201+
202+
ImmutableMap<String, String> runtimeProperties =
203+
ImmutableMap.of("url", baseURL + "/feeds/users");
204+
205+
ETLStage sink = new ETLStage("HTTP", new ETLPlugin("HTTP", BatchSink.PLUGIN_TYPE, properties, null));
206+
ETLBatchConfig etlConfig = ETLBatchConfig.builder("* * * * *")
207+
.addStage(source)
208+
.addStage(sink)
209+
.addConnection(source.getName(), sink.getName())
210+
.build();
211+
212+
ApplicationManager appManager = deployETL(etlConfig, inputDatasetName);
213+
214+
DataSetManager<Table> inputManager = getDataset(inputDatasetName);
215+
List<StructuredRecord> input = ImmutableList.of(
216+
StructuredRecord.builder(inputSchema).set("id", "1").build()
217+
);
218+
MockSource.writeInput(inputManager, input);
219+
// run the pipeline
220+
runETLOnce(appManager, runtimeProperties);
221+
}
222+
223+
/**
224+
* Run the SmartWorkflow in the given ETL application for once and wait for the workflow's COMPLETED status
225+
* with 5 minutes timeout.
226+
*
227+
* @param appManager the ETL application to run
228+
* @param arguments the arguments to be passed when running SmartWorkflow
229+
*/
230+
protected WorkflowManager runETLOnce(ApplicationManager appManager, Map<String, String> arguments)
231+
throws TimeoutException, InterruptedException, ExecutionException {
232+
final WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
233+
int numRuns = workflowManager.getHistory().size();
234+
workflowManager.start(arguments);
235+
Tasks.waitFor(numRuns + 1, () -> workflowManager.getHistory().size(), 20, TimeUnit.SECONDS);
236+
workflowManager.waitForStopped(5, TimeUnit.MINUTES);
237+
return workflowManager;
238+
}
239+
240+
protected ApplicationManager deployETL(ETLBatchConfig etlConfig, String appName) throws Exception {
241+
AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(BATCH_ARTIFACT, etlConfig);
242+
ApplicationId appId = NamespaceId.DEFAULT.app(appName);
243+
return deployApplication(appId, appRequest);
244+
}
177245
}

0 commit comments

Comments
 (0)