Skip to content

Commit 2ccfd41

Browse files
committed
HTTP154 change to introduce wrapper - incomplete code
Signed-off-by: davidradl <[email protected]>
1 parent 776a2fc commit 2ccfd41

20 files changed

+365
-283
lines changed

README.md

Lines changed: 72 additions & 55 deletions
Large diffs are not rendered by default.

src/main/java/com/getindata/connectors/http/HttpStatusCodeValidationFailedException.java

Lines changed: 0 additions & 16 deletions
This file was deleted.

src/main/java/com/getindata/connectors/http/internal/PollingClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
package com.getindata.connectors.http.internal;
22

3-
import java.util.Collection;
4-
53
import org.apache.flink.table.data.RowData;
64
import org.apache.flink.table.functions.FunctionContext;
75

6+
import com.getindata.connectors.http.internal.table.lookup.HttpRowDataWrapper;
7+
88
/**
99
* A client that is used to get enrichment data from external component.
1010
*/
11-
public interface PollingClient<T> {
11+
public interface PollingClient {
1212

1313
/**
1414
* Gets enrichment data from external component using provided lookup arguments.
1515
* @param lookupRow A {@link RowData} containing request parameters.
16-
* @return an optional result of data lookup.
16+
* @return an optional result of data lookup with http information.
1717
*/
18-
Collection<T> pull(RowData lookupRow);
18+
HttpRowDataWrapper pull(RowData lookupRow);
1919

2020
/**
2121
* Initialize the client.

src/main/java/com/getindata/connectors/http/internal/PollingClientFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
import java.io.Serializable;
44

55
import org.apache.flink.api.common.serialization.DeserializationSchema;
6+
import org.apache.flink.table.data.RowData;
67
import org.apache.flink.util.ConfigurationException;
78

89
import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;
910

10-
public interface PollingClientFactory<OUT> extends Serializable {
11+
public interface PollingClientFactory extends Serializable {
1112

12-
PollingClient<OUT> createPollClient(
13+
PollingClient createPollClient(
1314
HttpLookupConfig options,
14-
DeserializationSchema<OUT> schemaDecoder
15+
DeserializationSchema<RowData> schemaDecoder
1516
) throws ConfigurationException;
1617
}

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ public final class HttpConnectorConfigConstants {
8888
public static final String SOURCE_CONNECTION_TIMEOUT =
8989
SOURCE_LOOKUP_PREFIX + "connection.timeout";
9090

91-
public static final String FAIL_JOB_ON_ERROR =
92-
SOURCE_LOOKUP_PREFIX + "fail-job-on-error";
91+
public static final String CONTINUE_ON_ERROR_ =
92+
SOURCE_LOOKUP_PREFIX + "continue_on_error";
9393

9494
public static final String SOURCE_PROXY_HOST =
9595
SOURCE_LOOKUP_PREFIX + "proxy.host";

src/main/java/com/getindata/connectors/http/internal/retry/HttpClientWithRetry.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import lombok.extern.slf4j.Slf4j;
1414
import org.apache.flink.metrics.MetricGroup;
1515

16-
import com.getindata.connectors.http.HttpStatusCodeValidationFailedException;
1716
import com.getindata.connectors.http.internal.status.HttpResponseChecker;
1817

1918
@Slf4j
@@ -48,15 +47,10 @@ public void registerMetrics(MetricGroup metrics){
4847
public <T> HttpResponse<T> send(
4948
Supplier<HttpRequest> requestSupplier,
5049
HttpResponse.BodyHandler<T> responseBodyHandler
51-
) throws IOException, InterruptedException, HttpStatusCodeValidationFailedException {
50+
) throws IOException, InterruptedException{
5251
try {
53-
var response = Retry.decorateCheckedSupplier(retry,
52+
return Retry.decorateCheckedSupplier(retry,
5453
() -> httpClient.send(requestSupplier.get(), responseBodyHandler)).apply();
55-
if (!responseChecker.isSuccessful(response)) {
56-
throw new HttpStatusCodeValidationFailedException(
57-
"Incorrect response code: " + response.statusCode(), response);
58-
}
59-
return response;
6054
} catch (IOException | InterruptedException e) {
6155
throw e; //re-throw without wrapping
6256
} catch (Throwable t) {
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.getindata.connectors.http.internal.table.lookup;
2+
3+
public enum ContinueOnErrorType {
4+
HTTP_FAILED_AFTER_RETRY,
5+
HTTP_FAILED,
6+
CLIENT_SIDE_EXCEPTION,
7+
NONE
8+
}

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,6 @@ public class HttpLookupConnectorOptions {
8282
.noDefaultValue()
8383
.withDescription("Http client connection timeout.");
8484

85-
public static final ConfigOption<Boolean> SOURCE_LOOKUP_FAIL_JOB_ON_ERROR =
86-
ConfigOptions.key(FAIL_JOB_ON_ERROR)
87-
.booleanType()
88-
.defaultValue(true)
89-
.withDescription("Fail job on error.");
90-
9185
public static final ConfigOption<String> SOURCE_LOOKUP_PROXY_HOST =
9286
ConfigOptions.key(SOURCE_PROXY_HOST)
9387
.stringType()

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.getindata.connectors.http.internal.table.lookup;
22

3-
import java.net.http.HttpResponse;
43
import java.util.*;
54
import java.util.stream.Collectors;
65
import java.util.stream.Stream;
@@ -113,7 +112,7 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
113112
dynamicTableFactoryContext
114113
);
115114

116-
PollingClientFactory<RowData> pollingClientFactory =
115+
PollingClientFactory pollingClientFactory =
117116
createPollingClientFactory(lookupQueryCreator, lookupConfig);
118117

119118
return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder, pollingClientFactory);
@@ -122,7 +121,7 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
122121
protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow,
123122
DeserializationSchema<RowData>
124123
responseSchemaDecoder,
125-
PollingClientFactory<RowData>
124+
PollingClientFactory
126125
pollingClientFactory) {
127126
MetadataConverter[] metadataConverters={};
128127
if (this.metadataKeys != null) {
@@ -191,7 +190,7 @@ public boolean supportsNestedProjection() {
191190
return true;
192191
}
193192

194-
private PollingClientFactory<RowData> createPollingClientFactory(
193+
private PollingClientFactory createPollingClientFactory(
195194
LookupQueryCreator lookupQueryCreator,
196195
HttpLookupConfig lookupConfig) {
197196

@@ -322,55 +321,72 @@ public void applyReadableMetadata(List<String> metadataKeys, DataType producedDa
322321
this.metadataKeys = connectorMetadataKeys;
323322
this.producedDataType = producedDataType;
324323
}
324+
325325
// --------------------------------------------------------------------------------------------
326326
// Metadata handling
327327
// --------------------------------------------------------------------------------------------
328328
enum ReadableMetadata {
329-
HTTP_ERROR_STRING(
329+
ERROR_STRING(
330330
"error_string",
331331
DataTypes.STRING(),
332332
new MetadataConverter() {
333333
private static final long serialVersionUID = 1L;
334-
@Override
335-
public Object read(String msg, HttpResponse httpResponse) {
334+
335+
public Object read(String msg, HttpRowDataWrapper httpRowDataWrapper) {
336336
return StringData.fromString(msg);
337337
}
338-
}),
339-
HTTP_ERROR_CODE(
340-
"error_code",
338+
}),
339+
HTTP_STATUS_CODE(
340+
"http_status_code",
341341
DataTypes.INT(),
342342
new MetadataConverter() {
343343
private static final long serialVersionUID = 1L;
344-
@Override
345-
public Object read(String msg, HttpResponse httpResponse) {
346-
return httpResponse != null? httpResponse.statusCode():null;
344+
345+
public Object read(String msg, HttpRowDataWrapper httpRowDataWrapper) {
346+
return (httpRowDataWrapper != null) ? httpRowDataWrapper.getHttpStatusCode() : null;
347347
}
348-
}),
348+
}
349+
),
349350
HTTP_HEADERS(
350-
"error_headers",
351+
"http_headers",
351352
DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())),
352353
new MetadataConverter() {
353354
private static final long serialVersionUID = 1L;
354-
@Override
355-
public Object read(String msg, HttpResponse httpResponse) {
356-
if (httpResponse == null) return null;
357-
Map<String, List<String>> httpHeadersMap = httpResponse.headers().map();
355+
public Object read(String msg, HttpRowDataWrapper httpRowDataWrapper) {
356+
if (httpRowDataWrapper == null) {
357+
return null;
358+
}
359+
Map<String, List<String>> httpHeadersMap = httpRowDataWrapper.getHttpHeadersMap();
358360
Map<StringData, ArrayData> stringDataMap = new HashMap<>();
359-
for (String key: httpHeadersMap.keySet()) {
361+
for (String key : httpHeadersMap.keySet()) {
360362
List<StringData> strDataList = new ArrayList<>();
361363
httpHeadersMap.get(key).stream()
362-
.forEach((c) -> strDataList.add(StringData.fromString(c)));
364+
.forEach((c) -> strDataList.add(StringData.fromString(c)));
363365
stringDataMap.put(StringData.fromString(key), new GenericArrayData(strDataList.toArray()));
364366
}
365367
return new GenericMapData(stringDataMap);
366368
}
367-
});
369+
}
370+
),
371+
CONTINUE_ON_ERROR_TYPE(
372+
"continue_on_error_type",
373+
DataTypes.STRING(),
374+
new MetadataConverter() {
375+
private static final long serialVersionUID = 1L;
376+
377+
public Object read(String msg, HttpRowDataWrapper httpRowDataWrapper) {
378+
if (httpRowDataWrapper == null) {
379+
return null;
380+
}
381+
return StringData.fromString(httpRowDataWrapper.getContinueOnErrorType().name());
382+
}
383+
})
384+
;
368385
final String key;
369386

370387
final DataType dataType;
371388
final MetadataConverter converter;
372389

373-
//TODO decide if we need a MetadataConverter
374390
ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
375391
this.key = key;
376392
this.dataType = dataType;
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.getindata.connectors.http.internal.table.lookup;
2+
3+
4+
import java.util.Collection;
5+
import java.util.List;
6+
import java.util.Map;
7+
8+
import lombok.Data;
9+
import org.apache.flink.table.data.RowData;
10+
11+
/**
12+
* This bean contains the RowData information (the response body as a flink RowData).
13+
* It also contains information from the http response, namely the http headers map
14+
* and the http status code for the metadata columns.
15+
*/
16+
@Data
17+
public class HttpRowDataWrapper {
18+
private final Collection<RowData> data;
19+
private final Map<String, List<String>> httpHeadersMap;
20+
private final Integer httpStatusCode;
21+
private final ContinueOnErrorType continueOnErrorType;
22+
}

0 commit comments

Comments
 (0)